// IntegrationConfig.java 4.01 KB
package com.diligrp.mqtt.integration;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * @Author: zhangmeiyang
 * @CreateTime: 2026-01-14 11:15
 * @Version: todo
 */
@Configuration
@ComponentScan(basePackages = "com.diligrp.mqtt.integration")
@Data
@Slf4j
@EnableIntegration
public class IntegrationConfig {
    @Value("${mqtt.url}")
    private String url;

    @Value("${mqtt.clientId}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.timeOut}")
    private int timeOut;

    @Value("${mqtt.cleanSession}")
    private boolean cleanSession;

    @Value("${mqtt.automaticReconnect}")
    private boolean automaticReconnect;

    @Value("${mqtt.qos}")
    private int qos;


    /**
     * MQTT 客户端工厂
     *
     * @return {@link MqttPahoClientFactory }
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{url});
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setAutomaticReconnect(automaticReconnect);
        options.setCleanSession(cleanSession);
        options.setConnectionTimeout(timeOut);
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * outbound通道
     *
     * @return {@link MessageChannel }
     */
    @Bean(name = "mqttOutboundChannel")
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }


    /**
     * 消息发送处理器(支持回执)
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId + "-sender", mqttClientFactory());
        handler.setDefaultQos(qos);
        handler.setAsync(true);
        handler.setCompletionTimeout(5000);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultRetained(false);
        log.info("Mqtt消息发送处理器初始化完成");
        return handler;
    }

    /**
     * 接收消息的通道
     */
    @Bean(name = "mqttInboundChannel")
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT消息驱动适配器(用于接收消息)
     */
    @Bean(name = "mqttPahoMessageDrivenChannelAdapter")
    public MqttPahoMessageDrivenChannelAdapter inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "-receiver", mqttClientFactory());
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(qos);
        adapter.setOutputChannel(mqttInboundChannel());
        log.info("Mqtt消息接收适配器初始化完成");
        return adapter;
    }
}