Commit b287db95f412b213bf728424414a658ea251cf01

Authored by zhangmeiyang
1 parent 66e85e38

z-push

etrade-rpc/src/main/java/com/diligrp/etrade/rpc/executor/ZrAutoPush.java 0 → 100644
  1 +package com.diligrp.etrade.rpc.executor;
  2 +
  3 +import jakarta.annotation.Resource;
  4 +import org.springframework.amqp.core.*;
  5 +import org.springframework.amqp.rabbit.annotation.QueueBinding;
  6 +import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7 +import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8 +import org.springframework.beans.factory.annotation.Qualifier;
  9 +import org.springframework.context.annotation.Bean;
  10 +import org.springframework.context.annotation.Configuration;
  11 +import org.springframework.stereotype.Component;
  12 +
  13 +import java.nio.charset.StandardCharsets;
  14 +import java.util.HashMap;
  15 +import java.util.Map;
  16 +
  17 +/**
  18 + * @Author: zhangmeiyang
  19 + * @CreateTime: 2024-09-02 15:51
  20 + * @Version: todo
  21 + */
  22 +@Configuration
  23 +public class ZrAutoPush {
  24 + public static final String ZR_NORMAL_EXCHANGE = "zr.normal.exchange";
  25 + public static final String ZR_NORMAL_QUEUE = "zr.normal.queue";
  26 + public static final String ZR_NORMAL_ROUTING = "zr.normal.routing.process";
  27 + public static final String ZR_DEAD_EXCHANGE = "zr.dead.exchange";
  28 + public static final String ZR_DEAD_EXCHANGE_TYPE = "x-delayed-message";
  29 + public static final String ZR_DEAD_QUEUE = "zr.dead.queue";
  30 +
  31 + @Bean("normalExchange")
  32 + public DirectExchange normalExchange(){
  33 + return new DirectExchange(ZR_NORMAL_EXCHANGE,true,false);
  34 + }
  35 +
  36 + @Bean("normalQueue")
  37 + public Queue normalQueue(){
  38 + return new Queue(ZR_NORMAL_QUEUE);
  39 + }
  40 +
  41 + @Bean("normalBinding")
  42 + public Binding normalBinding(@Qualifier("normalQueue") Queue normalQueue, @Qualifier("normalExchange") DirectExchange normalExchange) {
  43 + return BindingBuilder.bind(normalQueue).to(normalExchange).with(ZR_NORMAL_ROUTING);
  44 + }
  45 +
  46 + @Bean("dlxCustomQueue")
  47 + public Queue dlxCustomQueue() {
  48 + return new Queue(ZR_DEAD_QUEUE, true, false, false);
  49 + }
  50 +
  51 + @Bean("dlxCustomExchange")
  52 + public CustomExchange dlxCustomExchange() {
  53 + Map<String, Object> args = new HashMap<>();
  54 + args.put("x-delayed-type", "direct");
  55 + return new CustomExchange(ZR_DEAD_EXCHANGE, ZR_DEAD_EXCHANGE_TYPE, true,false,args);
  56 + }
  57 +
  58 + @Bean("dlxCustomBinding")
  59 + public Binding dlxCustomBinding(@Qualifier("dlxCustomQueue") Queue queue, @Qualifier("dlxCustomExchange") CustomExchange exchange) {
  60 + return BindingBuilder.bind(queue).to(exchange).with(ZR_DEAD_QUEUE).noargs();
  61 + }
  62 +
  63 +}
... ...
etrade-rpc/src/main/java/com/diligrp/etrade/rpc/executor/ZrReceiver.java 0 → 100644
  1 +package com.diligrp.etrade.rpc.executor;
  2 +
  3 +import com.diligrp.etrade.core.util.JsonUtils;
  4 +import com.rabbitmq.client.Channel;
  5 +import jakarta.annotation.Resource;
  6 +import org.springframework.amqp.core.Message;
  7 +import org.springframework.amqp.core.MessageBuilder;
  8 +import org.springframework.amqp.rabbit.annotation.Exchange;
  9 +import org.springframework.amqp.rabbit.annotation.Queue;
  10 +import org.springframework.amqp.rabbit.annotation.QueueBinding;
  11 +import org.springframework.amqp.rabbit.annotation.RabbitListener;
  12 +import org.springframework.amqp.rabbit.core.RabbitTemplate;
  13 +import org.springframework.stereotype.Component;
  14 +
  15 +import java.io.IOException;
  16 +import java.nio.charset.StandardCharsets;
  17 +import java.util.Map;
  18 +
  19 +import static com.diligrp.etrade.rpc.executor.ZrAutoPush.*;
  20 +
  21 +/**
  22 + * @Author: zhangmeiyang
  23 + * @CreateTime: 2024-09-02 16:32
  24 + * @Version: todo
  25 + */
  26 +@Component
  27 +public class ZrReceiver {
  28 +
  29 + @Resource
  30 + private RabbitTemplate rabbitTemplate;
  31 +
  32 + @RabbitListener(bindings = @QueueBinding(value = @Queue(value = ZR_NORMAL_QUEUE, autoDelete = "false"), exchange = @Exchange(value = ZR_NORMAL_EXCHANGE), key = ZR_NORMAL_ROUTING), ackMode = "MANUAL")
  33 + public void receiveMessage(Channel channel, Message message) throws IOException {
  34 + try {
  35 + var temp = 1/0;
  36 + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  37 + }catch (Exception e){
  38 + StackTraceElement[] stackTrace = e.getStackTrace();
  39 + String jsonString = JsonUtils.toJsonString(stackTrace);
  40 + System.out.println(jsonString);
  41 + sendDelayMsg(3000L,message.getBody());
  42 + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  43 + }
  44 + }
  45 +
  46 + @RabbitListener(queues = ZR_DEAD_QUEUE)
  47 + public void handleCustomDelayMsg(Channel channel, Message message) throws IOException {
  48 + Map<String, Object> headersMap = message.getMessageProperties().getHeaders();
  49 + int retryCount = (int) headersMap.get("x-retry-count");
  50 + try {
  51 + if (retryCount > 3){
  52 + //db
  53 + }else {
  54 +
  55 + }
  56 + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  57 + }catch (Exception e){
  58 + retryCount++;
  59 + headersMap.put("x-retry-count", retryCount);
  60 + channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  61 + }
  62 + }
  63 +
  64 + public void sendDelayMsg(Long currentMills,byte[] bytes) {
  65 + Message msg = MessageBuilder.withBody(bytes)
  66 + .setHeader("x-delay", currentMills)
  67 + .setHeader("x-retry-count", 0)
  68 + .build();
  69 + rabbitTemplate.convertAndSend(ZR_DEAD_EXCHANGE, ZR_DEAD_QUEUE, msg);
  70 + }
  71 +}
... ...