// MqttReceiveVerticle.java 2.8 KB
package com.diligrp.mqtt.vertx.verticle;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;

/**
 * Verticle that connects to an MQTT server, subscribes to one topic and hands every
 * received message to {@link #processReceivedMessage(String, String)}.
 *
 * <p>The start promise is completed once the subscription request succeeds and is failed
 * if either the connection or the subscription fails. The server host, credentials and
 * topic defined below are placeholders and must be replaced with real values.
 *
 * <p>Created: 2025-12-30 18:01
 *
 * @author zhangmeiyang
 */
public class MqttReceiveVerticle extends AbstractVerticle {

    private static final Logger log = LoggerFactory.getLogger(MqttReceiveVerticle.class);

    /** MQTT server port used for the (non-SSL) connection. */
    private static final int SERVER_PORT = 1883;

    /** Placeholder host name of the MQTT server. */
    private static final String SERVER_HOST = "your-mqtt-server-host";

    /** Placeholder topic to subscribe to; replace with the actual topic. */
    private static final String TOPIC = "your-topic-name";

    /** QoS level requested for the subscription (MQTT QoS 1). */
    private static final int SUBSCRIBE_QOS = 1;

    /** Client created in {@link #start(Promise)}; kept so {@link #stop(Promise)} can disconnect it. */
    private MqttClient mqttClient;

    /**
     * Creates the MQTT client, connects to the server and subscribes to {@link #TOPIC}.
     *
     * @param startPromise completed after a successful subscription, failed with the cause
     *                     when connecting or subscribing fails
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        // MQTT server connection settings
        MqttClientOptions options = new MqttClientOptions()
                .setClientId("mqtt-receive-client-" + System.currentTimeMillis())
                .setUsername("your-username") // only needed when the server requires authentication
                .setPassword("your-password") // only needed when the server requires authentication
                .setAutoKeepAlive(true)
                .setSsl(false) // set according to the server configuration
                .setKeepAliveInterval(30); // keep-alive interval
        // Create the MQTT client
        MqttClient client = MqttClient.create(vertx, options);
        this.mqttClient = client;

        // Register the handlers before connecting so they are in place before any
        // message or close event can arrive.
        client.publishHandler(message -> {
            String topicName = message.topicName();
            String payload = message.payload().toString();
            log.info("Received message - Topic: %s, Payload: %s".formatted(topicName, payload));
            processReceivedMessage(topicName, payload);
        });
        // Listen for the connection being closed. The previous code called
        // disconnect(handler) here, which actively closed the connection right after
        // subscribing instead of merely observing the event.
        client.closeHandler(v -> log.warn("MQTT client disconnected from server"));

        client.connect(SERVER_PORT, SERVER_HOST)
                .onFailure(throwable -> {
                    log.error("Failed to connect to MQTT server", throwable);
                    startPromise.fail(throwable);
                })
                .onSuccess(ack -> {
                    log.info("MQTT client connected to server");
                    subscribe(client, startPromise);
                });
    }

    /**
     * Disconnects the MQTT client, if it is connected, when the verticle is undeployed.
     *
     * @param stopPromise always completed; a failed disconnect is only logged so that it
     *                    does not block undeployment
     */
    @Override
    public void stop(Promise<Void> stopPromise) throws Exception {
        MqttClient client = this.mqttClient;
        if (client == null || !client.isConnected()) {
            stopPromise.complete();
            return;
        }
        client.disconnect().onComplete(result -> {
            if (result.failed()) {
                log.warn("Failed to disconnect MQTT client", result.cause());
            }
            stopPromise.complete();
        });
    }

    /**
     * Subscribes the connected client to {@link #TOPIC} and resolves the start promise
     * with the outcome.
     */
    private void subscribe(MqttClient client, Promise<Void> startPromise) {
        client.subscribe(TOPIC, SUBSCRIBE_QOS)
                .onFailure(throwable -> {
                    log.error("Failed to subscribe to topic,cause:", throwable);
                    startPromise.fail(throwable);
                })
                .onSuccess(subAck -> {
                    log.info("Subscribed to topic: %s".formatted(TOPIC));
                    startPromise.complete();
                });
    }

    /**
     * Handles a received MQTT message. Currently a no-op extension point.
     *
     * @param topic   topic name the message was published on
     * @param payload message payload converted to a string
     */
    private void processReceivedMessage(String topic, String payload) {

    }
}