IntegrationConfig.java
4.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package com.diligrp.mqtt.integration;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* @Author: zhangmeiyang
* @CreateTime: 2026-01-14 11:15
* @Version: todo
*/
@Configuration
@ComponentScan(basePackages = "com.diligrp.mqtt.integration")
@Data
@Slf4j
@EnableIntegration
public class IntegrationConfig {
@Value("${mqtt.url}")
private String url;
@Value("${mqtt.clientId}")
private String clientId;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.timeOut}")
private int timeOut;
@Value("${mqtt.cleanSession}")
private boolean cleanSession;
@Value("${mqtt.automaticReconnect}")
private boolean automaticReconnect;
@Value("${mqtt.qos}")
private int qos;
/**
* MQTT 客户端工厂
*
* @return {@link MqttPahoClientFactory }
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{url});
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setAutomaticReconnect(automaticReconnect);
options.setCleanSession(cleanSession);
options.setConnectionTimeout(timeOut);
factory.setConnectionOptions(options);
return factory;
}
/**
* outbound通道
*
* @return {@link MessageChannel }
*/
@Bean(name = "mqttOutboundChannel")
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* 消息发送处理器(支持回执)
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId + "-sender", mqttClientFactory());
handler.setDefaultQos(qos);
handler.setAsync(true);
handler.setCompletionTimeout(5000);
handler.setConverter(new DefaultPahoMessageConverter());
handler.setDefaultRetained(false);
log.info("Mqtt消息发送处理器初始化完成");
return handler;
}
/**
* 接收消息的通道
*/
@Bean(name = "mqttInboundChannel")
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息驱动适配器(用于接收消息)
*/
@Bean(name = "mqttPahoMessageDrivenChannelAdapter")
public MqttPahoMessageDrivenChannelAdapter inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "-receiver", mqttClientFactory());
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(qos);
adapter.setOutputChannel(mqttInboundChannel());
log.info("Mqtt消息接收适配器初始化完成");
return adapter;
}
}