Commit 2e676e7c048e5d545ce2aed1deb5ccc2fe61ab1d

Authored by zhangmeiyang
1 parent 49ba958f

```

feat(mqtt-core): 新增字符集类型枚举并重构发送事件类型

新增 CharsetType 枚举用于处理不同字符集的字节转换,
重命名 SendSuccessEventType 为 SendEventType 并添加 of 方法,
移除不再使用的 SendFailEventType 枚举

feat(mqtt-integration): 增强消息发送器验证和日志记录功能

在 MqttMessageSender 中添加参数校验逻辑,包括消息结构体、主题、
内容、QOS 编码、保留标志、字符集和发送事件类型的非空检查,
修改消息发送时使用字符集进行字节转换,优化日志输出格式

refactor(mqtt-integration): 移除接收模型中的事件字段

从 ReceiveModel 中移除 successActionEvent 和 failActionEvent 字段,
更新 MqttMessageReceiver 中的消息处理逻辑

feat(mqtt-core): 添加订阅模型和主题服务接口方法

新增 SubscribeModel 数据模型,扩展 MqttTopicService 接口以支持
通过订阅模型进行主题订阅

feat(mqtt-integration): 实现基于订阅模型的主题订阅功能

在 MqttTopicManager 中实现基于 SubscribeModel 的主题订阅方法
```
mqtt-core/src/main/java/com/diligrp/mqtt/core/model/ReceiveModel.java
@@ -43,13 +43,4 @@ public class ReceiveModel { @@ -43,13 +43,4 @@ public class ReceiveModel {
43 * 时间戳 43 * 时间戳
44 */ 44 */
45 private Long timestamp; 45 private Long timestamp;
46 - /**  
47 - * 成功行动活动  
48 - */  
49 - private String successActionEvent;  
50 -  
51 - /**  
52 - * 失败动作事件  
53 - */  
54 - private String failActionEvent;  
55 } 46 }
mqtt-core/src/main/java/com/diligrp/mqtt/core/model/SendModel.java
@@ -28,13 +28,13 @@ public class SendModel { @@ -28,13 +28,13 @@ public class SendModel {
28 */ 28 */
29 private Map<String, Object> payload; 29 private Map<String, Object> payload;
30 /** 30 /**
31 - * 成功行动活动 31 + * 发送成功事件类型
32 */ 32 */
33 - private String successActionEvent; 33 + private String sendEventType;
34 /** 34 /**
35 - * 失败动作事件 35 + * 字符集类型
36 */ 36 */
37 - private String failActionEvent; 37 + private String charsetType;
38 38
39 public static SendModel withDefault() { 39 public static SendModel withDefault() {
40 var model = new SendModel(); 40 var model = new SendModel();
mqtt-core/src/main/java/com/diligrp/mqtt/core/service/MqttTopicService.java
1 package com.diligrp.mqtt.core.service; 1 package com.diligrp.mqtt.core.service;
2 2
  3 +import com.diligrp.mqtt.core.model.SubscribeModel;
  4 +
3 import java.util.HashSet; 5 import java.util.HashSet;
4 6
5 public interface MqttTopicService { 7 public interface MqttTopicService {
@@ -13,6 +15,13 @@ public interface MqttTopicService { @@ -13,6 +15,13 @@ public interface MqttTopicService {
13 void subscribeTopic(String topic, int qos); 15 void subscribeTopic(String topic, int qos);
14 16
15 /** 17 /**
  18 + * 订阅主题
  19 + *
  20 + * @param subscribeModel 订阅模式
  21 + */
  22 + void subscribeTopic(SubscribeModel subscribeModel);
  23 +
  24 + /**
16 * 取消订阅主题 25 * 取消订阅主题
17 * 26 *
18 * @param topic 主题 27 * @param topic 主题
mqtt-core/src/main/java/com/diligrp/mqtt/core/type/CharsetType.java 0 → 100644
  1 +package com.diligrp.mqtt.core.type;
  2 +
  3 +import java.nio.charset.Charset;
  4 +import java.nio.charset.StandardCharsets;
  5 +
  6 +/**
  7 + * @Author: zhangmeiyang
  8 + * @CreateTime: 2026-01-14 17:34
  9 + * @Version: todo
  10 + */
  11 +public enum CharsetType {
  12 +
  13 + UTF_8("UTF-8") {
  14 + @Override
  15 + public byte[] getBytes(String str) {
  16 + return str.getBytes(StandardCharsets.UTF_8);
  17 + }
  18 + },
  19 + GBK("GBK") {
  20 + @Override
  21 + public byte[] getBytes(String str) {
  22 + return str.getBytes(Charset.forName("GBK"));
  23 + }
  24 + };
  25 + public final String code;
  26 +
  27 + CharsetType(String code) {
  28 + this.code = code;
  29 + }
  30 +
  31 + public abstract byte[] getBytes(String str);
  32 +
  33 + public static CharsetType of(String charset) {
  34 + for (CharsetType value : values()) {
  35 + if (value.name().equals(charset)) {
  36 + return value;
  37 + }
  38 + }
  39 + throw new IllegalArgumentException("不支持的字符集类型:" + charset);
  40 + }
  41 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/type/SendFailEventType.java renamed to mqtt-core/src/main/java/com/diligrp/mqtt/core/type/SendEventType.java
1 package com.diligrp.mqtt.core.type; 1 package com.diligrp.mqtt.core.type;
2 2
3 -public enum SendFailEventType { 3 +public enum SendEventType {
4 4
5 PRINTER("printer"); 5 PRINTER("printer");
6 public final String value; 6 public final String value;
7 7
8 - SendFailEventType(String value) { 8 + SendEventType(String value) {
9 this.value = value; 9 this.value = value;
10 } 10 }
  11 +
  12 + public static SendEventType of(String value) {
  13 + for (SendEventType type : values()) {
  14 + if (type.value.equals(value)) {
  15 + return type;
  16 + }
  17 + }
  18 + throw new IllegalArgumentException("No matching constant for [" + value + "]");
  19 + }
11 } 20 }
mqtt-core/src/main/java/com/diligrp/mqtt/core/type/SendSuccessEventType.java deleted 100644 → 0
1 -package com.diligrp.mqtt.core.type;  
2 -  
3 -public enum SendSuccessEventType {  
4 -  
5 - PRINTER("printer");  
6 - public final String value;  
7 -  
8 - SendSuccessEventType(String value) {  
9 - this.value = value;  
10 - }  
11 -}  
mqtt-integration/src/main/java/com/diligrp/mqtt/integration/handler/MqttMessageReceiver.java
@@ -3,6 +3,7 @@ package com.diligrp.mqtt.integration.handler; @@ -3,6 +3,7 @@ package com.diligrp.mqtt.integration.handler;
3 import com.diligrp.mqtt.core.event.RecFailEvent; 3 import com.diligrp.mqtt.core.event.RecFailEvent;
4 import com.diligrp.mqtt.core.event.RecSuccessEvent; 4 import com.diligrp.mqtt.core.event.RecSuccessEvent;
5 import com.diligrp.mqtt.core.model.ReceiveModel; 5 import com.diligrp.mqtt.core.model.ReceiveModel;
  6 +import com.diligrp.mqtt.core.util.JsonUtils;
6 import jakarta.annotation.Resource; 7 import jakarta.annotation.Resource;
7 import lombok.extern.slf4j.Slf4j; 8 import lombok.extern.slf4j.Slf4j;
8 import org.springframework.context.ApplicationEventPublisher; 9 import org.springframework.context.ApplicationEventPublisher;
@@ -37,6 +38,7 @@ public class MqttMessageReceiver { @@ -37,6 +38,7 @@ public class MqttMessageReceiver {
37 */ 38 */
38 @ServiceActivator(inputChannel = "mqttInboundChannel") 39 @ServiceActivator(inputChannel = "mqttInboundChannel")
39 public void handleMqttMessage(Message<?> message) { 40 public void handleMqttMessage(Message<?> message) {
  41 + log.info("收到MQTT消息 - :{}", JsonUtils.toJsonString(message));
40 try { 42 try {
41 String topic = message.getHeaders().get(TOPIC_HEADER, String.class); 43 String topic = message.getHeaders().get(TOPIC_HEADER, String.class);
42 Integer qos = message.getHeaders().get(QOS_HEADER, Integer.class); 44 Integer qos = message.getHeaders().get(QOS_HEADER, Integer.class);
@@ -45,8 +47,6 @@ public class MqttMessageReceiver { @@ -45,8 +47,6 @@ public class MqttMessageReceiver {
45 Long timestamp = message.getHeaders().get(TIMESTAMP_HEADER, Long.class); 47 Long timestamp = message.getHeaders().get(TIMESTAMP_HEADER, Long.class);
46 Integer mqttId = message.getHeaders().get(MQTT_ID_HEADER, Integer.class); 48 Integer mqttId = message.getHeaders().get(MQTT_ID_HEADER, Integer.class);
47 Boolean duplicate = message.getHeaders().get(MQTT_DUPLICATE_HEADER, Boolean.class); 49 Boolean duplicate = message.getHeaders().get(MQTT_DUPLICATE_HEADER, Boolean.class);
48 - String successEvent = message.getHeaders().get(SUCCESS_EVENT_HEADER, String.class);  
49 - String failEvent = message.getHeaders().get(FAIL_EVENT_HEADER, String.class);  
50 String payload = message.getPayload().toString(); 50 String payload = message.getPayload().toString();
51 var receiveModel = new ReceiveModel(); 51 var receiveModel = new ReceiveModel();
52 receiveModel.setId(id); 52 receiveModel.setId(id);
@@ -57,9 +57,6 @@ public class MqttMessageReceiver { @@ -57,9 +57,6 @@ public class MqttMessageReceiver {
57 receiveModel.setDuplicate(duplicate); 57 receiveModel.setDuplicate(duplicate);
58 receiveModel.setPayload(payload); 58 receiveModel.setPayload(payload);
59 receiveModel.setTimestamp(timestamp); 59 receiveModel.setTimestamp(timestamp);
60 - receiveModel.setSuccessActionEvent(successEvent);  
61 - receiveModel.setFailActionEvent(failEvent);  
62 - log.info("收到MQTT消息 - :{}", receiveModel);  
63 applicationEventPublisher.publishEvent(new RecSuccessEvent(this, receiveModel)); 60 applicationEventPublisher.publishEvent(new RecSuccessEvent(this, receiveModel));
64 } catch (Exception e) { 61 } catch (Exception e) {
65 log.error("处理MQTT消息失败", e); 62 log.error("处理MQTT消息失败", e);
mqtt-integration/src/main/java/com/diligrp/mqtt/integration/handler/MqttMessageSender.java
@@ -2,8 +2,10 @@ package com.diligrp.mqtt.integration.handler; @@ -2,8 +2,10 @@ package com.diligrp.mqtt.integration.handler;
2 2
3 import com.diligrp.mqtt.core.event.SendFailEvent; 3 import com.diligrp.mqtt.core.event.SendFailEvent;
4 import com.diligrp.mqtt.core.event.SendSuccessEvent; 4 import com.diligrp.mqtt.core.event.SendSuccessEvent;
  5 +import com.diligrp.mqtt.core.exception.MqttServiceException;
5 import com.diligrp.mqtt.core.model.SendModel; 6 import com.diligrp.mqtt.core.model.SendModel;
6 import com.diligrp.mqtt.core.service.MqttMessageService; 7 import com.diligrp.mqtt.core.service.MqttMessageService;
  8 +import com.diligrp.mqtt.core.type.CharsetType;
7 import com.diligrp.mqtt.core.util.JsonUtils; 9 import com.diligrp.mqtt.core.util.JsonUtils;
8 import jakarta.annotation.Resource; 10 import jakarta.annotation.Resource;
9 import lombok.extern.slf4j.Slf4j; 11 import lombok.extern.slf4j.Slf4j;
@@ -34,9 +36,17 @@ public class MqttMessageSender implements MqttMessageService { @@ -34,9 +36,17 @@ public class MqttMessageSender implements MqttMessageService {
34 36
35 @Override 37 @Override
36 public void sendMessage(SendModel sendModel) { 38 public void sendMessage(SendModel sendModel) {
  39 + Optional.ofNullable(sendModel).orElseThrow(()->new MqttServiceException("消息结构体不能为空"));
  40 + Optional.ofNullable(sendModel.getTopic()).filter(e -> !e.isEmpty()).orElseThrow(()->new MqttServiceException("【消息主题】不能为空"));
  41 + Optional.ofNullable(sendModel.getPayload()).orElseThrow(()->new MqttServiceException("【消息内容】不能为空"));
  42 + Optional.ofNullable(sendModel.getQos()).filter(e -> e >= 0 && e <= 2).orElseThrow(()->new MqttServiceException("【QOS编码】错误"));
  43 + Optional.ofNullable(sendModel.getRetained()).orElseThrow(()->new MqttServiceException("【消息是否保留】不能为空"));
  44 + Optional.ofNullable(sendModel.getCharsetType()).orElseThrow(()->new MqttServiceException("【发送消息字符集】不能为空"));
  45 + Optional.ofNullable(sendModel.getSendEventType()).orElseThrow(()->new MqttServiceException("【发送事件标识】不能为空"));
37 try { 46 try {
38 String payload = JsonUtils.toJsonString(sendModel.getPayload()); 47 String payload = JsonUtils.toJsonString(sendModel.getPayload());
39 - var message = MessageBuilder.withPayload(payload) 48 + byte[] bytes = CharsetType.of(sendModel.getCharsetType()).getBytes(payload);
  49 + var message = MessageBuilder.withPayload(bytes)
40 .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN) 50 .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
41 .setHeader(MQTT_TOPIC, sendModel.getTopic()) 51 .setHeader(MQTT_TOPIC, sendModel.getTopic())
42 .setHeader(MQTT_QOS, sendModel.getQos()) 52 .setHeader(MQTT_QOS, sendModel.getQos())
@@ -44,10 +54,10 @@ public class MqttMessageSender implements MqttMessageService { @@ -44,10 +54,10 @@ public class MqttMessageSender implements MqttMessageService {
44 .build(); 54 .build();
45 boolean result = mqttOutboundChannel.send(message); 55 boolean result = mqttOutboundChannel.send(message);
46 Optional.of(result).filter(e -> e).ifPresentOrElse(send -> { 56 Optional.of(result).filter(e -> e).ifPresentOrElse(send -> {
47 - log.info("消息发送成功 - :{}", message); 57 + log.info("消息发送成功 - :{}", JsonUtils.toJsonString(message));
48 applicationEventPublisher.publishEvent(new SendSuccessEvent(this, sendModel)); 58 applicationEventPublisher.publishEvent(new SendSuccessEvent(this, sendModel));
49 }, () -> { 59 }, () -> {
50 - log.error("消息发送失败 - :{}", message); 60 + log.error("消息发送失败 - :{}", JsonUtils.toJsonString(message));
51 applicationEventPublisher.publishEvent(new SendFailEvent(this, sendModel)); 61 applicationEventPublisher.publishEvent(new SendFailEvent(this, sendModel));
52 }); 62 });
53 } catch (Exception e) { 63 } catch (Exception e) {
mqtt-integration/src/main/java/com/diligrp/mqtt/integration/manager/MqttTopicManager.java
1 package com.diligrp.mqtt.integration.manager; 1 package com.diligrp.mqtt.integration.manager;
2 2
  3 +import com.diligrp.mqtt.core.model.SubscribeModel;
3 import com.diligrp.mqtt.core.service.MqttTopicService; 4 import com.diligrp.mqtt.core.service.MqttTopicService;
4 import jakarta.annotation.Resource; 5 import jakarta.annotation.Resource;
5 import lombok.extern.slf4j.Slf4j; 6 import lombok.extern.slf4j.Slf4j;
@@ -21,6 +22,17 @@ public class MqttTopicManager implements MqttTopicService { @@ -21,6 +22,17 @@ public class MqttTopicManager implements MqttTopicService {
21 @Resource 22 @Resource
22 private MqttPahoMessageDrivenChannelAdapter mqttMessageDrivenChannelAdapter; 23 private MqttPahoMessageDrivenChannelAdapter mqttMessageDrivenChannelAdapter;
23 24
  25 +
  26 + /**
  27 + * 订阅主题
  28 + *
  29 + * @param subscribeModel 订阅模式
  30 + */
  31 + @Override
  32 + public void subscribeTopic(SubscribeModel subscribeModel) {
  33 + subscribeTopic(subscribeModel.getTopic(), subscribeModel.getQos());
  34 + }
  35 +
24 /** 36 /**
25 * 动态订阅Topic 37 * 动态订阅Topic
26 * 38 *