MqttMessageReceiver.java 2.86 KB
package com.diligrp.mqtt.integration.handler;

import com.diligrp.mqtt.core.event.RecFailEvent;
import com.diligrp.mqtt.core.event.RecSuccessEvent;
import com.diligrp.mqtt.core.model.ReceiveModel;
import com.diligrp.mqtt.core.util.JsonUtils;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * MQTT消息接收处理器
 */
@Slf4j
@Component
public class MqttMessageReceiver {

    private static final String RETAINED_HEADER = "mqtt_receivedRetained";
    private static final String MQTT_ID_HEADER = "mqtt_id";
    private static final String MQTT_DUPLICATE_HEADER = "mqtt_duplicate";
    private static final String ID_HEADER = "id";
    private static final String TOPIC_HEADER = "mqtt_receivedTopic";
    private static final String QOS_HEADER = "mqtt_receivedQos";
    private static final String TIMESTAMP_HEADER = "timestamp";
    private static final String SUCCESS_EVENT_HEADER = "mqtt_success_event";
    private static final String FAIL_EVENT_HEADER = "mqtt_fail_event";

    @Resource
    private ApplicationEventPublisher applicationEventPublisher;

    /**
     * 处理接收到的MQTT消息
     */
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public void handleMqttMessage(Message<?> message) {
        log.info("收到MQTT消息 - :{}", JsonUtils.toJsonString(message));
        try {
            String topic = message.getHeaders().get(TOPIC_HEADER, String.class);
            Integer qos = message.getHeaders().get(QOS_HEADER, Integer.class);
            Boolean retained = message.getHeaders().get(RETAINED_HEADER, Boolean.class);
            UUID id = message.getHeaders().get(ID_HEADER, UUID.class);
            Long timestamp = message.getHeaders().get(TIMESTAMP_HEADER, Long.class);
            Integer mqttId = message.getHeaders().get(MQTT_ID_HEADER, Integer.class);
            Boolean duplicate = message.getHeaders().get(MQTT_DUPLICATE_HEADER, Boolean.class);
            String payload = message.getPayload().toString();
            var receiveModel = new ReceiveModel();
            receiveModel.setId(id);
            receiveModel.setTopic(topic);
            receiveModel.setQos(qos);
            receiveModel.setMqttId(mqttId);
            receiveModel.setRetained(retained);
            receiveModel.setDuplicate(duplicate);
            receiveModel.setPayload(payload);
            receiveModel.setTimestamp(timestamp);
            applicationEventPublisher.publishEvent(new RecSuccessEvent(this, receiveModel));
        } catch (Exception e) {
            log.error("处理MQTT消息失败", e);
            applicationEventPublisher.publishEvent(new RecFailEvent(this, message));

        }
    }
}