Commit 67a28bb1e9aab71e119d357dc54d67c41380195c

Authored by zhangmeiyang
1 parent 16702faf

```

feat(mqtt): 使用Spring Integration替换Vertx实现MQTT功能

- 移除Vertx相关依赖和配置代码
- 添加Spring Integration MQTT集成模块
- 配置MQTT连接参数到application.properties
- 实现MQTT消息发送和接收服务
- 添加MQTT事件监听和处理机制
- 实现动态Topic订阅管理功能
- 更新项目依赖从mqtt-vertx到mqtt-integration
```
Showing 31 changed files with 685 additions and 308 deletions
mqtt-boot/pom.xml
@@ -19,7 +19,7 @@ @@ -19,7 +19,7 @@
19 </dependency> 19 </dependency>
20 <dependency> 20 <dependency>
21 <groupId>com.diligrp</groupId> 21 <groupId>com.diligrp</groupId>
22 - <artifactId>mqtt-vertx</artifactId> 22 + <artifactId>mqtt-integration</artifactId>
23 <version>${revision}</version> 23 <version>${revision}</version>
24 </dependency> 24 </dependency>
25 <dependency> 25 <dependency>
mqtt-boot/src/main/java/com/diligrp/mqtt/boot/MqttApplication.java
1 package com.diligrp.mqtt.boot; 1 package com.diligrp.mqtt.boot;
2 2
3 import com.diligrp.mqtt.core.CoreConfig; 3 import com.diligrp.mqtt.core.CoreConfig;
4 -import com.diligrp.mqtt.vertx.VertxConfig; 4 +import com.diligrp.mqtt.integration.IntegrationConfig;
5 import com.diligrp.mqtt.web.WebConfig; 5 import com.diligrp.mqtt.web.WebConfig;
6 import org.springframework.boot.SpringApplication; 6 import org.springframework.boot.SpringApplication;
7 import org.springframework.boot.autoconfigure.SpringBootApplication; 7 import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -15,7 +15,7 @@ import org.springframework.context.annotation.Import; @@ -15,7 +15,7 @@ import org.springframework.context.annotation.Import;
15 */ 15 */
16 @SpringBootApplication 16 @SpringBootApplication
17 @EnableDiscoveryClient 17 @EnableDiscoveryClient
18 -@Import({CoreConfig.class, VertxConfig.class, WebConfig.class}) 18 +@Import({CoreConfig.class, IntegrationConfig.class, WebConfig.class})
19 public class MqttApplication { 19 public class MqttApplication {
20 public static void main(String[] args) { 20 public static void main(String[] args) {
21 SpringApplication.run(MqttApplication.class, args); 21 SpringApplication.run(MqttApplication.class, args);
mqtt-boot/src/main/resources/application.properties
@@ -9,3 +9,13 @@ spring.cloud.nacos.config.server-addr=nacos.diligrp.com:8848 @@ -9,3 +9,13 @@ spring.cloud.nacos.config.server-addr=nacos.diligrp.com:8848
9 spring.cloud.nacos.config.namespace=2267e673-b41f-458d-9643-2a03e4fd92fb 9 spring.cloud.nacos.config.namespace=2267e673-b41f-458d-9643-2a03e4fd92fb
10 spring.config.import[0]=nacos:${spring.application.name}.properties 10 spring.config.import[0]=nacos:${spring.application.name}.properties
11 spring.config.import[1]=nacos:${spring.application.name}-${spring.profiles.active}.properties 11 spring.config.import[1]=nacos:${spring.application.name}-${spring.profiles.active}.properties
  12 +
  13 +
  14 +mqtt.url=tcp://10.30.110.178:11883
  15 +mqtt.clientId=mqtt-agent
  16 +mqtt.username=test_group
  17 +mqtt.password=1qaz2wsx
  18 +mqtt.timeOut=10
  19 +mqtt.cleanSession=true
  20 +mqtt.automaticReconnect=true
  21 +mqtt.qos=1
mqtt-core/pom.xml
@@ -16,15 +16,11 @@ @@ -16,15 +16,11 @@
16 <dependencies> 16 <dependencies>
17 <dependency> 17 <dependency>
18 <groupId>org.springframework.boot</groupId> 18 <groupId>org.springframework.boot</groupId>
19 - <artifactId>spring-boot-starter-json</artifactId> 19 + <artifactId>spring-boot-starter-web</artifactId>
20 </dependency> 20 </dependency>
21 <dependency> 21 <dependency>
22 - <groupId>io.vertx</groupId>  
23 - <artifactId>vertx-core</artifactId>  
24 - </dependency>  
25 - <dependency>  
26 - <groupId>io.vertx</groupId>  
27 - <artifactId>vertx-mqtt</artifactId> 22 + <groupId>org.projectlombok</groupId>
  23 + <artifactId>lombok</artifactId>
28 </dependency> 24 </dependency>
29 </dependencies> 25 </dependencies>
30 26
mqtt-core/src/main/java/com/diligrp/mqtt/core/CoreConfig.java
1 package com.diligrp.mqtt.core; 1 package com.diligrp.mqtt.core;
2 2
3 -import io.vertx.core.Vertx;  
4 -import org.springframework.context.annotation.Bean;  
5 import org.springframework.context.annotation.ComponentScan; 3 import org.springframework.context.annotation.ComponentScan;
6 import org.springframework.context.annotation.Configuration; 4 import org.springframework.context.annotation.Configuration;
7 5
@@ -13,8 +11,5 @@ import org.springframework.context.annotation.Configuration; @@ -13,8 +11,5 @@ import org.springframework.context.annotation.Configuration;
13 @Configuration 11 @Configuration
14 @ComponentScan(basePackages = "com.diligrp.mqtt.core") 12 @ComponentScan(basePackages = "com.diligrp.mqtt.core")
15 public class CoreConfig { 13 public class CoreConfig {
16 - @Bean  
17 - public Vertx vertx() {  
18 - return Vertx.vertx();  
19 - } 14 +
20 } 15 }
mqtt-core/src/main/java/com/diligrp/mqtt/core/constant/MqttTopicConstant.java deleted 100644 → 0
1 -package com.diligrp.mqtt.core.constant;  
2 -  
3 -/**  
4 - * @Author: zhangmeiyang  
5 - * @CreateTime: 2025-12-29 17:39  
6 - * @Version: todo  
7 - */  
8 -public class MqttTopicConstant {  
9 -  
10 - public static final String PRINTER_TOPIC = "mqtt.printer.send";  
11 -}  
mqtt-core/src/main/java/com/diligrp/mqtt/core/event/RecFailEvent.java 0 → 100644
  1 +package com.diligrp.mqtt.core.event;
  2 +
  3 +import lombok.Getter;
  4 +import lombok.Setter;
  5 +import org.springframework.context.ApplicationEvent;
  6 +
  7 +/**
  8 + * @Author: zhangmeiyang
  9 + * @CreateTime: 2026-01-14 17:05
  10 + * @Version: todo
  11 + */
  12 +@Getter
  13 +@Setter
  14 +public class RecFailEvent extends ApplicationEvent {
  15 + private String message;
  16 +
  17 + public RecFailEvent(Object source, String message) {
  18 + super(source);
  19 + this.message = message;
  20 + }
  21 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/event/RecSuccessEvent.java 0 → 100644
  1 +package com.diligrp.mqtt.core.event;
  2 +
  3 +import com.diligrp.mqtt.core.model.ReceiveModel;
  4 +import lombok.Getter;
  5 +import lombok.Setter;
  6 +import org.springframework.context.ApplicationEvent;
  7 +
  8 +/**
  9 + * @Author: zhangmeiyang
  10 + * @CreateTime: 2026-01-14 17:05
  11 + * @Version: todo
  12 + */
  13 +@Getter
  14 +@Setter
  15 +public class RecSuccessEvent extends ApplicationEvent {
  16 +
  17 + private ReceiveModel receiveModel;
  18 +
  19 + public RecSuccessEvent(Object source, ReceiveModel receiveModel) {
  20 + super(source);
  21 + this.receiveModel = receiveModel;
  22 + }
  23 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/event/SendFailEvent.java 0 → 100644
  1 +package com.diligrp.mqtt.core.event;
  2 +
  3 +import com.diligrp.mqtt.core.model.SendModel;
  4 +import lombok.Getter;
  5 +import lombok.Setter;
  6 +import org.springframework.context.ApplicationEvent;
  7 +
  8 +/**
  9 + * @Author: zhangmeiyang
  10 + * @CreateTime: 2026-01-14 17:04
  11 + * @Version: todo
  12 + */
  13 +@Getter
  14 +@Setter
  15 +public class SendFailEvent extends ApplicationEvent {
  16 +
  17 + private SendModel sendModel;
  18 +
  19 + public SendFailEvent(Object source, SendModel sendModel) {
  20 + super(source);
  21 + this.sendModel = sendModel;
  22 + }
  23 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/event/SendSuccessEvent.java 0 → 100644
  1 +package com.diligrp.mqtt.core.event;
  2 +
  3 +import com.diligrp.mqtt.core.model.SendModel;
  4 +import lombok.Getter;
  5 +import lombok.Setter;
  6 +import org.springframework.context.ApplicationEvent;
  7 +
  8 +/**
  9 + * @Author: zhangmeiyang
  10 + * @CreateTime: 2026-01-14 17:00
  11 + * @Version: todo
  12 + */
  13 +@Getter
  14 +@Setter
  15 +public class SendSuccessEvent extends ApplicationEvent {
  16 +
  17 + private SendModel sendModel;
  18 +
  19 + public SendSuccessEvent(Object source, SendModel sendModel) {
  20 + super(source);
  21 + this.sendModel = sendModel;
  22 + }
  23 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/event/listener/MqttListener.java 0 → 100644
  1 +package com.diligrp.mqtt.core.event.listener;
  2 +
  3 +import com.diligrp.mqtt.core.event.RecFailEvent;
  4 +import com.diligrp.mqtt.core.event.RecSuccessEvent;
  5 +import com.diligrp.mqtt.core.event.SendFailEvent;
  6 +import com.diligrp.mqtt.core.event.SendSuccessEvent;
  7 +import lombok.extern.slf4j.Slf4j;
  8 +import org.springframework.context.event.EventListener;
  9 +import org.springframework.stereotype.Component;
  10 +
  11 +/**
  12 + * @Author: zhangmeiyang
  13 + * @CreateTime: 2026-01-14 17:07
  14 + * @Version: todo
  15 + */
  16 +@Component
  17 +@Slf4j
  18 +public class MqttListener {
  19 +
  20 + @EventListener
  21 + public void onSendSuccess(SendSuccessEvent event) {
  22 + log.info("消息发送成功事件完成后置处理,{}", event.getSendModel());
  23 + }
  24 +
  25 + @EventListener
  26 + public void onSendFail(SendFailEvent event) {
  27 + log.info("消息发送失败事件完成后置处理,{}", event.getSendModel());
  28 + }
  29 +
  30 + @EventListener
  31 + public void onRecSuccess(RecSuccessEvent event) {
  32 + log.info("消息接收成功事件完成后置处理,{}", event.getReceiveModel());
  33 + }
  34 +
  35 + @EventListener
  36 + public void onRecFail(RecFailEvent event) {
  37 + log.info("消息接收失败事件完成后置处理,{}",event.getMessage());
  38 + }
  39 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/model/Printer.java deleted 100644 → 0
1 -package com.diligrp.mqtt.core.model;  
2 -  
3 -import io.netty.handler.codec.mqtt.MqttQoS;  
4 -  
5 -/**  
6 - * @Author: zhangmeiyang  
7 - * @CreateTime: 2025-12-29 17:44  
8 - * @Version: todo  
9 - */  
10 -public class Printer {  
11 - private byte[] data;  
12 - private String topic;  
13 - private String username;  
14 - private String password;  
15 - private String host;  
16 - private Integer port;  
17 - private MqttQoS qos;  
18 - private String successCallBackEventBus;  
19 - private String failCallBackEventBus;  
20 -  
21 - public byte[] getData() {  
22 - return data;  
23 - }  
24 -  
25 - public void setData(byte[] data) {  
26 - this.data = data;  
27 - }  
28 -  
29 - public String getTopic() {  
30 - return topic;  
31 - }  
32 -  
33 - public void setTopic(String topic) {  
34 - this.topic = topic;  
35 - }  
36 -  
37 - public String getUsername() {  
38 - return username;  
39 - }  
40 -  
41 - public void setUsername(String username) {  
42 - this.username = username;  
43 - }  
44 -  
45 - public String getPassword() {  
46 - return password;  
47 - }  
48 -  
49 - public void setPassword(String password) {  
50 - this.password = password;  
51 - }  
52 -  
53 - public String getHost() {  
54 - return host;  
55 - }  
56 -  
57 - public void setHost(String host) {  
58 - this.host = host;  
59 - }  
60 -  
61 - public Integer getPort() {  
62 - return port;  
63 - }  
64 -  
65 - public void setPort(Integer port) {  
66 - this.port = port;  
67 - }  
68 -  
69 - public MqttQoS getQos() {  
70 - return qos;  
71 - }  
72 -  
73 - public void setQos(MqttQoS qos) {  
74 - this.qos = qos;  
75 - }  
76 -  
77 - public String getSuccessCallBackEventBus() {  
78 - return successCallBackEventBus;  
79 - }  
80 -  
81 - public void setSuccessCallBackEventBus(String successCallBackEventBus) {  
82 - this.successCallBackEventBus = successCallBackEventBus;  
83 - }  
84 -  
85 - public String getFailCallBackEventBus() {  
86 - return failCallBackEventBus;  
87 - }  
88 -  
89 - public void setFailCallBackEventBus(String failCallBackEventBus) {  
90 - this.failCallBackEventBus = failCallBackEventBus;  
91 - }  
92 -}  
mqtt-core/src/main/java/com/diligrp/mqtt/core/model/ReceiveModel.java 0 → 100644
  1 +package com.diligrp.mqtt.core.model;
  2 +
  3 +import lombok.Data;
  4 +
  5 +import java.util.UUID;
  6 +
  7 +/**
  8 + * @Author: zhangmeiyang
  9 + * @CreateTime: 2026-01-14 16:23
  10 + * @Version: todo
  11 + */
  12 +@Data
  13 +public class ReceiveModel {
  14 + /**
  15 + * 身份证
  16 + */
  17 + private UUID id;
  18 + /**
  19 + * 主题
  20 + */
  21 + private String topic;
  22 + /**
  23 + * QoS(服务质量)
  24 + */
  25 + private Integer qos;
  26 + /**
  27 + * MQTT 编号
  28 + */
  29 + private Integer mqttId;
  30 + /**
  31 + * 保留
  32 + */
  33 + private Boolean retained;
  34 + /**
  35 + * 重复
  36 + */
  37 + private Boolean duplicate;
  38 + /**
  39 + * 信息
  40 + */
  41 + private String payload;
  42 + /**
  43 + * 时间戳
  44 + */
  45 + private Long timestamp;
  46 + /**
  47 + * 成功行动活动
  48 + */
  49 + private String successActionEvent;
  50 +
  51 + /**
  52 + * 失败动作事件
  53 + */
  54 + private String failActionEvent;
  55 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/model/SendModel.java 0 → 100644
  1 +package com.diligrp.mqtt.core.model;
  2 +
  3 +import lombok.Data;
  4 +
  5 +import java.util.Map;
  6 +
  7 +/**
  8 + * @Author: zhangmeiyang
  9 + * @CreateTime: 2026-01-14 16:30
  10 + * @Version: todo
  11 + */
  12 +@Data
  13 +public class SendModel {
  14 + /**
  15 + * 主题
  16 + */
  17 + private String topic;
  18 + /**
  19 + * QoS(服务质量)
  20 + */
  21 + private Integer qos;
  22 + /**
  23 + * 保留
  24 + */
  25 + private Boolean retained;
  26 + /**
  27 + * 信息
  28 + */
  29 + private Map<String, Object> payload;
  30 + /**
  31 + * 成功行动活动
  32 + */
  33 + private String successActionEvent;
  34 + /**
  35 + * 失败动作事件
  36 + */
  37 + private String failActionEvent;
  38 +
  39 + public static SendModel withDefault() {
  40 + var model = new SendModel();
  41 + model.setQos(1);
  42 + model.setRetained(false);
  43 + return model;
  44 + }
  45 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/model/SubscribeModel.java 0 → 100644
  1 +package com.diligrp.mqtt.core.model;
  2 +
  3 +
  4 +import lombok.Data;
  5 +
  6 +/**
  7 + * @Author: zhangmeiyang
  8 + * @CreateTime: 2025-12-29 17:44
  9 + * @Version: todo
  10 + */
  11 +@Data
  12 +public class SubscribeModel {
  13 + private String topic;
  14 + private int qos;
  15 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/service/MqttMessageService.java 0 → 100644
  1 +package com.diligrp.mqtt.core.service;
  2 +
  3 +import com.diligrp.mqtt.core.model.SendModel;
  4 +
  5 +public interface MqttMessageService {
  6 +
  7 + /**
  8 + * 发送消息
  9 + *
  10 + * @param sendModel 发送模型
  11 + */
  12 + void sendMessage(SendModel sendModel);
  13 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/service/MqttTopicService.java 0 → 100644
  1 +package com.diligrp.mqtt.core.service;
  2 +
  3 +import java.util.HashSet;
  4 +
  5 +public interface MqttTopicService {
  6 +
  7 + /**
  8 + * 订阅主题
  9 + *
  10 + * @param topic 主题
  11 + * @param qos QoS(服务质量)
  12 + */
  13 + void subscribeTopic(String topic, int qos);
  14 +
  15 + /**
  16 + * 取消订阅主题
  17 + *
  18 + * @param topic 主题
  19 + */
  20 + void unsubscribeTopic(String topic);
  21 +
  22 + /**
  23 + * 获取订阅主题
  24 + *
  25 + * @return {@link HashSet }<{@link String }>
  26 + */
  27 + HashSet<String> getSubscribedTopics();
  28 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/type/RecFailEventType.java 0 → 100644
  1 +package com.diligrp.mqtt.core.type;
  2 +
  3 +public enum RecFailEventType {
  4 + PRINTER("printer");
  5 + public final String value;
  6 +
  7 + RecFailEventType(String value) {
  8 + this.value = value;
  9 + }
  10 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/type/RecSuccessEventType.java 0 → 100644
  1 +package com.diligrp.mqtt.core.type;
  2 +
  3 +/**
  4 + * @Author: zhangmeiyang
  5 + * @CreateTime: 2026-01-14 16:59
  6 + * @Version: todo
  7 + */
  8 +public enum RecSuccessEventType {
  9 + PRINTER("printer");
  10 + public final String value;
  11 +
  12 + RecSuccessEventType(String value) {
  13 + this.value = value;
  14 + }
  15 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/type/SendFailEventType.java 0 → 100644
  1 +package com.diligrp.mqtt.core.type;
  2 +
  3 +public enum SendFailEventType {
  4 +
  5 + PRINTER("printer");
  6 + public final String value;
  7 +
  8 + SendFailEventType(String value) {
  9 + this.value = value;
  10 + }
  11 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/type/SendSuccessEventType.java 0 → 100644
  1 +package com.diligrp.mqtt.core.type;
  2 +
  3 +public enum SendSuccessEventType {
  4 +
  5 + PRINTER("printer");
  6 + public final String value;
  7 +
  8 + SendSuccessEventType(String value) {
  9 + this.value = value;
  10 + }
  11 +}
mqtt-vertx/pom.xml renamed to mqtt-integration/pom.xml
@@ -4,7 +4,7 @@ @@ -4,7 +4,7 @@
4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5 <modelVersion>4.0.0</modelVersion> 5 <modelVersion>4.0.0</modelVersion>
6 6
7 - <artifactId>mqtt-vertx</artifactId> 7 + <artifactId>mqtt-integration</artifactId>
8 <version>${revision}</version> 8 <version>${revision}</version>
9 <packaging>jar</packaging> 9 <packaging>jar</packaging>
10 10
@@ -21,20 +21,16 @@ @@ -21,20 +21,16 @@
21 <version>${revision}</version> 21 <version>${revision}</version>
22 </dependency> 22 </dependency>
23 <dependency> 23 <dependency>
24 - <groupId>io.vertx</groupId>  
25 - <artifactId>vertx-core</artifactId> 24 + <groupId>org.springframework.boot</groupId>
  25 + <artifactId>spring-boot-starter-integration</artifactId>
26 </dependency> 26 </dependency>
27 <dependency> 27 <dependency>
28 - <groupId>io.vertx</groupId>  
29 - <artifactId>vertx-mqtt</artifactId> 28 + <groupId>org.springframework.integration</groupId>
  29 + <artifactId>spring-integration-mqtt</artifactId>
30 </dependency> 30 </dependency>
31 <dependency> 31 <dependency>
32 - <groupId>io.vertx</groupId>  
33 - <artifactId>vertx-redis-client</artifactId>  
34 - </dependency>  
35 - <dependency>  
36 - <groupId>io.vertx</groupId>  
37 - <artifactId>vertx-mongo-client</artifactId> 32 + <groupId>org.eclipse.paho</groupId>
  33 + <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
38 </dependency> 34 </dependency>
39 </dependencies> 35 </dependencies>
40 36
mqtt-integration/src/main/java/com/diligrp/mqtt/integration/IntegrationConfig.java 0 → 100644
  1 +package com.diligrp.mqtt.integration;
  2 +
  3 +import lombok.Data;
  4 +import lombok.extern.slf4j.Slf4j;
  5 +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  6 +import org.springframework.beans.factory.annotation.Value;
  7 +import org.springframework.context.annotation.Bean;
  8 +import org.springframework.context.annotation.ComponentScan;
  9 +import org.springframework.context.annotation.Configuration;
  10 +import org.springframework.integration.annotation.ServiceActivator;
  11 +import org.springframework.integration.channel.DirectChannel;
  12 +import org.springframework.integration.config.EnableIntegration;
  13 +import org.springframework.integration.core.MessageProducer;
  14 +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
  15 +import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  16 +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
  17 +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
  18 +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
  19 +import org.springframework.messaging.MessageChannel;
  20 +import org.springframework.messaging.MessageHandler;
  21 +
  22 +/**
  23 + * @Author: zhangmeiyang
  24 + * @CreateTime: 2026-01-14 11:15
  25 + * @Version: todo
  26 + */
  27 +@Configuration
  28 +@ComponentScan(basePackages = "com.diligrp.mqtt.integration")
  29 +@Data
  30 +@Slf4j
  31 +@EnableIntegration
  32 +public class IntegrationConfig {
  33 + @Value("${mqtt.url}")
  34 + private String url;
  35 +
  36 + @Value("${mqtt.clientId}")
  37 + private String clientId;
  38 +
  39 + @Value("${mqtt.username}")
  40 + private String username;
  41 +
  42 + @Value("${mqtt.password}")
  43 + private String password;
  44 +
  45 + @Value("${mqtt.timeOut}")
  46 + private int timeOut;
  47 +
  48 + @Value("${mqtt.cleanSession}")
  49 + private boolean cleanSession;
  50 +
  51 + @Value("${mqtt.automaticReconnect}")
  52 + private boolean automaticReconnect;
  53 +
  54 + @Value("${mqtt.qos}")
  55 + private int qos;
  56 +
  57 +
  58 + /**
  59 + * MQTT 客户端工厂
  60 + *
  61 + * @return {@link MqttPahoClientFactory }
  62 + */
  63 + @Bean
  64 + public MqttPahoClientFactory mqttClientFactory() {
  65 + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  66 + MqttConnectOptions options = new MqttConnectOptions();
  67 + options.setServerURIs(new String[]{url});
  68 + options.setUserName(username);
  69 + options.setPassword(password.toCharArray());
  70 + options.setAutomaticReconnect(automaticReconnect);
  71 + options.setCleanSession(cleanSession);
  72 + options.setConnectionTimeout(timeOut);
  73 + factory.setConnectionOptions(options);
  74 + return factory;
  75 + }
  76 +
  77 + /**
  78 + * outbound通道
  79 + *
  80 + * @return {@link MessageChannel }
  81 + */
  82 + @Bean(name = "mqttOutboundChannel")
  83 + public MessageChannel mqttOutboundChannel() {
  84 + return new DirectChannel();
  85 + }
  86 +
  87 +
  88 + /**
  89 + * 消息发送处理器(支持回执)
  90 + */
  91 + @Bean
  92 + @ServiceActivator(inputChannel = "mqttOutboundChannel")
  93 + public MessageHandler mqttOutbound() {
  94 + MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId + "-sender", mqttClientFactory());
  95 + handler.setDefaultQos(qos);
  96 + handler.setAsync(true);
  97 + handler.setCompletionTimeout(5000);
  98 + handler.setConverter(new DefaultPahoMessageConverter());
  99 + handler.setDefaultRetained(false);
  100 + log.info("Mqtt消息发送处理器初始化完成");
  101 + return handler;
  102 + }
  103 +
  104 + /**
  105 + * 接收消息的通道
  106 + */
  107 + @Bean(name = "mqttInboundChannel")
  108 + public MessageChannel mqttInboundChannel() {
  109 + return new DirectChannel();
  110 + }
  111 +
  112 + /**
  113 + * MQTT消息驱动适配器(用于接收消息)
  114 + */
  115 + @Bean(name = "mqttPahoMessageDrivenChannelAdapter")
  116 + public MqttPahoMessageDrivenChannelAdapter inbound() {
  117 + MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "-receiver", mqttClientFactory());
  118 + adapter.setConverter(new DefaultPahoMessageConverter());
  119 + adapter.setQos(qos);
  120 + adapter.setOutputChannel(mqttInboundChannel());
  121 + log.info("Mqtt消息接收适配器初始化完成");
  122 + return adapter;
  123 + }
  124 +}
mqtt-integration/src/main/java/com/diligrp/mqtt/integration/handler/MqttMessageReceiver.java 0 → 100644
  1 +package com.diligrp.mqtt.integration.handler;
  2 +
  3 +import com.diligrp.mqtt.core.event.RecFailEvent;
  4 +import com.diligrp.mqtt.core.event.RecSuccessEvent;
  5 +import com.diligrp.mqtt.core.model.ReceiveModel;
  6 +import com.diligrp.mqtt.core.util.JsonUtils;
  7 +import jakarta.annotation.Resource;
  8 +import lombok.extern.slf4j.Slf4j;
  9 +import org.springframework.context.ApplicationEventPublisher;
  10 +import org.springframework.integration.annotation.ServiceActivator;
  11 +import org.springframework.messaging.Message;
  12 +import org.springframework.stereotype.Component;
  13 +
  14 +import java.util.UUID;
  15 +
  16 +/**
  17 + * MQTT消息接收处理器
  18 + */
  19 +@Slf4j
  20 +@Component
  21 +public class MqttMessageReceiver {
  22 +
  23 + private static final String RETAINED_HEADER = "mqtt_receivedRetained";
  24 + private static final String MQTT_ID_HEADER = "mqtt_id";
  25 + private static final String MQTT_DUPLICATE_HEADER = "mqtt_duplicate";
  26 + private static final String ID_HEADER = "id";
  27 + private static final String TOPIC_HEADER = "mqtt_receivedTopic";
  28 + private static final String QOS_HEADER = "mqtt_receivedQos";
  29 + private static final String TIMESTAMP_HEADER = "timestamp";
  30 + private static final String SUCCESS_EVENT_HEADER = "mqtt_success_event";
  31 + private static final String FAIL_EVENT_HEADER = "mqtt_fail_event";
  32 +
  33 + @Resource
  34 + private ApplicationEventPublisher applicationEventPublisher;
  35 +
  36 + /**
  37 + * 处理接收到的MQTT消息
  38 + */
  39 + @ServiceActivator(inputChannel = "mqttInboundChannel")
  40 + public void handleMqttMessage(Message<?> message) {
  41 + try {
  42 + String topic = message.getHeaders().get(TOPIC_HEADER, String.class);
  43 + Integer qos = message.getHeaders().get(QOS_HEADER, Integer.class);
  44 + Boolean retained = message.getHeaders().get(RETAINED_HEADER, Boolean.class);
  45 + UUID id = message.getHeaders().get(ID_HEADER, UUID.class);
  46 + Long timestamp = message.getHeaders().get(TIMESTAMP_HEADER, Long.class);
  47 + Integer mqttId = message.getHeaders().get(MQTT_ID_HEADER, Integer.class);
  48 + Boolean duplicate = message.getHeaders().get(MQTT_DUPLICATE_HEADER, Boolean.class);
  49 + String successEvent = message.getHeaders().get(SUCCESS_EVENT_HEADER, String.class);
  50 + String failEvent = message.getHeaders().get(FAIL_EVENT_HEADER, String.class);
  51 + String payload = message.getPayload().toString();
  52 + var receiveModel = new ReceiveModel();
  53 + receiveModel.setId(id);
  54 + receiveModel.setTopic(topic);
  55 + receiveModel.setQos(qos);
  56 + receiveModel.setMqttId(mqttId);
  57 + receiveModel.setRetained(retained);
  58 + receiveModel.setDuplicate(duplicate);
  59 + receiveModel.setPayload(payload);
  60 + receiveModel.setTimestamp(timestamp);
  61 + receiveModel.setSuccessActionEvent(successEvent);
  62 + receiveModel.setFailActionEvent(failEvent);
  63 + log.info("收到MQTT消息 - :{}", receiveModel);
  64 + applicationEventPublisher.publishEvent(new RecSuccessEvent(this, receiveModel));
  65 + } catch (Exception e) {
  66 + log.error("处理MQTT消息失败", e);
  67 + applicationEventPublisher.publishEvent(new RecFailEvent(this, JsonUtils.toJsonString(message)));
  68 +
  69 + }
  70 + }
  71 +}
mqtt-integration/src/main/java/com/diligrp/mqtt/integration/handler/MqttMessageSender.java 0 → 100644
  1 +package com.diligrp.mqtt.integration.handler;
  2 +
  3 +import com.diligrp.mqtt.core.event.SendFailEvent;
  4 +import com.diligrp.mqtt.core.event.SendSuccessEvent;
  5 +import com.diligrp.mqtt.core.model.SendModel;
  6 +import com.diligrp.mqtt.core.service.MqttMessageService;
  7 +import com.diligrp.mqtt.core.util.JsonUtils;
  8 +import jakarta.annotation.Resource;
  9 +import lombok.extern.slf4j.Slf4j;
  10 +import org.springframework.context.ApplicationEventPublisher;
  11 +import org.springframework.integration.support.MessageBuilder;
  12 +import org.springframework.messaging.MessageChannel;
  13 +import org.springframework.messaging.MessageHeaders;
  14 +import org.springframework.stereotype.Service;
  15 +import org.springframework.util.MimeTypeUtils;
  16 +
  17 +import java.util.Optional;
  18 +
  19 +/**
  20 + * MQTT消息发送服务
  21 + */
  22 +@Slf4j
  23 +@Service
  24 +public class MqttMessageSender implements MqttMessageService {
  25 +
  26 + public static final String MQTT_TOPIC = "mqtt_topic";
  27 + public static final String MQTT_QOS = "mqtt_qos";
  28 + public static final String MQTT_RETAINED = "mqtt_retained";
  29 + @Resource
  30 + private MessageChannel mqttOutboundChannel;
  31 +
  32 + @Resource
  33 + private ApplicationEventPublisher applicationEventPublisher;
  34 +
  35 + @Override
  36 + public void sendMessage(SendModel sendModel) {
  37 + try {
  38 + String payload = JsonUtils.toJsonString(sendModel.getPayload());
  39 + var message = MessageBuilder.withPayload(payload)
  40 + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
  41 + .setHeader(MQTT_TOPIC, sendModel.getTopic())
  42 + .setHeader(MQTT_QOS, sendModel.getQos())
  43 + .setHeader(MQTT_RETAINED, sendModel.getRetained())
  44 + .build();
  45 + boolean result = mqttOutboundChannel.send(message);
  46 + Optional.of(result).filter(e -> e).ifPresentOrElse(send -> {
  47 + log.info("消息发送成功 - :{}", message);
  48 + applicationEventPublisher.publishEvent(new SendSuccessEvent(this, sendModel));
  49 + }, () -> {
  50 + log.error("消息发送失败 - :{}", message);
  51 + applicationEventPublisher.publishEvent(new SendFailEvent(this, sendModel));
  52 + });
  53 + } catch (Exception e) {
  54 + log.error("发送Mqtt消息失败", e);
  55 + applicationEventPublisher.publishEvent(new SendFailEvent(this, sendModel));
  56 + }
  57 + }
  58 +}
mqtt-integration/src/main/java/com/diligrp/mqtt/integration/manager/MqttTopicManager.java 0 → 100644
  1 +package com.diligrp.mqtt.integration.manager;
  2 +
  3 +import com.diligrp.mqtt.core.service.MqttTopicService;
  4 +import jakarta.annotation.Resource;
  5 +import lombok.extern.slf4j.Slf4j;
  6 +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
  7 +import org.springframework.stereotype.Service;
  8 +
  9 +import java.util.Arrays;
  10 +import java.util.HashSet;
  11 +import java.util.Optional;
  12 +
  13 +
  14 +/**
  15 + * MQTT动态Topic管理服务
  16 + */
  17 +@Slf4j
  18 +@Service
  19 +public class MqttTopicManager implements MqttTopicService {
  20 +
  21 + @Resource
  22 + private MqttPahoMessageDrivenChannelAdapter mqttMessageDrivenChannelAdapter;
  23 +
  24 + /**
  25 + * 动态订阅Topic
  26 + *
  27 + * @param topic 要订阅的Topic
  28 + * @param qos QoS级别
  29 + */
  30 + @Override
  31 + public void subscribeTopic(String topic, int qos) {
  32 + Optional.ofNullable(topic).filter(t -> !isTopicSubscribed(t)).ifPresentOrElse(t -> {
  33 + mqttMessageDrivenChannelAdapter.addTopic(t, qos);
  34 + log.info("成功订阅Topic: {}, QoS: {}", t, qos);
  35 + }, () -> log.info("无效订阅Topic: {}, QoS: {}", topic, qos));
  36 + }
  37 +
  38 + /**
  39 + * 取消订阅Topic
  40 + *
  41 + * @param topic 要取消的Topic
  42 + */
  43 + @Override
  44 + public void unsubscribeTopic(String topic) {
  45 + Optional.ofNullable(topic).filter(this::isTopicSubscribed).ifPresentOrElse(t -> {
  46 + mqttMessageDrivenChannelAdapter.removeTopic(t);
  47 + log.info("成功取消订阅Topic: {}", topic);
  48 + }, () -> log.info("无效取消订阅Topic: {}", topic));
  49 + }
  50 +
  51 + /**
  52 + * 检查Topic是否已订阅
  53 + *
  54 + * @param topic 要检查的Topic
  55 + * @return 是否订阅
  56 + */
  57 + public boolean isTopicSubscribed(String topic) {
  58 + return Optional.ofNullable(mqttMessageDrivenChannelAdapter.getTopic()).filter(t -> t.length > 0).map(ts -> new HashSet<>(Arrays.asList(ts))).orElseGet(HashSet::new).contains(topic);
  59 + }
  60 +
  61 + /**
  62 + * 获取所有已订阅的Topic
  63 + *
  64 + * @return 订阅的Topic数组
  65 + */
  66 + @Override
  67 + public HashSet<String> getSubscribedTopics() {
  68 + return Optional.ofNullable(mqttMessageDrivenChannelAdapter.getTopic()).filter(t -> t.length > 0).map(ts -> new HashSet<>(Arrays.asList(ts))).orElseGet(HashSet::new);
  69 + }
  70 +
  71 +}
mqtt-vertx/src/main/java/com/diligrp/mqtt/vertx/VertxConfig.java deleted 100644 → 0
1 -package com.diligrp.mqtt.vertx;  
2 -  
3 -import org.springframework.context.annotation.ComponentScan;  
4 -import org.springframework.context.annotation.Configuration;  
5 -  
6 -/**  
7 - * @Author: zhangmeiyang  
8 - * @CreateTime: 2025-12-26 17:03  
9 - * @Version: todo  
10 - */  
11 -@Configuration  
12 -@ComponentScan(basePackages = "com.diligrp.mqtt.vertx")  
13 -public class VertxConfig {  
14 -}  
mqtt-vertx/src/main/java/com/diligrp/mqtt/vertx/config/VerticleLifeCycle.java deleted 100644 → 0
1 -package com.diligrp.mqtt.vertx.config;  
2 -  
3 -import com.diligrp.mqtt.vertx.verticle.MqttSendVerticle;  
4 -import io.vertx.core.Vertx;  
5 -import jakarta.annotation.Resource;  
6 -import org.springframework.beans.factory.DisposableBean;  
7 -import org.springframework.boot.CommandLineRunner;  
8 -import org.springframework.stereotype.Component;  
9 -  
10 -/**  
11 - * verticle生命周期  
12 - *  
13 - * @author zhangmeiyang  
14 - * @date 2025/12/30  
15 - */  
16 -@Component  
17 -public class VerticleLifeCycle implements CommandLineRunner, DisposableBean {  
18 -  
19 - @Resource  
20 - private Vertx vertx;  
21 -  
22 - @Override  
23 - public void run(String... args) throws Exception {  
24 - vertx.deployVerticle(new MqttSendVerticle());  
25 - }  
26 -  
27 - @Override  
28 - public void destroy() throws Exception {  
29 - vertx.close();  
30 - }  
31 -}  
mqtt-vertx/src/main/java/com/diligrp/mqtt/vertx/verticle/MqttReceiveVerticle.java deleted 100644 → 0
1 -package com.diligrp.mqtt.vertx.verticle;  
2 -  
3 -import io.vertx.core.AbstractVerticle;  
4 -import io.vertx.core.Promise;  
5 -import io.vertx.core.impl.logging.Logger;  
6 -import io.vertx.core.impl.logging.LoggerFactory;  
7 -import io.vertx.mqtt.MqttClient;  
8 -import io.vertx.mqtt.MqttClientOptions;  
9 -  
10 -/**  
11 - * @Author: zhangmeiyang  
12 - * @CreateTime: 2025-12-30 18:01  
13 - * @Version: todo  
14 - */  
15 -public class MqttReceiveVerticle extends AbstractVerticle {  
16 -  
17 - private static final Logger log = LoggerFactory.getLogger(MqttReceiveVerticle.class);  
18 -  
19 - @Override  
20 - public void start(Promise<Void> startPromise) throws Exception {  
21 - // MQTT 服务器连接配置  
22 - MqttClientOptions options = new MqttClientOptions()  
23 - .setClientId("mqtt-receive-client-" + System.currentTimeMillis())  
24 - .setUsername("your-username") // 如果需要认证  
25 - .setPassword("your-password") // 如果需要认证  
26 - .setAutoKeepAlive(true)  
27 - .setSsl(false) // 根据服务器配置设置  
28 - .setKeepAliveInterval(30); // 保持连接间隔  
29 - // 创建 MQTT 客户端  
30 - var mqttClient = MqttClient.create(vertx, options);  
31 - mqttClient.connect(1883, "your-mqtt-server-host")  
32 - .onFailure(throwable -> {  
33 - log.error("Failed to connect to MQTT server", throwable);  
34 - startPromise.fail(throwable);  
35 - })  
36 - .onSuccess(ack -> {  
37 - log.info("MQTT client connected to server");  
38 - String topic = "your-topic-name"; // 替换为实际的主题  
39 - mqttClient.subscribe(topic, 1)  
40 - .onFailure(throwable -> {  
41 - log.error("Failed to subscribe to topic,cause:", throwable);  
42 - startPromise.fail(throwable);  
43 - })  
44 - .onSuccess(ack1 -> {  
45 - log.info("Subscribed to topic: %s".formatted(topic));  
46 - startPromise.complete();  
47 - });  
48 - mqttClient.publishHandler(message -> {  
49 - String topicName = message.topicName();  
50 - String payload = message.payload().toString();  
51 - log.info("Received message - Topic: %s, Payload: %s".formatted(topicName, payload));  
52 - processReceivedMessage(topicName, payload);  
53 - });  
54 - // 监听连接断开事件  
55 - mqttClient.disconnect(v -> {  
56 - log.warn("MQTT client disconnected from server");  
57 - });  
58 - });  
59 - }  
60 -  
61 - /**  
62 - * 处理接收到的 MQTT 消息  
63 - */  
64 - private void processReceivedMessage(String topic, String payload) {  
65 -  
66 - }  
67 -}  
mqtt-vertx/src/main/java/com/diligrp/mqtt/vertx/verticle/MqttSendVerticle.java deleted 100644 → 0
1 -package com.diligrp.mqtt.vertx.verticle;  
2 -  
3 -import com.diligrp.mqtt.core.model.Printer;  
4 -import com.diligrp.mqtt.core.util.JsonUtils;  
5 -import io.vertx.core.AbstractVerticle;  
6 -import io.vertx.core.Promise;  
7 -import io.vertx.core.buffer.Buffer;  
8 -import io.vertx.core.eventbus.EventBus;  
9 -import io.vertx.core.impl.logging.Logger;  
10 -import io.vertx.core.impl.logging.LoggerFactory;  
11 -import io.vertx.mqtt.MqttClient;  
12 -import io.vertx.mqtt.MqttClientOptions;  
13 -  
14 -import static com.diligrp.mqtt.core.constant.MqttTopicConstant.PRINTER_TOPIC;  
15 -  
16 -/**  
17 - * @Author: zhangmeiyang  
18 - * @CreateTime: 2025-12-26 17:05  
19 - * @Version: todo  
20 - */  
21 -public class MqttSendVerticle extends AbstractVerticle {  
22 -  
23 - private static final Logger LOGGER = LoggerFactory.getLogger(MqttSendVerticle.class);  
24 -  
25 -  
26 - @Override  
27 - public void start(Promise<Void> startPromise) throws Exception {  
28 - EventBus eventBus = vertx.eventBus();  
29 - eventBus.consumer(PRINTER_TOPIC, message -> {  
30 - String body = message.body().toString();  
31 - Printer printer = JsonUtils.fromJsonString(body, Printer.class);  
32 - MqttClientOptions options = new MqttClientOptions();  
33 - options.setUsername(printer.getUsername());  
34 - options.setPassword(printer.getPassword());  
35 - MqttClient client = MqttClient.create(vertx, options);  
36 - doSend(client, printer, eventBus);  
37 - });  
38 - }  
39 -  
40 - private void doSend(MqttClient client, Printer printer, EventBus eventBus) {  
41 - client.connect(printer.getPort(), printer.getHost())  
42 - .onFailure(throwable -> {  
43 - LOGGER.error("连接到[MQTT-SERVER]失败", throwable);  
44 - eventBus.send(PRINTER_TOPIC, JsonUtils.toJsonString(printer));  
45 - })  
46 - .onSuccess(connResult -> {  
47 - client.publish(printer.getTopic(), Buffer.buffer(printer.getData()), printer.getQos(), false, false)  
48 - .onFailure(throwable -> {  
49 - LOGGER.error("MQTT消息发送失败", throwable);  
50 - eventBus.send(printer.getFailCallBackEventBus(), JsonUtils.toJsonString(printer));  
51 - })  
52 - .onSuccess(pubResult -> {  
53 - LOGGER.info("MQTT消息发送成功");  
54 - eventBus.send(printer.getSuccessCallBackEventBus(), JsonUtils.toJsonString(printer));  
55 - });  
56 - client.disconnect();  
57 - });  
58 - }  
59 -}  
@@ -11,7 +11,7 @@ @@ -11,7 +11,7 @@
11 <modules> 11 <modules>
12 <module>mqtt-boot</module> 12 <module>mqtt-boot</module>
13 <module>mqtt-core</module> 13 <module>mqtt-core</module>
14 - <module>mqtt-vertx</module> 14 + <module>mqtt-integration</module>
15 <module>mqtt-web</module> 15 <module>mqtt-web</module>
16 </modules> 16 </modules>
17 17
@@ -31,7 +31,7 @@ @@ -31,7 +31,7 @@
31 <mybatis-plus.version>3.5.14</mybatis-plus.version> 31 <mybatis-plus.version>3.5.14</mybatis-plus.version>
32 <!-- 工具类库 --> 32 <!-- 工具类库 -->
33 <lombok.version>1.18.42</lombok.version> 33 <lombok.version>1.18.42</lombok.version>
34 - <vertx.version>4.5.22</vertx.version> 34 + <mqtt.version>1.2.5</mqtt.version>
35 </properties> 35 </properties>
36 36
37 <dependencyManagement> 37 <dependencyManagement>
@@ -73,11 +73,9 @@ @@ -73,11 +73,9 @@
73 <version>${mysql-connector.version}</version> 73 <version>${mysql-connector.version}</version>
74 </dependency> 74 </dependency>
75 <dependency> 75 <dependency>
76 - <groupId>io.vertx</groupId>  
77 - <artifactId>vertx-stack-depchain</artifactId>  
78 - <version>${vertx.version}</version>  
79 - <type>pom</type>  
80 - <scope>import</scope> 76 + <groupId>org.eclipse.paho</groupId>
  77 + <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  78 + <version>${mqtt.version}</version>
81 </dependency> 79 </dependency>
82 </dependencies> 80 </dependencies>
83 </dependencyManagement> 81 </dependencyManagement>