Commit 81a45064f9f1b3b04ff919337206ece662f62fee

Authored by zhangmeiyang
1 parent 152d3a4e

```

feat(mqtt): 集成Redis和MongoDB客户端并优化MQTT连接逻辑

- 移除application.properties中硬编码的Redis配置
- 在mqtt-vertx模块中添加Redis和MongoDB依赖
- 在mqtt-web模块中添加Redis和MongoDB Spring Boot Starter
- 重构MqttVerticle中的连接和发布逻辑,使用响应式API处理连接和消息发送
- 优化错误处理机制,添加连接失败和消息发送失败的回调
- 将VerticleLifeCycle中的Vertx注入方式改为@Resource注解
- 调整JsonUtils导入顺序
```
mqtt-boot/src/main/resources/application.properties
... ... @@ -9,12 +9,3 @@ spring.cloud.nacos.config.server-addr=nacos.diligrp.com:8848
9 9 spring.cloud.nacos.config.namespace=2267e673-b41f-458d-9643-2a03e4fd92fb
10 10 spring.config.import[0]=nacos:${spring.application.name}.properties
11 11 spring.config.import[1]=nacos:${spring.application.name}-${spring.profiles.active}.properties
12   -
13   -
14   -spring.data.redis.host=redis.diligrp.com
15   -spring.data.redis.port=6379
16   -spring.data.redis.database=12
17   -#spring.data.redis.username=
18   -#spring.data.redis.password=
19   -spring.data.redis.connect-timeout=15000
20   -spring.data.redis.timeout=30000
... ...
mqtt-vertx/pom.xml
... ... @@ -28,6 +28,14 @@
28 28 <groupId>io.vertx</groupId>
29 29 <artifactId>vertx-mqtt</artifactId>
30 30 </dependency>
  31 + <dependency>
  32 + <groupId>io.vertx</groupId>
  33 + <artifactId>vertx-redis-client</artifactId>
  34 + </dependency>
  35 + <dependency>
  36 + <groupId>io.vertx</groupId>
  37 + <artifactId>vertx-mongo-client</artifactId>
  38 + </dependency>
31 39 </dependencies>
32 40  
33 41 </project>
... ...
mqtt-vertx/src/main/java/com/diligrp/mqtt/vertx/config/VerticleLifeCycle.java
... ... @@ -2,7 +2,12 @@ package com.diligrp.mqtt.vertx.config;
2 2  
3 3 import com.diligrp.mqtt.vertx.verticle.MqttVerticle;
4 4 import io.vertx.core.Vertx;
  5 +import io.vertx.core.json.JsonObject;
  6 +import io.vertx.ext.mongo.MongoClient;
  7 +import io.vertx.redis.client.Redis;
  8 +import jakarta.annotation.Resource;
5 9 import org.springframework.beans.factory.DisposableBean;
  10 +import org.springframework.beans.factory.annotation.Value;
6 11 import org.springframework.boot.CommandLineRunner;
7 12 import org.springframework.stereotype.Component;
8 13  
... ... @@ -15,11 +20,8 @@ import org.springframework.stereotype.Component;
15 20 @Component
16 21 public class VerticleLifeCycle implements CommandLineRunner, DisposableBean {
17 22  
18   - private final Vertx vertx;
19   -
20   - public VerticleLifeCycle(Vertx vertx) {
21   - this.vertx = vertx;
22   - }
  23 + @Resource
  24 + private Vertx vertx;
23 25  
24 26 @Override
25 27 public void run(String... args) throws Exception {
... ...
mqtt-vertx/src/main/java/com/diligrp/mqtt/vertx/verticle/MqttVerticle.java
1 1 package com.diligrp.mqtt.vertx.verticle;
2 2  
3   -import com.diligrp.mqtt.core.util.JsonUtils;
4 3 import com.diligrp.mqtt.core.model.Printer;
  4 +import com.diligrp.mqtt.core.util.JsonUtils;
5 5 import io.vertx.core.AbstractVerticle;
6 6 import io.vertx.core.Promise;
7 7 import io.vertx.core.buffer.Buffer;
... ... @@ -37,23 +37,23 @@ public class MqttVerticle extends AbstractVerticle {
37 37 });
38 38 }
39 39  
40   - private static void doSend(MqttClient client, Printer printer, EventBus eventBus) {
41   - client.connect(printer.getPort(), printer.getHost(), connResult -> {
42   - if (connResult.succeeded()) {
43   - // 连接成功后发布消息
44   - client.publish(printer.getTopic(), Buffer.buffer(printer.getData()), printer.getQos(), false, false, pubResult -> {
45   - if (pubResult.succeeded()) {
46   - LOGGER.info("Message published");
47   - eventBus.send(printer.getSuccessCallBackEventBus(), JsonUtils.toJsonString(printer));
48   - } else {
49   - LOGGER.error("Failed to publish message", pubResult.cause());
50   - eventBus.send(printer.getFailCallBackEventBus(), JsonUtils.toJsonString(printer));
51   - }
  40 + private void doSend(MqttClient client, Printer printer, EventBus eventBus) {
  41 + client.connect(printer.getPort(), printer.getHost())
  42 + .onFailure(throwable -> {
  43 + LOGGER.error("连接到[MQTT-SERVER]失败", throwable);
  44 + eventBus.send(PRINTER_TOPIC, JsonUtils.toJsonString(printer));
  45 + })
  46 + .onSuccess(connResult -> {
  47 + client.publish(printer.getTopic(), Buffer.buffer(printer.getData()), printer.getQos(), false, false)
  48 + .onFailure(throwable -> {
  49 + LOGGER.error("MQTT消息发送失败", throwable);
  50 + eventBus.send(printer.getFailCallBackEventBus(), JsonUtils.toJsonString(printer));
  51 + })
  52 + .onSuccess(pubResult -> {
  53 + LOGGER.info("MQTT消息发送成功");
  54 + eventBus.send(printer.getSuccessCallBackEventBus(), JsonUtils.toJsonString(printer));
  55 + });
52 56 client.disconnect();
53 57 });
54   - } else {
55   - LOGGER.error("Failed to connect to MQTT server", connResult.cause());
56   - }
57   - });
58 58 }
59 59 }
... ...
mqtt-web/pom.xml
... ... @@ -28,6 +28,14 @@
28 28 <groupId>org.springframework.boot</groupId>
29 29 <artifactId>spring-boot-starter-validation</artifactId>
30 30 </dependency>
  31 + <dependency>
  32 + <groupId>org.springframework.boot</groupId>
  33 + <artifactId>spring-boot-starter-data-mongodb</artifactId>
  34 + </dependency>
  35 + <dependency>
  36 + <groupId>org.springframework.boot</groupId>
  37 + <artifactId>spring-boot-starter-data-redis</artifactId>
  38 + </dependency>
31 39 </dependencies>
32 40  
33 41 </project>
... ...