Commit 59358ca8dfdcb3f080f66ff70d82ca2e7eb99b12
1 parent
2e676e7c
```
feat(mqtt): 添加接收事件类型枚举并重构消息处理逻辑 - 引入ReceiveEventType枚举替代原有的RecFailEventType和RecSuccessEventType - 在ReceiveModel中添加receiveEventType字段用于标识事件类型 - 实现matchEventType方法根据MQTT主题自动匹配事件类型 - 移除MqttMessageReceiver中的冗余常量定义 - 重命名RecFailEventType为ReceiveEventType并扩展支持多种服务类型 ```
Showing
5 changed files
with
39 additions
and
27 deletions
mqtt-core/src/main/java/com/diligrp/mqtt/core/model/ReceiveModel.java
| 1 | package com.diligrp.mqtt.core.model; | 1 | package com.diligrp.mqtt.core.model; |
| 2 | 2 | ||
| 3 | +import com.diligrp.mqtt.core.type.ReceiveEventType; | ||
| 3 | import lombok.Data; | 4 | import lombok.Data; |
| 4 | 5 | ||
| 5 | import java.util.UUID; | 6 | import java.util.UUID; |
| @@ -43,4 +44,8 @@ public class ReceiveModel { | @@ -43,4 +44,8 @@ public class ReceiveModel { | ||
| 43 | * 时间戳 | 44 | * 时间戳 |
| 44 | */ | 45 | */ |
| 45 | private Long timestamp; | 46 | private Long timestamp; |
| 47 | + /** | ||
| 48 | + * 接收事件类型 | ||
| 49 | + */ | ||
| 50 | + private ReceiveEventType receiveEventType; | ||
| 46 | } | 51 | } |
mqtt-core/src/main/java/com/diligrp/mqtt/core/type/RecFailEventType.java deleted
100644 → 0
mqtt-core/src/main/java/com/diligrp/mqtt/core/type/RecSuccessEventType.java deleted
100644 → 0
| 1 | -package com.diligrp.mqtt.core.type; | ||
| 2 | - | ||
| 3 | -/** | ||
| 4 | - * @Author: zhangmeiyang | ||
| 5 | - * @CreateTime: 2026-01-14 16:59 | ||
| 6 | - * @Version: todo | ||
| 7 | - */ | ||
| 8 | -public enum RecSuccessEventType { | ||
| 9 | - PRINTER("printer"); | ||
| 10 | - public final String value; | ||
| 11 | - | ||
| 12 | - RecSuccessEventType(String value) { | ||
| 13 | - this.value = value; | ||
| 14 | - } | ||
| 15 | -} |
mqtt-core/src/main/java/com/diligrp/mqtt/core/type/ReceiveEventType.java
0 → 100644
| 1 | +package com.diligrp.mqtt.core.type; | ||
| 2 | + | ||
| 3 | +import java.util.Objects; | ||
| 4 | + | ||
| 5 | +public enum ReceiveEventType { | ||
| 6 | + PRINTER_RECEIPT_SERVICE("/printSucc/"), | ||
| 7 | + PRINTER_ONLINE_SERVICE("/priGoOnline/"), | ||
| 8 | + UNKNOWN_SERVICE("unknownService"); | ||
| 9 | + public final String value; | ||
| 10 | + | ||
| 11 | + ReceiveEventType(String value) { | ||
| 12 | + this.value = value; | ||
| 13 | + } | ||
| 14 | + | ||
| 15 | + /** | ||
| 16 | + * 根据MQTT主题匹配事件类型 | ||
| 17 | + * | ||
| 18 | + * @param topic MQTT主题 | ||
| 19 | + * @return 匹配的事件类型,未匹配成功则返回null | ||
| 20 | + */ | ||
| 21 | + public static ReceiveEventType matchEventType(String topic) { | ||
| 22 | + if (Objects.isNull(topic)) { | ||
| 23 | + return UNKNOWN_SERVICE; | ||
| 24 | + } | ||
| 25 | + for (ReceiveEventType eventType : ReceiveEventType.values()) { | ||
| 26 | + if (topic.startsWith(eventType.value)) { | ||
| 27 | + return eventType; | ||
| 28 | + } | ||
| 29 | + } | ||
| 30 | + return UNKNOWN_SERVICE; | ||
| 31 | + } | ||
| 32 | +} |
mqtt-integration/src/main/java/com/diligrp/mqtt/integration/handler/MqttMessageReceiver.java
| @@ -3,6 +3,7 @@ package com.diligrp.mqtt.integration.handler; | @@ -3,6 +3,7 @@ package com.diligrp.mqtt.integration.handler; | ||
| 3 | import com.diligrp.mqtt.core.event.RecFailEvent; | 3 | import com.diligrp.mqtt.core.event.RecFailEvent; |
| 4 | import com.diligrp.mqtt.core.event.RecSuccessEvent; | 4 | import com.diligrp.mqtt.core.event.RecSuccessEvent; |
| 5 | import com.diligrp.mqtt.core.model.ReceiveModel; | 5 | import com.diligrp.mqtt.core.model.ReceiveModel; |
| 6 | +import com.diligrp.mqtt.core.type.ReceiveEventType; | ||
| 6 | import com.diligrp.mqtt.core.util.JsonUtils; | 7 | import com.diligrp.mqtt.core.util.JsonUtils; |
| 7 | import jakarta.annotation.Resource; | 8 | import jakarta.annotation.Resource; |
| 8 | import lombok.extern.slf4j.Slf4j; | 9 | import lombok.extern.slf4j.Slf4j; |
| @@ -27,8 +28,6 @@ public class MqttMessageReceiver { | @@ -27,8 +28,6 @@ public class MqttMessageReceiver { | ||
| 27 | private static final String TOPIC_HEADER = "mqtt_receivedTopic"; | 28 | private static final String TOPIC_HEADER = "mqtt_receivedTopic"; |
| 28 | private static final String QOS_HEADER = "mqtt_receivedQos"; | 29 | private static final String QOS_HEADER = "mqtt_receivedQos"; |
| 29 | private static final String TIMESTAMP_HEADER = "timestamp"; | 30 | private static final String TIMESTAMP_HEADER = "timestamp"; |
| 30 | - private static final String SUCCESS_EVENT_HEADER = "mqtt_success_event"; | ||
| 31 | - private static final String FAIL_EVENT_HEADER = "mqtt_fail_event"; | ||
| 32 | 31 | ||
| 33 | @Resource | 32 | @Resource |
| 34 | private ApplicationEventPublisher applicationEventPublisher; | 33 | private ApplicationEventPublisher applicationEventPublisher; |
| @@ -57,6 +56,7 @@ public class MqttMessageReceiver { | @@ -57,6 +56,7 @@ public class MqttMessageReceiver { | ||
| 57 | receiveModel.setDuplicate(duplicate); | 56 | receiveModel.setDuplicate(duplicate); |
| 58 | receiveModel.setPayload(payload); | 57 | receiveModel.setPayload(payload); |
| 59 | receiveModel.setTimestamp(timestamp); | 58 | receiveModel.setTimestamp(timestamp); |
| 59 | + receiveModel.setReceiveEventType(ReceiveEventType.matchEventType(topic)); | ||
| 60 | applicationEventPublisher.publishEvent(new RecSuccessEvent(this, receiveModel)); | 60 | applicationEventPublisher.publishEvent(new RecSuccessEvent(this, receiveModel)); |
| 61 | } catch (Exception e) { | 61 | } catch (Exception e) { |
| 62 | log.error("处理MQTT消息失败", e); | 62 | log.error("处理MQTT消息失败", e); |