Commit be4fa92b51a019eefbe446bcf775fa3534a098d4

Authored by zhangmeiyang
1 parent df9beb48

```

feat(mqtt): 新增MQTT消息收发后置处理器功能

- 新增MqttReceiveContext和MqttSendContext用于管理接收和发送后置处理器
- 实现消息发送成功/失败和接收成功/失败的事件处理逻辑
- 添加PostProcessorService和ReceiveProcessorService接口定义
- 创建默认未知接收处理器DefaultUnknownReceiveProcessor
- 实现打印机相关的处理器包括PrinterPostProcessor、
  PrinterOnlineReceiveProcessor和PrinterSuccessReceiveProcessor
- 在MqttListener中集成新的后置处理机制
```
mqtt-core/src/main/java/com/diligrp/mqtt/core/context/MqttReceiveContext.java 0 → 100644
  1 +package com.diligrp.mqtt.core.context;
  2 +
  3 +import com.diligrp.mqtt.core.service.ReceiveProcessorService;
  4 +import com.diligrp.mqtt.core.type.ReceiveEventType;
  5 +import jakarta.annotation.Resource;
  6 +import lombok.extern.slf4j.Slf4j;
  7 +import org.springframework.beans.factory.DisposableBean;
  8 +import org.springframework.beans.factory.InitializingBean;
  9 +import org.springframework.stereotype.Component;
  10 +
  11 +import java.util.List;
  12 +import java.util.concurrent.ConcurrentHashMap;
  13 +import java.util.stream.Collectors;
  14 +
  15 +/**
  16 + * @Author: zhangmeiyang
  17 + * @CreateTime: 2026-01-15 16:00
  18 + * @Version: todo
  19 + */
  20 +@Component
  21 +@Slf4j
  22 +public class MqttReceiveContext implements InitializingBean, DisposableBean {
  23 +
  24 + @Resource
  25 + private List<ReceiveProcessorService> receiveProcessorServices;
  26 +
  27 + public static final ConcurrentHashMap<ReceiveEventType, ReceiveProcessorService> CONTEXT = new ConcurrentHashMap<>();
  28 +
  29 +
  30 + @Override
  31 + public void destroy() throws Exception {
  32 +
  33 + }
  34 +
  35 + @Override
  36 + public void afterPropertiesSet() throws Exception {
  37 + CONTEXT.putAll(receiveProcessorServices.stream().collect(Collectors.toMap(ReceiveProcessorService::receiveEventType, service -> service)));
  38 + log.info("消息接收后置处理器加载完成...");
  39 + }
  40 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/context/MqttSendContext.java 0 → 100644
  1 +package com.diligrp.mqtt.core.context;
  2 +
  3 +import com.diligrp.mqtt.core.service.PostProcessorService;
  4 +import com.diligrp.mqtt.core.type.SendEventType;
  5 +import jakarta.annotation.Resource;
  6 +import lombok.extern.slf4j.Slf4j;
  7 +import org.springframework.beans.factory.DisposableBean;
  8 +import org.springframework.beans.factory.InitializingBean;
  9 +import org.springframework.stereotype.Component;
  10 +
  11 +import java.util.List;
  12 +import java.util.concurrent.ConcurrentHashMap;
  13 +import java.util.stream.Collectors;
  14 +
  15 +/**
  16 + * @Author: zhangmeiyang
  17 + * @CreateTime: 2026-01-15 15:54
  18 + * @Version: todo
  19 + */
  20 +@Component
  21 +@Slf4j
  22 +public class MqttSendContext implements InitializingBean, DisposableBean {
  23 +
  24 + @Resource
  25 + private List<PostProcessorService> postProcessorServices;
  26 +
  27 + public static final ConcurrentHashMap<SendEventType, PostProcessorService> CONTEXT = new ConcurrentHashMap<>();
  28 +
  29 +
  30 + @Override
  31 + public void destroy() throws Exception {
  32 +
  33 + }
  34 +
  35 + @Override
  36 + public void afterPropertiesSet() throws Exception {
  37 + CONTEXT.putAll(postProcessorServices.stream().collect(Collectors.toMap(PostProcessorService::sendEventType, service -> service)));
  38 + log.info("消息发送后置处理器加载完成...");
  39 + }
  40 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/event/listener/MqttListener.java
1 package com.diligrp.mqtt.core.event.listener; 1 package com.diligrp.mqtt.core.event.listener;
2 2
  3 +import com.diligrp.mqtt.core.context.MqttReceiveContext;
  4 +import com.diligrp.mqtt.core.context.MqttSendContext;
3 import com.diligrp.mqtt.core.event.RecFailEvent; 5 import com.diligrp.mqtt.core.event.RecFailEvent;
4 import com.diligrp.mqtt.core.event.RecSuccessEvent; 6 import com.diligrp.mqtt.core.event.RecSuccessEvent;
5 import com.diligrp.mqtt.core.event.SendFailEvent; 7 import com.diligrp.mqtt.core.event.SendFailEvent;
6 import com.diligrp.mqtt.core.event.SendSuccessEvent; 8 import com.diligrp.mqtt.core.event.SendSuccessEvent;
  9 +import com.diligrp.mqtt.core.type.ReceiveEventType;
  10 +import com.diligrp.mqtt.core.type.SendEventType;
7 import lombok.extern.slf4j.Slf4j; 11 import lombok.extern.slf4j.Slf4j;
8 import org.springframework.context.event.EventListener; 12 import org.springframework.context.event.EventListener;
9 import org.springframework.stereotype.Component; 13 import org.springframework.stereotype.Component;
@@ -19,21 +23,27 @@ public class MqttListener { @@ -19,21 +23,27 @@ public class MqttListener {
19 23
20 @EventListener 24 @EventListener
21 public void onSendSuccess(SendSuccessEvent event) { 25 public void onSendSuccess(SendSuccessEvent event) {
22 - log.info("消息发送成功事件完成后置处理,{}", event.getSendModel()); 26 + String sendEventType = event.getSendModel().getSendEventType();
  27 + MqttSendContext.CONTEXT.get(SendEventType.of(sendEventType)).successProcess(event.getSendModel());
  28 + log.info("消息发送成功事件完成后置处理,{},event:{}", event.getSendModel(), event.getSendModel().getSendEventType());
23 } 29 }
24 30
25 @EventListener 31 @EventListener
26 public void onSendFail(SendFailEvent event) { 32 public void onSendFail(SendFailEvent event) {
27 - log.info("消息发送失败事件完成后置处理,{}", event.getSendModel()); 33 + String sendEventType = event.getSendModel().getSendEventType();
  34 + MqttSendContext.CONTEXT.get(SendEventType.of(sendEventType)).failProcess(event.getSendModel());
  35 + log.info("消息发送失败事件完成后置处理,{},event:{}", event.getSendModel(), event.getSendModel().getSendEventType());
28 } 36 }
29 37
30 @EventListener 38 @EventListener
31 public void onRecSuccess(RecSuccessEvent event) { 39 public void onRecSuccess(RecSuccessEvent event) {
32 - log.info("消息接收成功事件完成后置处理,{}", event.getReceiveModel()); 40 + ReceiveEventType receiveEventType = event.getReceiveModel().getReceiveEventType();
  41 + MqttReceiveContext.CONTEXT.get(receiveEventType).successReceive(event.getReceiveModel());
  42 + log.info("消息接收成功事件完成后置处理,{},event: {}", event.getReceiveModel(), receiveEventType);
33 } 43 }
34 44
35 @EventListener 45 @EventListener
36 public void onRecFail(RecFailEvent event) { 46 public void onRecFail(RecFailEvent event) {
37 - log.info("消息接收失败事件完成后置处理,{}",event.getMessage()); 47 + log.info("消息接收失败事件完成后置处理,{}", event.getMessage());
38 } 48 }
39 } 49 }
mqtt-core/src/main/java/com/diligrp/mqtt/core/service/PostProcessorService.java 0 → 100644
  1 +package com.diligrp.mqtt.core.service;
  2 +
  3 +import com.diligrp.mqtt.core.model.SendModel;
  4 +import com.diligrp.mqtt.core.type.SendEventType;
  5 +
  6 +public interface PostProcessorService {
  7 +
  8 + /**
  9 + * 发送事件类型
  10 + *
  11 + * @return {@link SendEventType }
  12 + */
  13 + SendEventType sendEventType();
  14 +
  15 +
  16 + /**
  17 + * 成功流程
  18 + *
  19 + * @param sendModel 发送模型
  20 + */
  21 + void successProcess(SendModel sendModel);
  22 +
  23 +
  24 + /**
  25 + * 失败流程
  26 + *
  27 + * @param sendModel 模型
  28 + */
  29 + void failProcess(SendModel sendModel);
  30 +}
mqtt-core/src/main/java/com/diligrp/mqtt/core/service/ReceiveProcessorService.java 0 → 100644
  1 +package com.diligrp.mqtt.core.service;
  2 +
  3 +import com.diligrp.mqtt.core.model.ReceiveModel;
  4 +import com.diligrp.mqtt.core.type.ReceiveEventType;
  5 +import com.diligrp.mqtt.core.type.SendEventType;
  6 +
  7 +public interface ReceiveProcessorService {
  8 + /**
  9 + * 发送事件类型
  10 + *
  11 + * @return {@link SendEventType }
  12 + */
  13 + ReceiveEventType receiveEventType();
  14 +
  15 +
  16 + /**
  17 + * 成功流程
  18 + *
  19 + * @param receiveModel 接收模型
  20 + */
  21 + void successReceive(ReceiveModel receiveModel);
  22 +}
mqtt-web/src/main/java/com/diligrp/mqtt/web/processor/DefaultUnknownReceiveProcessor.java 0 → 100644
  1 +package com.diligrp.mqtt.web.processor;
  2 +
  3 +import com.diligrp.mqtt.core.model.ReceiveModel;
  4 +import com.diligrp.mqtt.core.service.ReceiveProcessorService;
  5 +import com.diligrp.mqtt.core.type.ReceiveEventType;
  6 +import org.springframework.stereotype.Component;
  7 +
  8 +/**
  9 + * 默认未知接收处理器
  10 + *
  11 + * @author zhangmeiyang
  12 + * @date 2026/01/15
  13 + */
  14 +@Component
  15 +public class DefaultUnknownReceiveProcessor implements ReceiveProcessorService {
  16 + @Override
  17 + public ReceiveEventType receiveEventType() {
  18 + return ReceiveEventType.UNKNOWN_SERVICE;
  19 + }
  20 +
  21 + @Override
  22 + public void successReceive(ReceiveModel receiveModel) {
  23 +
  24 + }
  25 +}
mqtt-web/src/main/java/com/diligrp/mqtt/web/processor/printer/PrinterOnlineReceiveProcessor.java 0 → 100644
  1 +package com.diligrp.mqtt.web.processor.printer;
  2 +
  3 +import com.diligrp.mqtt.core.model.ReceiveModel;
  4 +import com.diligrp.mqtt.core.service.ReceiveProcessorService;
  5 +import com.diligrp.mqtt.core.type.ReceiveEventType;
  6 +import org.springframework.stereotype.Component;
  7 +
  8 +/**
  9 + * @Author: zhangmeiyang
  10 + * @CreateTime: 2026-01-15 16:10
  11 + * @Version: todo
  12 + */
  13 +@Component
  14 +public class PrinterOnlineReceiveProcessor implements ReceiveProcessorService {
  15 + @Override
  16 + public ReceiveEventType receiveEventType() {
  17 + return ReceiveEventType.PRINTER_ONLINE_SERVICE;
  18 + }
  19 +
  20 + @Override
  21 + public void successReceive(ReceiveModel receiveModel) {
  22 +
  23 + }
  24 +}
mqtt-web/src/main/java/com/diligrp/mqtt/web/processor/printer/PrinterPostProcessor.java 0 → 100644
  1 +package com.diligrp.mqtt.web.processor.printer;
  2 +
  3 +import com.diligrp.mqtt.core.model.SendModel;
  4 +import com.diligrp.mqtt.core.service.PostProcessorService;
  5 +import com.diligrp.mqtt.core.type.SendEventType;
  6 +import org.springframework.stereotype.Component;
  7 +
  8 +/**
  9 + * @Author: zhangmeiyang
  10 + * @CreateTime: 2026-01-15 16:08
  11 + * @Version: todo
  12 + */
  13 +@Component
  14 +public class PrinterPostProcessor implements PostProcessorService {
  15 + @Override
  16 + public SendEventType sendEventType() {
  17 + return SendEventType.PRINTER;
  18 + }
  19 +
  20 + @Override
  21 + public void successProcess(SendModel sendModel) {
  22 +
  23 + }
  24 +
  25 + @Override
  26 + public void failProcess(SendModel sendModel) {
  27 +
  28 + }
  29 +}
mqtt-web/src/main/java/com/diligrp/mqtt/web/processor/printer/PrinterSuccessReceiveProcessor.java 0 → 100644
  1 +package com.diligrp.mqtt.web.processor.printer;
  2 +
  3 +import com.diligrp.mqtt.core.model.ReceiveModel;
  4 +import com.diligrp.mqtt.core.service.ReceiveProcessorService;
  5 +import com.diligrp.mqtt.core.type.ReceiveEventType;
  6 +import org.springframework.stereotype.Component;
  7 +
  8 +/**
  9 + * @Author: zhangmeiyang
  10 + * @CreateTime: 2026-01-15 16:09
  11 + * @Version: todo
  12 + */
  13 +@Component
  14 +public class PrinterSuccessReceiveProcessor implements ReceiveProcessorService {
  15 + @Override
  16 + public ReceiveEventType receiveEventType() {
  17 + return ReceiveEventType.PRINTER_RECEIPT_SERVICE;
  18 + }
  19 +
  20 + @Override
  21 + public void successReceive(ReceiveModel receiveModel) {
  22 +
  23 + }
  24 +}