MqttMessageReceiver.java
2.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.diligrp.mqtt.integration.handler;
import com.diligrp.mqtt.core.event.RecFailEvent;
import com.diligrp.mqtt.core.event.RecSuccessEvent;
import com.diligrp.mqtt.core.model.ReceiveModel;
import com.diligrp.mqtt.core.type.ReceiveEventType;
import com.diligrp.mqtt.core.util.JsonUtils;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
/**
 * Receives messages from the {@code mqttInboundChannel} and republishes them as application
 * events.
 *
 * <p>Each message is mapped to a {@link ReceiveModel} and published as a
 * {@link RecSuccessEvent}. If anything fails while logging, mapping, or publishing, the failure
 * is logged and a {@link RecFailEvent} carrying the original message is published instead.
 */
@Slf4j
@Component
public class MqttMessageReceiver {
    private static final String RETAINED_HEADER = "mqtt_receivedRetained";
    private static final String MQTT_ID_HEADER = "mqtt_id";
    private static final String MQTT_DUPLICATE_HEADER = "mqtt_duplicate";
    private static final String ID_HEADER = "id";
    private static final String TOPIC_HEADER = "mqtt_receivedTopic";
    private static final String QOS_HEADER = "mqtt_receivedQos";
    private static final String TIMESTAMP_HEADER = "timestamp";

    @Resource
    private ApplicationEventPublisher applicationEventPublisher;

    /**
     * Handles one message delivered on {@code mqttInboundChannel}.
     *
     * <p>Exceptions raised while handling the message are not propagated to the caller; they are
     * logged and reported through a {@link RecFailEvent}.
     *
     * @param message the inbound message, including its headers and payload
     */
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public void handleMqttMessage(Message<?> message) {
        try {
            // Kept inside the try block so a serialization failure in the diagnostic log is
            // reported as a RecFailEvent instead of escaping the handler unreported.
            if (log.isInfoEnabled()) {
                log.info("收到MQTT消息 - :{}", JsonUtils.toJsonString(message));
            }
            applicationEventPublisher.publishEvent(new RecSuccessEvent(this, toReceiveModel(message)));
        } catch (Exception e) {
            log.error("处理MQTT消息失败", e);
            applicationEventPublisher.publishEvent(new RecFailEvent(this, message));
        }
    }

    /**
     * Copies the message headers and payload into a new {@link ReceiveModel}.
     *
     * <p>Headers that are absent are stored as {@code null}; the typed header lookup throws if a
     * header is present with an unexpected type.
     */
    private ReceiveModel toReceiveModel(Message<?> message) {
        var headers = message.getHeaders();
        String topic = headers.get(TOPIC_HEADER, String.class);

        var receiveModel = new ReceiveModel();
        receiveModel.setId(headers.get(ID_HEADER, UUID.class));
        receiveModel.setTopic(topic);
        receiveModel.setQos(headers.get(QOS_HEADER, Integer.class));
        receiveModel.setMqttId(headers.get(MQTT_ID_HEADER, Integer.class));
        receiveModel.setRetained(headers.get(RETAINED_HEADER, Boolean.class));
        receiveModel.setDuplicate(headers.get(MQTT_DUPLICATE_HEADER, Boolean.class));
        receiveModel.setPayload(payloadAsString(message.getPayload()));
        receiveModel.setTimestamp(headers.get(TIMESTAMP_HEADER, Long.class));
        receiveModel.setReceiveEventType(ReceiveEventType.matchEventType(topic));
        return receiveModel;
    }

    /**
     * Renders a message payload as text.
     *
     * <p>A {@code byte[]} payload is decoded explicitly, because {@code toString()} on an array
     * returns its identity string (for example {@code [B@1a2b3c}) rather than its content.
     */
    private static String payloadAsString(Object payload) {
        if (payload instanceof byte[]) {
            // NOTE(review): assumes binary payloads carry UTF-8 text - confirm against publishers.
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        return payload.toString();
    }
}