// TaskMessageSender.java (2.09 KB)
package com.diligrp.cashier.trade.manager;

import com.diligrp.cashier.shared.service.ThreadPoolService;
import com.diligrp.cashier.shared.util.JsonUtils;
import com.diligrp.cashier.trade.Constants;
import com.diligrp.cashier.trade.domain.TaskMessage;
import jakarta.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

/**
 * Publishes {@link TaskMessage} instances to the payment delay exchange
 * ({@code Constants.PAYMENT_DELAY_EXCHANGE} / {@code Constants.PAYMENT_DELAY_KEY}).
 *
 * <p>The actual publish is submitted to the executor returned by
 * {@code ThreadPoolService.getIoThreadPoll()}, so callers do not wait for the broker round trip.
 */
@Service("taskMessageSender")
public class TaskMessageSender {

    private static final Logger LOG = LoggerFactory.getLogger(TaskMessageSender.class);

    /** Header inspected by the RabbitMQ delayed-message-exchange plugin (delay in milliseconds). */
    private static final String DELAY_HEADER = "x-delay";

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a task message that should be processed after a delay.
     *
     * <p>The task is serialized to JSON and published as UTF-8 bytes. Errors raised while building or
     * publishing the message are caught inside the submitted task and logged; they are not rethrown.
     *
     * @param task          the task to publish; a {@code null} task is logged and skipped
     * @param delayInMillis delay in milliseconds; a negative value means nothing is sent
     */
    public void sendDelayTaskMessage(TaskMessage task, long delayInMillis) {
        if (task == null) {
            // Without this guard the NPE would be raised again inside the catch block below and
            // disappear into the discarded Future, leaving no trace of the lost message.
            LOG.warn("Skip sending async delay task message: task is null");
            return;
        }
        if (delayInMillis < 0) {
            LOG.debug("No need send scan order message: {}", task);
            return;
        }

        ThreadPoolService.getIoThreadPoll().submit(() -> {
            try {
                MessageProperties properties = new MessageProperties();
                properties.setContentEncoding(StandardCharsets.UTF_8.name());
                properties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
                // The delay is driven by the x-delay header rather than a per-message expiration.
                // The RabbitMQ delayed-message plugin only takes effect when x-delay is present, and
                // it expects an integer-typed value, so the delay is sent as a number, not a String.
                properties.setHeader(DELAY_HEADER, Long.valueOf(delayInMillis));
                String payload = JsonUtils.toJsonString(task);
                Message message = new Message(payload.getBytes(StandardCharsets.UTF_8), properties);
                LOG.debug("Sending async delay task message: {}", task.getPayload());
                rabbitTemplate.send(Constants.PAYMENT_DELAY_EXCHANGE, Constants.PAYMENT_DELAY_KEY, message);
            } catch (Exception ex) {
                // Best-effort send: log the failure instead of letting it vanish in the Future.
                LOG.error("Failed to send async delay task message: {}", task.getPayload(), ex);
            }
        });
    }
}