MqttTopicManager.java 2.47 KB
package com.diligrp.mqtt.integration.manager;

import com.diligrp.mqtt.core.model.SubscribeModel;
import com.diligrp.mqtt.core.service.MqttTopicService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.stereotype.Service;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;


/**
 * MQTT动态Topic管理服务
 */
@Slf4j
@Service
public class MqttTopicManager implements MqttTopicService {

    @Resource
    private MqttPahoMessageDrivenChannelAdapter mqttMessageDrivenChannelAdapter;


    /**
     * 订阅主题
     *
     * @param subscribeModel 订阅模式
     */
    @Override
    public void subscribeTopic(SubscribeModel subscribeModel) {
        subscribeTopic(subscribeModel.getTopic(), subscribeModel.getQos());
    }

    /**
     * 动态订阅Topic
     *
     * @param topic 要订阅的Topic
     * @param qos   QoS级别
     */
    @Override
    public void subscribeTopic(String topic, int qos) {
        Optional.ofNullable(topic).filter(t -> !isTopicSubscribed(t)).ifPresentOrElse(t -> {
            mqttMessageDrivenChannelAdapter.addTopic(t, qos);
            log.info("成功订阅Topic: {}, QoS: {}", t, qos);
        }, () -> log.info("无效订阅Topic: {}, QoS: {}", topic, qos));
    }

    /**
     * 取消订阅Topic
     *
     * @param topic 要取消的Topic
     */
    @Override
    public void unsubscribeTopic(String topic) {
        Optional.ofNullable(topic).filter(this::isTopicSubscribed).ifPresentOrElse(t -> {
            mqttMessageDrivenChannelAdapter.removeTopic(t);
            log.info("成功取消订阅Topic: {}", topic);
        }, () -> log.info("无效取消订阅Topic: {}", topic));
    }

    /**
     * 检查Topic是否已订阅
     *
     * @param topic 要检查的Topic
     * @return 是否订阅
     */
    public boolean isTopicSubscribed(String topic) {
        return Optional.ofNullable(mqttMessageDrivenChannelAdapter.getTopic()).filter(t -> t.length > 0).map(ts -> new HashSet<>(Arrays.asList(ts))).orElseGet(HashSet::new).contains(topic);
    }

    /**
     * 获取所有已订阅的Topic
     *
     * @return 订阅的Topic数组
     */
    @Override
    public HashSet<String> getSubscribedTopics() {
        return Optional.ofNullable(mqttMessageDrivenChannelAdapter.getTopic()).filter(t -> t.length > 0).map(ts -> new HashSet<>(Arrays.asList(ts))).orElseGet(HashSet::new);
    }

}