Commit 16702faf76558c501731e8a798d580f78dc6c9a2

Authored by zhangmeiyang
1 parent 81a45064

feat(mqtt): 添加 MQTT 接收端和发送端功能

- 创建 MqttReceiveVerticle 类用于接收 MQTT 消息
- 重命名 MqttVerticle 为 MqttSendVerticle 用于发送 MQTT 消息
- 修改打印机主题常量从 mqtt.printer 为 mqtt.printer.send
- 更新 VerticleLifeCycle 配置以部署 MqttSendVerticle
- 实现 MQTT 客户端连接、订阅和消息处理功能
mqtt-core/src/main/java/com/diligrp/mqtt/core/constant/MqttTopicConstant.java
@@ -7,5 +7,5 @@ package com.diligrp.mqtt.core.constant; @@ -7,5 +7,5 @@ package com.diligrp.mqtt.core.constant;
7 */ 7 */
8 public class MqttTopicConstant { 8 public class MqttTopicConstant {
9 9
10 - public static final String PRINTER_TOPIC = "mqtt.printer"; 10 + public static final String PRINTER_TOPIC = "mqtt.printer.send";
11 } 11 }
mqtt-vertx/src/main/java/com/diligrp/mqtt/vertx/config/VerticleLifeCycle.java
1 package com.diligrp.mqtt.vertx.config; 1 package com.diligrp.mqtt.vertx.config;
2 2
3 -import com.diligrp.mqtt.vertx.verticle.MqttVerticle; 3 +import com.diligrp.mqtt.vertx.verticle.MqttSendVerticle;
4 import io.vertx.core.Vertx; 4 import io.vertx.core.Vertx;
5 -import io.vertx.core.json.JsonObject;  
6 -import io.vertx.ext.mongo.MongoClient;  
7 -import io.vertx.redis.client.Redis;  
8 import jakarta.annotation.Resource; 5 import jakarta.annotation.Resource;
9 import org.springframework.beans.factory.DisposableBean; 6 import org.springframework.beans.factory.DisposableBean;
10 -import org.springframework.beans.factory.annotation.Value;  
11 import org.springframework.boot.CommandLineRunner; 7 import org.springframework.boot.CommandLineRunner;
12 import org.springframework.stereotype.Component; 8 import org.springframework.stereotype.Component;
13 9
@@ -25,7 +21,7 @@ public class VerticleLifeCycle implements CommandLineRunner, DisposableBean { @@ -25,7 +21,7 @@ public class VerticleLifeCycle implements CommandLineRunner, DisposableBean {
25 21
26 @Override 22 @Override
27 public void run(String... args) throws Exception { 23 public void run(String... args) throws Exception {
28 - vertx.deployVerticle(new MqttVerticle()); 24 + vertx.deployVerticle(new MqttSendVerticle());
29 } 25 }
30 26
31 @Override 27 @Override
mqtt-vertx/src/main/java/com/diligrp/mqtt/vertx/verticle/MqttReceiveVerticle.java 0 → 100644
  1 +package com.diligrp.mqtt.vertx.verticle;
  2 +
  3 +import io.vertx.core.AbstractVerticle;
  4 +import io.vertx.core.Promise;
  5 +import io.vertx.core.impl.logging.Logger;
  6 +import io.vertx.core.impl.logging.LoggerFactory;
  7 +import io.vertx.mqtt.MqttClient;
  8 +import io.vertx.mqtt.MqttClientOptions;
  9 +
  10 +/**
  11 + * @Author: zhangmeiyang
  12 + * @CreateTime: 2025-12-30 18:01
  13 + * @Version: todo
  14 + */
  15 +public class MqttReceiveVerticle extends AbstractVerticle {
  16 +
  17 + private static final Logger log = LoggerFactory.getLogger(MqttReceiveVerticle.class);
  18 +
  19 + @Override
  20 + public void start(Promise<Void> startPromise) throws Exception {
  21 + // MQTT 服务器连接配置
  22 + MqttClientOptions options = new MqttClientOptions()
  23 + .setClientId("mqtt-receive-client-" + System.currentTimeMillis())
  24 + .setUsername("your-username") // 如果需要认证
  25 + .setPassword("your-password") // 如果需要认证
  26 + .setAutoKeepAlive(true)
  27 + .setSsl(false) // 根据服务器配置设置
  28 + .setKeepAliveInterval(30); // 保持连接间隔
  29 + // 创建 MQTT 客户端
  30 + var mqttClient = MqttClient.create(vertx, options);
  31 + mqttClient.connect(1883, "your-mqtt-server-host")
  32 + .onFailure(throwable -> {
  33 + log.error("Failed to connect to MQTT server", throwable);
  34 + startPromise.fail(throwable);
  35 + })
  36 + .onSuccess(ack -> {
  37 + log.info("MQTT client connected to server");
  38 + String topic = "your-topic-name"; // 替换为实际的主题
  39 + mqttClient.subscribe(topic, 1)
  40 + .onFailure(throwable -> {
  41 + log.error("Failed to subscribe to topic,cause:", throwable);
  42 + startPromise.fail(throwable);
  43 + })
  44 + .onSuccess(ack1 -> {
  45 + log.info("Subscribed to topic: %s".formatted(topic));
  46 + startPromise.complete();
  47 + });
  48 + mqttClient.publishHandler(message -> {
  49 + String topicName = message.topicName();
  50 + String payload = message.payload().toString();
  51 + log.info("Received message - Topic: %s, Payload: %s".formatted(topicName, payload));
  52 + processReceivedMessage(topicName, payload);
  53 + });
  54 + // 监听连接断开事件
  55 + mqttClient.disconnect(v -> {
  56 + log.warn("MQTT client disconnected from server");
  57 + });
  58 + });
  59 + }
  60 +
  61 + /**
  62 + * 处理接收到的 MQTT 消息
  63 + */
  64 + private void processReceivedMessage(String topic, String payload) {
  65 +
  66 + }
  67 +}
mqtt-vertx/src/main/java/com/diligrp/mqtt/vertx/verticle/MqttVerticle.java renamed to mqtt-vertx/src/main/java/com/diligrp/mqtt/vertx/verticle/MqttSendVerticle.java
@@ -18,9 +18,9 @@ import static com.diligrp.mqtt.core.constant.MqttTopicConstant.PRINTER_TOPIC; @@ -18,9 +18,9 @@ import static com.diligrp.mqtt.core.constant.MqttTopicConstant.PRINTER_TOPIC;
18 * @CreateTime: 2025-12-26 17:05 18 * @CreateTime: 2025-12-26 17:05
19 * @Version: todo 19 * @Version: todo
20 */ 20 */
21 -public class MqttVerticle extends AbstractVerticle { 21 +public class MqttSendVerticle extends AbstractVerticle {
22 22
23 - private static final Logger LOGGER = LoggerFactory.getLogger(MqttVerticle.class); 23 + private static final Logger LOGGER = LoggerFactory.getLogger(MqttSendVerticle.class);
24 24
25 25
26 @Override 26 @Override