Commit ec254421e3d216a588c148734cd32f0d182d7c89
1 parent
be4fa92b
feat(mqtt): 新增设备状态管理功能并优化消息处理流程
Showing
10 changed files
with
189 additions
and
12 deletions
mqtt-core/src/main/java/com/diligrp/mqtt/core/type/DeviceStatusEnum.java
0 → 100644
| 1 | +package com.diligrp.mqtt.core.type; | |
| 2 | + | |
| 3 | +import lombok.Getter; | |
| 4 | + | |
| 5 | +/** | |
| 6 | + * @author lvqi | |
| 7 | + */ | |
| 8 | +@Getter | |
| 9 | +public enum DeviceStatusEnum { | |
| 10 | + ONLINE(1, "在线"), | |
| 11 | + OFFLINE(2, "离线"); | |
| 12 | + | |
| 13 | + | |
| 14 | + private final Integer code; | |
| 15 | + private final String message; | |
| 16 | + | |
| 17 | + | |
| 18 | + DeviceStatusEnum(Integer code, String message) { | |
| 19 | + this.code = code; | |
| 20 | + this.message = message; | |
| 21 | + | |
| 22 | + } | |
| 23 | +} | ... | ... |
mqtt-core/src/main/java/com/diligrp/mqtt/core/type/TopicType.java
mqtt-web/src/main/java/com/diligrp/mqtt/web/model/DeviceStatusInfoRecord.java
0 → 100644
| 1 | +package com.diligrp.mqtt.web.model; | |
| 2 | + | |
| 3 | +import lombok.Data; | |
| 4 | +import org.springframework.data.annotation.Id; | |
| 5 | +import org.springframework.data.mongodb.core.mapping.Document; | |
| 6 | + | |
| 7 | +import java.time.LocalDateTime; | |
| 8 | + | |
| 9 | +/** | |
| 10 | + * 设备状态记录 | |
| 11 | + * | |
| 12 | + * @author lvqi | |
| 13 | + */ | |
| 14 | +@Document("device_status_info_record") | |
| 15 | +@Data | |
| 16 | +public class DeviceStatusInfoRecord { | |
| 17 | + @Id | |
| 18 | + private String id; | |
| 19 | + | |
| 20 | + private String firm; | |
| 21 | + private String productId; | |
| 22 | + | |
| 23 | + private String sn; | |
| 24 | + | |
| 25 | + private Integer status; | |
| 26 | + | |
| 27 | + private LocalDateTime createTime; | |
| 28 | +} | ... | ... |
mqtt-web/src/main/java/com/diligrp/mqtt/web/model/PrintDevice.java
mqtt-core/src/main/java/com/diligrp/mqtt/core/model/ReceiveRelyRecord.java renamed to mqtt-web/src/main/java/com/diligrp/mqtt/web/model/ReceiveRelyRecord.java
| 1 | -package com.diligrp.mqtt.core.model; | |
| 1 | +package com.diligrp.mqtt.web.model; | |
| 2 | 2 | |
| 3 | 3 | import lombok.Data; |
| 4 | 4 | import org.springframework.data.annotation.Id; |
| 5 | 5 | import org.springframework.data.mongodb.core.mapping.Document; |
| 6 | 6 | |
| 7 | +import java.time.LocalDateTime; | |
| 8 | + | |
| 7 | 9 | |
| 8 | 10 | /** |
| 9 | 11 | * 打印信息记录 |
| ... | ... | @@ -31,9 +33,15 @@ public class ReceiveRelyRecord { |
| 31 | 33 | */ |
| 32 | 34 | private Integer messStatus; |
| 33 | 35 | |
| 36 | + private LocalDateTime createTime; | |
| 37 | + | |
| 34 | 38 | /** |
| 35 | 39 | * 客户端回执状态 |
| 36 | 40 | */ |
| 37 | 41 | private String replyStatus; |
| 38 | 42 | |
| 43 | + private LocalDateTime replyTime; | |
| 44 | + | |
| 45 | + private Integer retries; | |
| 46 | + | |
| 39 | 47 | } | ... | ... |
mqtt-web/src/main/java/com/diligrp/mqtt/web/processor/printer/PrinterOnlineReceiveProcessor.java
| ... | ... | @@ -2,9 +2,20 @@ package com.diligrp.mqtt.web.processor.printer; |
| 2 | 2 | |
| 3 | 3 | import com.diligrp.mqtt.core.model.ReceiveModel; |
| 4 | 4 | import com.diligrp.mqtt.core.service.ReceiveProcessorService; |
| 5 | +import com.diligrp.mqtt.core.type.DeviceStatusEnum; | |
| 6 | +import com.diligrp.mqtt.core.type.FirmEnum; | |
| 5 | 7 | import com.diligrp.mqtt.core.type.ReceiveEventType; |
| 8 | +import com.diligrp.mqtt.core.type.TopicType; | |
| 9 | +import com.diligrp.mqtt.core.util.JsonUtils; | |
| 10 | +import com.diligrp.mqtt.web.model.DeviceStatusInfoRecord; | |
| 11 | +import com.fasterxml.jackson.core.type.TypeReference; | |
| 12 | +import jakarta.annotation.Resource; | |
| 13 | +import org.springframework.data.mongodb.core.MongoTemplate; | |
| 6 | 14 | import org.springframework.stereotype.Component; |
| 7 | 15 | |
| 16 | +import java.time.LocalDateTime; | |
| 17 | +import java.util.Map; | |
| 18 | + | |
| 8 | 19 | /** |
| 9 | 20 | * @Author: zhangmeiyang |
| 10 | 21 | * @CreateTime: 2026-01-15 16:10 |
| ... | ... | @@ -12,6 +23,9 @@ import org.springframework.stereotype.Component; |
| 12 | 23 | */ |
| 13 | 24 | @Component |
| 14 | 25 | public class PrinterOnlineReceiveProcessor implements ReceiveProcessorService { |
| 26 | + @Resource | |
| 27 | + private MongoTemplate mongoTemplate; | |
| 28 | + | |
| 15 | 29 | @Override |
| 16 | 30 | public ReceiveEventType receiveEventType() { |
| 17 | 31 | return ReceiveEventType.PRINTER_ONLINE_SERVICE; |
| ... | ... | @@ -19,6 +33,24 @@ public class PrinterOnlineReceiveProcessor implements ReceiveProcessorService { |
| 19 | 33 | |
| 20 | 34 | @Override |
| 21 | 35 | public void successReceive(ReceiveModel receiveModel) { |
| 36 | + String topic = receiveModel.getTopic(); | |
| 37 | + if (topic.contains(TopicType.PRINTER_ONLINE_SERVICE.getValue())) { | |
| 38 | + String payload = receiveModel.getPayload(); | |
| 39 | + Map<String, String> map = JsonUtils.convertValue(payload, new TypeReference<>() { | |
| 40 | + }); | |
| 41 | + String deviceSn = map.get("deviceSn"); | |
| 42 | + String firm = FirmEnum.CLOUD.code; | |
| 43 | + String[] split = topic.split("/"); | |
| 44 | + String productId = split[2]; | |
| 45 | + DeviceStatusInfoRecord deviceStatusInfoRecord = new DeviceStatusInfoRecord(); | |
| 46 | + deviceStatusInfoRecord.setId(String.valueOf(System.currentTimeMillis())); | |
| 47 | + deviceStatusInfoRecord.setFirm(firm); | |
| 48 | + deviceStatusInfoRecord.setProductId(productId); | |
| 49 | + deviceStatusInfoRecord.setSn(deviceSn); | |
| 50 | + deviceStatusInfoRecord.setStatus(DeviceStatusEnum.ONLINE.getCode()); | |
| 51 | + deviceStatusInfoRecord.setCreateTime(LocalDateTime.now()); | |
| 52 | + mongoTemplate.save(deviceStatusInfoRecord); | |
| 53 | + } | |
| 22 | 54 | |
| 23 | 55 | } |
| 24 | 56 | } | ... | ... |
mqtt-web/src/main/java/com/diligrp/mqtt/web/processor/printer/PrinterPostProcessor.java
| 1 | 1 | package com.diligrp.mqtt.web.processor.printer; |
| 2 | 2 | |
| 3 | +import com.diligrp.mqtt.core.model.CloudRequest; | |
| 4 | +import com.diligrp.mqtt.web.model.ReceiveRelyRecord; | |
| 3 | 5 | import com.diligrp.mqtt.core.model.SendModel; |
| 4 | 6 | import com.diligrp.mqtt.core.service.PostProcessorService; |
| 7 | +import com.diligrp.mqtt.core.type.MessStatusEnum; | |
| 5 | 8 | import com.diligrp.mqtt.core.type.SendEventType; |
| 9 | +import com.diligrp.mqtt.core.util.JsonUtils; | |
| 10 | +import jakarta.annotation.Resource; | |
| 11 | +import org.springframework.data.mongodb.core.MongoTemplate; | |
| 12 | +import org.springframework.data.mongodb.core.query.Criteria; | |
| 13 | +import org.springframework.data.mongodb.core.query.Query; | |
| 14 | +import org.springframework.data.mongodb.core.query.Update; | |
| 6 | 15 | import org.springframework.stereotype.Component; |
| 7 | 16 | |
| 17 | +import java.util.Map; | |
| 18 | + | |
| 8 | 19 | /** |
| 9 | 20 | * @Author: zhangmeiyang |
| 10 | 21 | * @CreateTime: 2026-01-15 16:08 |
| ... | ... | @@ -12,6 +23,9 @@ import org.springframework.stereotype.Component; |
| 12 | 23 | */ |
| 13 | 24 | @Component |
| 14 | 25 | public class PrinterPostProcessor implements PostProcessorService { |
| 26 | + @Resource | |
| 27 | + private MongoTemplate mongoTemplate; | |
| 28 | + | |
| 15 | 29 | @Override |
| 16 | 30 | public SendEventType sendEventType() { |
| 17 | 31 | return SendEventType.PRINTER; |
| ... | ... | @@ -24,6 +38,11 @@ public class PrinterPostProcessor implements PostProcessorService { |
| 24 | 38 | |
| 25 | 39 | @Override |
| 26 | 40 | public void failProcess(SendModel sendModel) { |
| 27 | - | |
| 41 | + Map<String, Object> params = sendModel.getParams(); | |
| 42 | + CloudRequest cloudRequest = JsonUtils.convertValue(params, CloudRequest.class); | |
| 43 | + Update update = new Update(); | |
| 44 | + update.set("messStatus", MessStatusEnum.FAIL.getCode()); | |
| 45 | + Query query = new Query(Criteria.where("_id").is(cloudRequest.getMessIndex() + cloudRequest.getSn())); | |
| 46 | + mongoTemplate.updateFirst(query, update, ReceiveRelyRecord.class); | |
| 28 | 47 | } |
| 29 | 48 | } | ... | ... |
mqtt-web/src/main/java/com/diligrp/mqtt/web/processor/printer/PrinterSuccessReceiveProcessor.java
| ... | ... | @@ -3,8 +3,19 @@ package com.diligrp.mqtt.web.processor.printer; |
| 3 | 3 | import com.diligrp.mqtt.core.model.ReceiveModel; |
| 4 | 4 | import com.diligrp.mqtt.core.service.ReceiveProcessorService; |
| 5 | 5 | import com.diligrp.mqtt.core.type.ReceiveEventType; |
| 6 | +import com.diligrp.mqtt.core.util.JsonUtils; | |
| 7 | +import com.diligrp.mqtt.web.model.ReceiveRelyRecord; | |
| 8 | +import com.fasterxml.jackson.core.type.TypeReference; | |
| 9 | +import jakarta.annotation.Resource; | |
| 10 | +import org.springframework.data.mongodb.core.MongoTemplate; | |
| 11 | +import org.springframework.data.mongodb.core.query.Criteria; | |
| 12 | +import org.springframework.data.mongodb.core.query.Query; | |
| 13 | +import org.springframework.data.mongodb.core.query.Update; | |
| 6 | 14 | import org.springframework.stereotype.Component; |
| 7 | 15 | |
| 16 | +import java.time.LocalDateTime; | |
| 17 | +import java.util.Map; | |
| 18 | + | |
| 8 | 19 | /** |
| 9 | 20 | * @Author: zhangmeiyang |
| 10 | 21 | * @CreateTime: 2026-01-15 16:09 |
| ... | ... | @@ -12,6 +23,9 @@ import org.springframework.stereotype.Component; |
| 12 | 23 | */ |
| 13 | 24 | @Component |
| 14 | 25 | public class PrinterSuccessReceiveProcessor implements ReceiveProcessorService { |
| 26 | + @Resource | |
| 27 | + private MongoTemplate mongoTemplate; | |
| 28 | + | |
| 15 | 29 | @Override |
| 16 | 30 | public ReceiveEventType receiveEventType() { |
| 17 | 31 | return ReceiveEventType.PRINTER_RECEIPT_SERVICE; |
| ... | ... | @@ -19,6 +33,16 @@ public class PrinterSuccessReceiveProcessor implements ReceiveProcessorService { |
| 19 | 33 | |
| 20 | 34 | @Override |
| 21 | 35 | public void successReceive(ReceiveModel receiveModel) { |
| 22 | - | |
| 36 | + String payload = receiveModel.getPayload(); | |
| 37 | + Map<String, String> map = JsonUtils.convertValue(payload, new TypeReference<>() { | |
| 38 | + }); | |
| 39 | + String messIndex = map.get("messIndex"); | |
| 40 | + String deviceSn = map.get("deviceSn"); | |
| 41 | + String messStatus = map.get("messStatus"); | |
| 42 | + Update update = new Update(); | |
| 43 | + update.set("replyStatus", messStatus); | |
| 44 | + update.set("replyTime", LocalDateTime.now()); | |
| 45 | + Query query = new Query(Criteria.where("_id").is(messIndex + deviceSn)); | |
| 46 | + mongoTemplate.updateFirst(query, update, ReceiveRelyRecord.class); | |
| 23 | 47 | } |
| 24 | 48 | } | ... | ... |
mqtt-web/src/main/java/com/diligrp/mqtt/web/service/PrintService.java
| 1 | 1 | package com.diligrp.mqtt.web.service; |
| 2 | 2 | |
| 3 | -import com.diligrp.mqtt.web.message.Message; | |
| 4 | 3 | import com.diligrp.mqtt.core.model.CloudRequest; |
| 4 | +import com.diligrp.mqtt.web.message.Message; | |
| 5 | +import com.diligrp.mqtt.web.model.ReceiveRelyRecord; | |
| 6 | + | |
| 7 | +import java.util.List; | |
| 5 | 8 | |
| 6 | 9 | /** |
| 7 | 10 | * @author lvqi |
| ... | ... | @@ -9,4 +12,6 @@ import com.diligrp.mqtt.core.model.CloudRequest; |
| 9 | 12 | public interface PrintService { |
| 10 | 13 | |
| 11 | 14 | Message<?> cloudPrint(CloudRequest cloudRequest); |
| 15 | + | |
| 16 | + void retry(List<ReceiveRelyRecord> receiveRelyRecords); | |
| 12 | 17 | } | ... | ... |
mqtt-web/src/main/java/com/diligrp/mqtt/web/service/impl/PrintServiceImpl.java
| 1 | 1 | package com.diligrp.mqtt.web.service.impl; |
| 2 | 2 | |
| 3 | +import com.diligrp.mqtt.core.dto.PrintDeviceDto; | |
| 3 | 4 | import com.diligrp.mqtt.core.model.CloudMessContent; |
| 4 | 5 | import com.diligrp.mqtt.core.model.CloudRequest; |
| 5 | -import com.diligrp.mqtt.core.model.ReceiveRelyRecord; | |
| 6 | 6 | import com.diligrp.mqtt.core.model.SendModel; |
| 7 | 7 | import com.diligrp.mqtt.core.service.MqttMessageService; |
| 8 | 8 | import com.diligrp.mqtt.core.type.CharsetType; |
| ... | ... | @@ -11,16 +11,16 @@ import com.diligrp.mqtt.core.type.SendEventType; |
| 11 | 11 | import com.diligrp.mqtt.core.util.BuildCloudPrintCommand; |
| 12 | 12 | import com.diligrp.mqtt.core.util.JsonUtils; |
| 13 | 13 | import com.diligrp.mqtt.web.dto.CloudPrinter; |
| 14 | -import com.diligrp.mqtt.core.dto.PrintDeviceDto; | |
| 15 | 14 | import com.diligrp.mqtt.web.mapper.PlatTenantMapper; |
| 16 | 15 | import com.diligrp.mqtt.web.mapper.PrintDeviceMapper; |
| 17 | 16 | import com.diligrp.mqtt.web.message.Message; |
| 18 | 17 | import com.diligrp.mqtt.web.model.PlatTenant; |
| 18 | +import com.diligrp.mqtt.web.model.ReceiveRelyRecord; | |
| 19 | 19 | import com.diligrp.mqtt.web.service.PrintService; |
| 20 | 20 | import com.fasterxml.jackson.core.type.TypeReference; |
| 21 | +import jakarta.annotation.Resource; | |
| 21 | 22 | import org.slf4j.Logger; |
| 22 | 23 | import org.slf4j.LoggerFactory; |
| 23 | -import org.springframework.beans.factory.annotation.Autowired; | |
| 24 | 24 | import org.springframework.data.mongodb.core.MongoTemplate; |
| 25 | 25 | import org.springframework.stereotype.Service; |
| 26 | 26 | import org.springframework.util.ObjectUtils; |
| ... | ... | @@ -29,6 +29,7 @@ import java.nio.charset.StandardCharsets; |
| 29 | 29 | import java.security.KeyFactory; |
| 30 | 30 | import java.security.Signature; |
| 31 | 31 | import java.security.spec.PKCS8EncodedKeySpec; |
| 32 | +import java.time.LocalDateTime; | |
| 32 | 33 | import java.util.*; |
| 33 | 34 | |
| 34 | 35 | /** |
| ... | ... | @@ -37,13 +38,13 @@ import java.util.*; |
| 37 | 38 | @Service |
| 38 | 39 | public class PrintServiceImpl implements PrintService { |
| 39 | 40 | private static final Logger log = LoggerFactory.getLogger(PrintServiceImpl.class); |
| 40 | - @Autowired | |
| 41 | + @Resource | |
| 41 | 42 | private PlatTenantMapper platTenantMapper; |
| 42 | - @Autowired | |
| 43 | + @Resource | |
| 43 | 44 | private PrintDeviceMapper printDeviceMapper; |
| 44 | - @Autowired | |
| 45 | + @Resource | |
| 45 | 46 | private MongoTemplate mongoTemplate; |
| 46 | - @Autowired | |
| 47 | + @Resource | |
| 47 | 48 | private MqttMessageService mqttMessageService; |
| 48 | 49 | |
| 49 | 50 | @Override |
| ... | ... | @@ -106,10 +107,44 @@ public class PrintServiceImpl implements PrintService { |
| 106 | 107 | receiveRelyRecord.setMessIndex(cloudRequest.getMessIndex()); |
| 107 | 108 | receiveRelyRecord.setMessage(JsonUtils.toJsonString(cloudRequest)); |
| 108 | 109 | receiveRelyRecord.setMessStatus(messStatus); |
| 110 | + receiveRelyRecord.setCreateTime(LocalDateTime.now()); | |
| 109 | 111 | mongoTemplate.save(receiveRelyRecord); |
| 110 | 112 | } |
| 111 | 113 | } |
| 112 | 114 | |
| 115 | + @Override | |
| 116 | + public void retry(List<ReceiveRelyRecord> receiveRelyRecords) { | |
| 117 | + for (ReceiveRelyRecord receiveRelyRecord : receiveRelyRecords) { | |
| 118 | + String message = receiveRelyRecord.getMessage(); | |
| 119 | + CloudRequest cloudRequest = JsonUtils.fromJsonString(message, CloudRequest.class); | |
| 120 | + SendModel sendModel = SendModel.withDefault(); | |
| 121 | + CloudPrinter cloudPrinter = new CloudPrinter(); | |
| 122 | + cloudPrinter.setIndex(cloudRequest.getMessIndex()); | |
| 123 | + cloudPrinter.setPrint_num(Integer.valueOf(cloudRequest.getNum())); | |
| 124 | + CloudMessContent mess = cloudRequest.getMess(); | |
| 125 | + try { | |
| 126 | + byte[] bytes = BuildCloudPrintCommand.buildCommand(mess.getPc()); | |
| 127 | + cloudPrinter.setPrint_esc(Base64.getEncoder().encodeToString(bytes)); | |
| 128 | + } catch (Exception e) { | |
| 129 | + log.error("构建打印命令失败", e); | |
| 130 | + } | |
| 131 | + // 语音播报 | |
| 132 | + cloudPrinter.setTts(mess.getSc()); | |
| 133 | + cloudPrinter.setRtc(cloudRequest.getFirmId()); | |
| 134 | + PrintDeviceDto printDevice = printDeviceMapper.queryBySn(cloudRequest.getSn()); | |
| 135 | + sendModel.setTopic(printDevice.getTopic()); | |
| 136 | + Map<String, Object> dataMap = JsonUtils.convertValue(cloudPrinter, new TypeReference<>() { | |
| 137 | + }); | |
| 138 | + sendModel.setPayload(dataMap); | |
| 139 | + sendModel.setSendEventType(SendEventType.PRINTER.value); | |
| 140 | + sendModel.setCharsetType(CharsetType.GBK.code); | |
| 141 | + Map<String, Object> requestMap = JsonUtils.convertValue(cloudRequest, new TypeReference<>() { | |
| 142 | + }); | |
| 143 | + sendModel.setParams(requestMap); | |
| 144 | + mqttMessageService.sendMessage(sendModel); | |
| 145 | + } | |
| 146 | + } | |
| 147 | + | |
| 113 | 148 | public static String handleStr(Map<String, Object> paramsMap) { |
| 114 | 149 | //转成map |
| 115 | 150 | //获取参数名称转成List集合 | ... | ... |