TaskMessageSender.java
2.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.diligrp.cashier.trade.manager;
import com.diligrp.cashier.shared.service.ThreadPoolService;
import com.diligrp.cashier.shared.util.JsonUtils;
import com.diligrp.cashier.trade.Constants;
import com.diligrp.cashier.trade.domain.TaskMessage;
import jakarta.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
@Service("taskMessageSender")
public class TaskMessageSender {
private static final Logger LOG = LoggerFactory.getLogger(TaskMessageSender.class);
private static final long ONE_MINUTE = 60 * 1000;
private static final long TEN_MINUTES = 10 * ONE_MINUTE;
private static final long ONE_SECOND = 1000;
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 发送延时处理消息
*/
public void sendDelayTaskMessage(TaskMessage task, long delayInMillis) {
if (delayInMillis < 0) {
LOG.info("No need send scan order message[type={}]", task.getType());
return;
}
ThreadPoolService.getIoThreadPoll().submit(() -> {
try {
MessageProperties properties = new MessageProperties();
properties.setContentEncoding(StandardCharsets.UTF_8.name());
properties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
// properties.setExpiration(String.valueOf(expiredTime));
// RabbitMQ延时插件必须设置x-delay的header才能生效
properties.setHeader("x-delay", String.valueOf(delayInMillis));
String payload = JsonUtils.toJsonString(task);
Message message = new Message(payload.getBytes(StandardCharsets.UTF_8), properties);
LOG.info("Sending online payment order scan request for {}", task.getPayload());
rabbitTemplate.send(Constants.PAYMENT_DELAY_EXCHANGE, Constants.PAYMENT_DELAY_KEY, message);
} catch (Exception ex) {
LOG.error("Failed to send online payment order scan request for {}", task.getPayload(), ex);
}
});
}
}