// RabbitConfiguration.java 2.67 KB
package com.diligrp.assistant.sms;

import com.diligrp.assistant.shared.domain.AsyncMessage;
import com.diligrp.assistant.sms.service.SmsScheduleService;
import jakarta.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * RabbitMQ wiring for the SMS module: declares the delayed queue, the delayed-message
 * exchange and their binding, and consumes jobs from the delayed queue.
 */
@Configuration
public class RabbitConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitConfiguration.class);

    @Resource
    private SmsScheduleService smsScheduleService;

    /**
     * Delayed queue of the SMS service.
     * The queue is durable, non-exclusive and not auto-deleted. The delay itself is provided by the
     * RabbitMQ delayed message plugin: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
     *
     * @return the queue declaration
     */
    @Bean
    public Queue smsDelayQueue() {
        return new Queue(Constants.SMS_DELAY_QUEUE, true, false, false);
    }

    /**
     * Delayed exchange of the SMS service.
     * Declared with the plugin's {@code x-delayed-message} exchange type; the {@code x-delayed-type}
     * argument selects {@code direct} routing. The exchange is durable and not auto-deleted.
     *
     * @return the exchange declaration
     */
    @Bean
    public CustomExchange smsDelayExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(Constants.SMS_DELAY_EXCHANGE, "x-delayed-message", true, false, arguments);
    }

    /**
     * Binds the SMS delayed queue to the SMS delayed exchange using {@code Constants.SMS_DELAY_KEY}
     * as the routing key.
     *
     * @return the binding declaration
     */
    @Bean
    public Binding smsDelayBinding() {
        return BindingBuilder.bind(smsDelayQueue()).to(smsDelayExchange()).with(Constants.SMS_DELAY_KEY).noargs();
    }

    /**
     * Consumes one job from the SMS delayed queue and dispatches it by job type.
     * <p>
     * The raw body is decoded to text, parsed with {@code AsyncMessage.from}, and template jobs are
     * handed to {@link SmsScheduleService#executeSmsTemplateJob}. Unknown job types are logged.
     * Every exception is caught and logged here rather than rethrown, so a failing job never
     * propagates an error to the listener container.
     *
     * @param message the raw AMQP message taken from the queue
     */
    @RabbitListener(queues = {Constants.SMS_DELAY_QUEUE})
    public void onMessage(Message message) {
        try {
            Charset charset = resolveCharset(message.getMessageProperties());
            String body = new String(message.getBody(), charset);
            LOGGER.info("Handling sms service job: {}", body);
            AsyncMessage job = AsyncMessage.from(body);
            // NOTE(review): '==' is only a value comparison if at least one operand is a primitive;
            // confirm the declared types of getType() and MESSAGE_TEMPLATE_STATE.
            if (job.getType() == Constants.MESSAGE_TEMPLATE_STATE) {
                smsScheduleService.executeSmsTemplateJob(job.getPayload());
            } else {
                LOGGER.error("Unrecognized sms service job: {}", job.getType());
            }
        } catch (Exception ex) {
            LOGGER.error("Handle sms service job exception", ex);
        }
    }

    /**
     * Picks the charset used to decode a message body.
     * Uses the message's content-encoding when it names a charset supported by this JVM and falls
     * back to UTF-8 when the header is missing, blank, or not a usable charset name, so that such a
     * header alone does not cause the job to be dropped.
     *
     * @param properties the message properties, may be {@code null}
     * @return the charset to decode the body with, never {@code null}
     */
    private static Charset resolveCharset(MessageProperties properties) {
        String encoding = properties == null ? null : properties.getContentEncoding();
        if (encoding == null || encoding.trim().isEmpty()) {
            return StandardCharsets.UTF_8;
        }
        try {
            return Charset.forName(encoding.trim());
        } catch (IllegalArgumentException ex) {
            // Covers both IllegalCharsetNameException and UnsupportedCharsetException.
            LOGGER.warn("Unsupported content encoding '{}', falling back to UTF-8", encoding);
            return StandardCharsets.UTF_8;
        }
    }
}