// MqttVerticle.java
package com.diligrp.mqtt.vertx.verticle;

import com.diligrp.mqtt.core.util.JsonUtils;
import com.diligrp.mqtt.core.model.Printer;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;

import static com.diligrp.mqtt.core.constant.MqttTopicConstant.PRINTER_TOPIC;

/**
 * Verticle that relays print jobs from the Vert.x event bus to an MQTT broker.
 *
 * <p>It consumes messages on {@code PRINTER_TOPIC}; each message body is parsed as a
 * JSON-encoded {@link Printer}. For every message a new {@link MqttClient} is created with
 * the username/password carried by the job, connected to the job's host and port, used to
 * publish the job's data once, and then disconnected. The outcome is reported by sending
 * the job (serialized back to JSON) to the success or failure callback address carried by
 * the job itself.
 *
 * <p>Created: 2025-12-26 17:05
 *
 * @author zhangmeiyang
 * @version TODO
 */
public class MqttVerticle extends AbstractVerticle {

    private static final Logger LOGGER = LoggerFactory.getLogger(MqttVerticle.class);

    /**
     * Registers the event-bus consumer for print jobs and signals that start-up is done.
     *
     * @param startPromise completed once the consumer has been registered; with the
     *                     asynchronous {@code start} variant Vert.x only treats the
     *                     deployment as finished after this promise is completed
     * @throws Exception declared by the overridden method; not thrown by this implementation
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        EventBus eventBus = vertx.eventBus();
        eventBus.consumer(PRINTER_TOPIC, message -> {
            Object rawBody = message.body();
            if (rawBody == null) {
                LOGGER.error("Ignoring printer message with empty body");
                return;
            }
            Printer printer = JsonUtils.fromJsonString(rawBody.toString(), Printer.class);
            if (printer == null) {
                LOGGER.error("Ignoring printer message that could not be parsed");
                return;
            }
            // One short-lived client per job, because the credentials come from the job itself.
            MqttClientOptions options = new MqttClientOptions();
            options.setUsername(printer.getUsername());
            options.setPassword(printer.getPassword());
            MqttClient client = MqttClient.create(vertx, options);
            doSend(client, printer, eventBus);
        });
        // Without this the deployment of the verticle would never be reported as complete.
        startPromise.complete();
    }

    /**
     * Connects the client to the job's broker, publishes the job's data to the job's topic
     * and reports the outcome on the job's callback address.
     *
     * <p>A failed connection and a failed publish are both reported to the failure callback
     * address. After the publish handler has run the client is always disconnected, even if
     * reporting the outcome throws.
     *
     * @param client   MQTT client already configured with the job's credentials
     * @param printer  the print job: broker host/port, topic, data, QoS and callback addresses
     * @param eventBus event bus used to deliver the success/failure notification
     */
    private static void doSend(MqttClient client, Printer printer, EventBus eventBus) {
        client.connect(printer.getPort(), printer.getHost(), connResult -> {
            if (connResult.failed()) {
                LOGGER.error("Failed to connect to MQTT server", connResult.cause());
                // The requester must learn about the failure even though nothing was published.
                notifyCallback(eventBus, printer.getFailCallBackEventBus(), printer);
                return;
            }
            // Connected: publish the job's data (not a duplicate, not retained).
            client.publish(printer.getTopic(), Buffer.buffer(printer.getData()), printer.getQos(), false, false, pubResult -> {
                try {
                    if (pubResult.succeeded()) {
                        LOGGER.info("Message published");
                        notifyCallback(eventBus, printer.getSuccessCallBackEventBus(), printer);
                    } else {
                        LOGGER.error("Failed to publish message", pubResult.cause());
                        notifyCallback(eventBus, printer.getFailCallBackEventBus(), printer);
                    }
                } finally {
                    // Always release the broker connection, even if the notification above throws.
                    // NOTE(review): this runs as soon as the publish handler fires; confirm that
                    // is late enough for QoS 1/2 acknowledgements.
                    client.disconnect();
                }
            });
        });
    }

    /**
     * Sends the job, serialized to JSON, to the given callback address.
     *
     * <p>A {@code null} address is logged and skipped, since the event bus does not accept
     * a null address.
     *
     * @param eventBus event bus used to deliver the notification
     * @param address  callback address taken from the job; may be {@code null}
     * @param printer  the print job to send back to the requester
     */
    private static void notifyCallback(EventBus eventBus, String address, Printer printer) {
        if (address == null) {
            LOGGER.warn("No callback address configured; skipping notification");
            return;
        }
        eventBus.send(address, JsonUtils.toJsonString(printer));
    }
}