MqttVerticle.java 2.46 KB
package com.diligrp.mqtt.vertx.verticle;

import com.diligrp.mqtt.core.model.Printer;
import com.diligrp.mqtt.core.util.JsonUtils;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;

import static com.diligrp.mqtt.core.constant.MqttTopicConstant.PRINTER_TOPIC;

/**
 * @Author: zhangmeiyang
 * @CreateTime: 2025-12-26 17:05
 * @Version: todo
 */
public class MqttVerticle extends AbstractVerticle {

    private static final Logger LOGGER = LoggerFactory.getLogger(MqttVerticle.class);


    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        EventBus eventBus = vertx.eventBus();
        eventBus.consumer(PRINTER_TOPIC, message -> {
            String body = message.body().toString();
            Printer printer = JsonUtils.fromJsonString(body, Printer.class);
            MqttClientOptions options = new MqttClientOptions();
            options.setUsername(printer.getUsername());
            options.setPassword(printer.getPassword());
            MqttClient client = MqttClient.create(vertx, options);
            doSend(client, printer, eventBus);
        });
    }

    private void doSend(MqttClient client, Printer printer, EventBus eventBus) {
        client.connect(printer.getPort(), printer.getHost())
                .onFailure(throwable -> {
                    LOGGER.error("连接到[MQTT-SERVER]失败", throwable);
                    eventBus.send(PRINTER_TOPIC, JsonUtils.toJsonString(printer));
                })
                .onSuccess(connResult -> {
                    client.publish(printer.getTopic(), Buffer.buffer(printer.getData()), printer.getQos(), false, false)
                            .onFailure(throwable -> {
                                LOGGER.error("MQTT消息发送失败", throwable);
                                eventBus.send(printer.getFailCallBackEventBus(), JsonUtils.toJsonString(printer));
                            })
                            .onSuccess(pubResult -> {
                                LOGGER.info("MQTT消息发送成功");
                                eventBus.send(printer.getSuccessCallBackEventBus(), JsonUtils.toJsonString(printer));
                            });
                    client.disconnect();
                });
    }
}