MQServiceImpl.java
5.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package com.sl.mq.service.impl;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.sl.mq.entity.FailMsgEntity;
import com.sl.mq.exception.MsgException;
import com.sl.mq.service.FailMsgService;
import com.sl.mq.service.MQService;
import com.sl.transport.common.constant.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* 消息处理类
*
* @author zzj
* @version 1.0
*/
@Slf4j
@Service
//设置初始化条件,只有在配置文件中设置sl.mq.enable=true才能触发
@ConditionalOnProperty(prefix = "sl.mq", value = "enable")
public class MQServiceImpl implements MQService {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private FailMsgService failMsgService;
/**
* 发送消息 重试3次
*
* @param exchange 交换机
* @param routingKey 路由key
* @param msg 消息对象,会将对象序列化成json字符串发出
* @return 是否发送成功
*/
@Retryable(value = MsgException.class, maxAttempts = 3, backoff = @Backoff(value = 3000, multiplier = 1.5))
@Override
public Boolean sendMsg(String exchange, String routingKey, Object msg) {
// 实时发送
return this.sendMsg(exchange, routingKey, msg, Constants.MQ.DEFAULT_DELAY);
}
@Retryable(value = MsgException.class, maxAttempts = 3, backoff = @Backoff(value = 3000, multiplier = 1.5))
@Override
public Boolean sendMsg(String exchange, String routingKey, Object msg, int delay) {
// 1.获取消息内容,如果非字符串将其序列化
String jsonMsg = (msg instanceof String) ? (String) msg : JSONUtil.toJsonStr(msg);
String msgId = IdUtil.simpleUUID();
log.info("消息发送!exchange = {}, routingKey = {}, msg = {}, msgId = {}", exchange, routingKey, jsonMsg, msgId);
// 2.全局唯一的消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(msgId);
// 3.添加callback
correlationData.getFuture().addCallback(
result -> {
if (result.isAck()) {
// 3.1.ack,消息成功
log.debug("消息发送成功, ID:{}", correlationData.getId());
} else {
// 3.2.nack,消息失败
log.error("消息发送失败, ID:{}, 原因:{}", correlationData.getId(), result.getReason());
FailMsgEntity failMsgEntity = FailMsgEntity.builder()
.exchange(exchange)
.msgId(correlationData.getId())
.msg(jsonMsg)
.reason(result.getReason()).build();
this.failMsgService.save(failMsgEntity);
}
},
ex -> {
//出现此异常情况,可能是发送消息后无法连接到发送者或其他一些未知的异常情况
String exceptionInfo = ExceptionUtil.getMessage(ex);
log.error("消息发送异常, ID:{}, 原因:{}", correlationData.getId(), exceptionInfo);
FailMsgEntity failMsgEntity = FailMsgEntity.builder()
.exchange(exchange)
.msgId(correlationData.getId())
.msg(jsonMsg)
.reason(exceptionInfo).build();
this.failMsgService.save(failMsgEntity);
}
);
// 4.构造消息对象
Message message = MessageBuilder.withBody(StrUtil.bytes(jsonMsg, CharsetUtil.CHARSET_UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久化
.build();
try {
// 5.发送消息
this.rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor -> {
//设置延时发送时间,delay小于0,实时发送
messagePostProcessor.getMessageProperties().setDelay(delay);
return messagePostProcessor;
}, correlationData);
} catch (Exception e) {
//出现异常,这里将异常转化为自定义异常MsgException,主要是为了触发重试机制
throw MsgException.builder()
.failMsgEntity(FailMsgEntity.builder()
.exchange(exchange)
.msgId(correlationData.getId())
.msg(jsonMsg)
.reason(ExceptionUtil.getMessage(e)).build())
.build();
}
return true;
}
/**
* 3次失败后 存入数据库
*
* @param msgException 失败消息
*/
@Recover
public Boolean saveFailMag(MsgException msgException) {
//发送消息失败,需要将消息持久化到数据库,通过任务调度的方式处理失败的消息
this.failMsgService.save(msgException.getFailMsgEntity());
return true;
}
}