Commit 436cb39c53bd2b0abce99a47ab627aa7248c26cf

Authored by shaofan
1 parent b62efd8c

Add RabbitMQ support and implement message synchronization for org operations:

- Added `spring-boot-starter-amqp` dependency in POM.
- Introduced classes `CorrelationDataEx`, `RetryTask`, `RetryTaskHolder`, and `RabbitMessageSender` for RabbitMQ message handling and retry logic.
- Updated `OrgController` to send RabbitMQ notifications for add, update, and delete operations.
- Modified `bootstrap.yml` to include `rabbitmq.yml` in shared Nacos configurations.
itcast-auth/itcast-auth-server/pom.xml
... ... @@ -52,6 +52,10 @@
52 52 <artifactId>itcast-tools-core</artifactId>
53 53 </dependency>
54 54 <dependency>
  55 + <groupId>org.springframework.boot</groupId>
  56 + <artifactId>spring-boot-starter-amqp</artifactId>
  57 + </dependency>
  58 + <dependency>
55 59 <groupId>org.springframework.cloud</groupId>
56 60 <artifactId>spring-cloud-starter-openfeign</artifactId>
57 61 <exclusions>
... ...
itcast-auth/itcast-auth-server/src/main/java/com/itheima/authority/biz/mq/CorrelationDataEx.java 0 → 100644
  1 +package com.itheima.authority.biz.mq;
  2 +
  3 +import org.springframework.amqp.core.Message;
  4 +import org.springframework.amqp.rabbit.connection.CorrelationData;
  5 +
  6 +/**
  7 + * 扩展
  8 + */
  9 +public class CorrelationDataEx extends CorrelationData {
  10 +
  11 + private Message message;
  12 + private String exchange;
  13 + private String routingKey;
  14 + private int maxRetry;
  15 + private int retryCount;
  16 +
  17 + public CorrelationDataEx(String id) {
  18 + super(id);
  19 + }
  20 +
  21 + public Message getMessage() {
  22 + return message;
  23 + }
  24 +
  25 + public void setMessage(Message message) {
  26 + this.message = message;
  27 + }
  28 +
  29 + public String getExchange() {
  30 + return exchange;
  31 + }
  32 +
  33 + public void setExchange(String exchange) {
  34 + this.exchange = exchange;
  35 + }
  36 +
  37 + public String getRoutingKey() {
  38 + return routingKey;
  39 + }
  40 +
  41 + public void setRoutingKey(String routingKey) {
  42 + this.routingKey = routingKey;
  43 + }
  44 +
  45 + public int getMaxRetry() {
  46 + return maxRetry;
  47 + }
  48 +
  49 + public void setMaxRetry(int maxRetry) {
  50 + this.maxRetry = maxRetry;
  51 + }
  52 +
  53 + public int getRetryCount() {
  54 + return retryCount;
  55 + }
  56 +
  57 + public void setRetryCount(int retryCount) {
  58 + this.retryCount = retryCount;
  59 + }
  60 +
  61 + public boolean hasNext() {
  62 + this.retryCount += 1;
  63 + return this.retryCount < this.maxRetry;
  64 + }
  65 +}
... ...
itcast-auth/itcast-auth-server/src/main/java/com/itheima/authority/biz/mq/RabbitMessageSender.java 0 → 100644
  1 +package com.itheima.authority.biz.mq;
  2 +
  3 +import cn.hutool.core.util.StrUtil;
  4 +import org.slf4j.Logger;
  5 +import org.slf4j.LoggerFactory;
  6 +import org.springframework.amqp.AmqpException;
  7 +import org.springframework.amqp.core.Message;
  8 +import org.springframework.amqp.core.MessageBuilder;
  9 +import org.springframework.amqp.rabbit.connection.CorrelationData;
  10 +import org.springframework.amqp.rabbit.core.RabbitTemplate;
  11 +import org.springframework.beans.factory.annotation.Autowired;
  12 +import org.springframework.stereotype.Component;
  13 +
  14 +import java.nio.charset.StandardCharsets;
  15 +import java.util.UUID;
  16 +
  17 +/**
  18 + * rabbitmq 消息发送
  19 + */
  20 +@Component
  21 +public class RabbitMessageSender {
  22 + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMessageSender.class);
  23 +
  24 + @Autowired
  25 + private RabbitTemplate rabbitTemplate;
  26 +
  27 + private RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
  28 + @Override
  29 + public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  30 + if (!ack) {
  31 + if (!(correlationData instanceof CorrelationDataEx)) {
  32 + return;
  33 + }
  34 + retry((CorrelationDataEx) correlationData);
  35 + }
  36 + }
  37 + };
  38 +
  39 + private RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
  40 + @Override
  41 + public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  42 + LOGGER.info("消息报文:{}", new String(message.getBody()));
  43 + LOGGER.info("消息编号:{}", replyCode);
  44 + LOGGER.info("描述:{}", replyText);
  45 + LOGGER.info("交换机名称:{}", exchange);
  46 + LOGGER.info("路由名称:{}", routingKey);
  47 + }
  48 + };
  49 +
  50 + private void retry(CorrelationDataEx correlationDataEx) {
  51 + if (correlationDataEx.hasNext()) {
  52 + RetryTaskHolder.offer(new RetryTask(1, 5000, 5000, () -> send(correlationDataEx)));
  53 + }
  54 + }
  55 +
  56 + private boolean send(CorrelationDataEx correlationDataEx) {
  57 + this.rabbitTemplate.setMandatory(true);
  58 + this.rabbitTemplate.setConfirmCallback(this.confirmCallback);
  59 + this.rabbitTemplate.setReturnCallback(this.returnCallback);
  60 + try {
  61 + this.rabbitTemplate.convertAndSend(correlationDataEx.getExchange(), correlationDataEx.getRoutingKey(), correlationDataEx.getMessage(), correlationDataEx);
  62 + return true;
  63 + } catch (AmqpException e) {
  64 + LOGGER.error("Rabbit mq send error", e);
  65 + return false;
  66 + }
  67 + }
  68 +
  69 + /**
  70 + * 发送消息
  71 + *
  72 + * @param exchange
  73 + * @param routingKey
  74 + * @param json
  75 + * @param maxRetry
  76 + */
  77 + public void send(String exchange, String routingKey, String json, int maxRetry) {
  78 + if (StrUtil.isEmpty(exchange) || StrUtil.isEmpty(json)) {
  79 + return;
  80 + }
  81 + String uuid = UUID.randomUUID().toString();
  82 + Message message = MessageBuilder.withBody(json.getBytes(StandardCharsets.UTF_8)).setContentType("application/json").setContentEncoding("UTF-8").setMessageId(uuid).build();
  83 + CorrelationDataEx correlationData = new CorrelationDataEx(uuid);
  84 + correlationData.setMessage(message);
  85 + correlationData.setExchange(exchange);
  86 + correlationData.setRoutingKey(routingKey);
  87 + correlationData.setMaxRetry(maxRetry);
  88 + if (!send(correlationData)) {
  89 + retry(correlationData);
  90 + }
  91 + }
  92 +}
... ...
itcast-auth/itcast-auth-server/src/main/java/com/itheima/authority/biz/mq/RetryTask.java 0 → 100644
  1 +package com.itheima.authority.biz.mq;
  2 +
  3 +
  4 +import javax.validation.constraints.NotNull;
  5 +import java.util.concurrent.Callable;
  6 +import java.util.concurrent.Delayed;
  7 +import java.util.concurrent.TimeUnit;
  8 +
  9 +/** 重试任务 */
  10 +public class RetryTask implements Delayed {
  11 +
  12 + /** 执行时间 毫秒 */
  13 + private long mills;
  14 + /** 时间间隔 毫秒 */
  15 + private long interval;
  16 + /** 最大执行次数 */
  17 + private int max;
  18 + /** 当前执行次数 */
  19 + private int current;
  20 + /** 执行任务 */
  21 + private Callable<Boolean> callable;
  22 +
  23 + /**
  24 + * 构造函数
  25 + * @param max 最大执行次数
  26 + * @param interval 时间间隔 毫秒
  27 + * @param callable 任务
  28 + */
  29 + public RetryTask(int max, long interval, Callable<Boolean> callable) {
  30 + this.mills = System.currentTimeMillis();
  31 + this.max = max;
  32 + this.interval = interval;
  33 + this.callable = callable;
  34 + }
  35 +
  36 + /**
  37 + * 构造函数
  38 + * @param max 最大执行次数
  39 + * @param interval 时间间隔 毫秒
  40 + * @param offset 时间偏移 毫秒
  41 + * @param callable 任务
  42 + */
  43 + public RetryTask(int max, long interval, long offset, Callable<Boolean> callable) {
  44 + this.mills = System.currentTimeMillis() + offset;
  45 + this.max = max;
  46 + this.interval = interval;
  47 + this.callable = callable;
  48 + }
  49 +
  50 + @Override
  51 + public long getDelay(@NotNull TimeUnit unit) {
  52 + long temp = this.mills - System.currentTimeMillis();
  53 + return unit.convert(temp, TimeUnit.MILLISECONDS);
  54 + }
  55 +
  56 + @Override
  57 + public int compareTo(@NotNull Delayed delayed) {
  58 + RetryTask retryTask = (RetryTask)delayed;
  59 + return this.mills - retryTask.getMills() < 0 ? -1 : 1;
  60 + }
  61 +
  62 + public long getMills() {
  63 + return mills;
  64 + }
  65 +
  66 + public void setMills(long mills) {
  67 + this.mills = mills;
  68 + }
  69 +
  70 + public long getInterval() {
  71 + return interval;
  72 + }
  73 +
  74 + public void setInterval(long interval) {
  75 + this.interval = interval;
  76 + }
  77 +
  78 + public int getMax() {
  79 + return max;
  80 + }
  81 +
  82 + public void setMax(int max) {
  83 + this.max = max;
  84 + }
  85 +
  86 + public int getCurrent() {
  87 + return current;
  88 + }
  89 +
  90 + public void setCurrent(int current) {
  91 + this.current = current;
  92 + }
  93 +
  94 + public Callable<Boolean> getCallable() {
  95 + return callable;
  96 + }
  97 +
  98 + public void setCallable(Callable<Boolean> callable) {
  99 + this.callable = callable;
  100 + }
  101 +
  102 + public boolean hasNext() {
  103 + this.current += 1;
  104 + this.mills = System.currentTimeMillis() + this.interval;
  105 + return this.current < this.max;
  106 + }
  107 +}
... ...
itcast-auth/itcast-auth-server/src/main/java/com/itheima/authority/biz/mq/RetryTaskHolder.java 0 → 100644
  1 +package com.itheima.authority.biz.mq;
  2 +
  3 +import org.slf4j.Logger;
  4 +import org.slf4j.LoggerFactory;
  5 +import org.springframework.core.task.AsyncTaskExecutor;
  6 +
  7 +import java.util.concurrent.DelayQueue;
  8 +
  9 +/**
  10 + * 重试任务处理
  11 + */
  12 +public class RetryTaskHolder {
  13 + private static final Logger LOGGER = LoggerFactory.getLogger(RetryTaskHolder.class);
  14 +
  15 + private static final DelayQueue<RetryTask> RETRY_TASK_QUEUE = new DelayQueue<>();
  16 +
  17 + //直接开启线程执行
  18 + static {
  19 + int availableProcessors = Runtime.getRuntime().availableProcessors();
  20 + for (int i = 0; i < availableProcessors; i++) {
  21 +
  22 + }
  23 + }
  24 +
  25 + /**
  26 + * 添加延时任务
  27 + * @param retryTask
  28 + * @return
  29 + */
  30 + public static boolean offer(RetryTask retryTask) {
  31 + return RETRY_TASK_QUEUE.offer(retryTask);
  32 + }
  33 +}
... ...
itcast-auth/itcast-auth-server/src/main/java/com/itheima/authority/controller/core/OrgController.java
1 1 package com.itheima.authority.controller.core;
2 2  
  3 +import cn.hutool.json.JSONObject;
  4 +import cn.hutool.json.JSONUtil;
3 5 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
4 6 import com.baomidou.mybatisplus.core.metadata.IPage;
5 7 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
  8 +import com.google.common.collect.Lists;
  9 +import com.itheima.authority.biz.mq.RabbitMessageSender;
6 10 import com.itheima.authority.biz.service.auth.UserService;
7 11 import com.itheima.authority.biz.service.core.OrgService;
8 12 import com.itheima.authority.dto.core.OrgSaveDTO;
... ... @@ -56,6 +60,8 @@ public class OrgController extends BaseController {
56 60 private UserService userService;
57 61 @Autowired
58 62 private DozerUtils dozer;
  63 + @Autowired
  64 + private RabbitMessageSender rabbitMessageSender;
59 65  
60 66 // /**
61 67 // * 分页查询组织
... ... @@ -126,6 +132,13 @@ public class OrgController extends BaseController {
126 132 org.setTreePath(StringUtils.join(parent.getTreePath(), parent.getId(), DEF_ROOT_PATH));
127 133 }
128 134 this.orgService.saveOrg(org);
  135 +
  136 + // 同步mq通知修改neo4j信息
  137 + JSONObject jsonObject = new JSONObject();
  138 + jsonObject.put("type", "ORG");
  139 + jsonObject.put("operation", "ADD");
  140 + jsonObject.put("content", Lists.newArrayList(org));
  141 + rabbitMessageSender.send("itcast-auth", "add", jsonObject.toString(), 3);
129 142 return this.success(org);
130 143 }
131 144  
... ... @@ -135,11 +148,20 @@ public class OrgController extends BaseController {
135 148 public R<Org> update(@RequestBody @Validated(SuperEntity.Update.class) OrgUpdateDTO data) {
136 149 Org org = this.dozer.map(data, Org.class);
137 150 this.orgService.updateOrg(org);
  151 +
  152 + // 同步mq通知修改neo4j信息
  153 + JSONObject jsonObject = new JSONObject();
  154 + jsonObject.put("type", "ORG");
  155 + jsonObject.put("operation", "UPDATE");
  156 + jsonObject.put("content", Lists.newArrayList(org));
  157 + rabbitMessageSender.send("itcast-auth", "update", jsonObject.toString(), 3);
  158 +
138 159 if (data.getStatus() != null) {
139 160 org = new Org();
140 161 org.setId(data.getId());
141 162 org.setStatus(data.getStatus());
142 163 this.orgService.updateStatus(org);
  164 +
143 165 }
144 166 return this.success(org);
145 167 }
... ... @@ -168,7 +190,23 @@ public class OrgController extends BaseController {
168 190 @SysLog("删除组织")
169 191 @DeleteMapping
170 192 public R<List<Org>> delete(@RequestParam("ids[]") List<Long> ids) {
  193 + // 查询待删除的组织
  194 + List<Org> delList = new ArrayList<>();
  195 + for (Long id : ids) {
  196 + Org org = this.orgService.findById(id);
  197 + delList.add(org);
  198 + }
171 199 List<Org> orgs = this.orgService.remove(ids);
  200 +
  201 + // 同步mq通知修改neo4j信息
  202 + for (Org org : delList) {
  203 + JSONObject jsonObject = new JSONObject();
  204 + jsonObject.put("type", "ORG");
  205 + jsonObject.put("operation", "DEL");
  206 + jsonObject.put("content", Lists.newArrayList(org));
  207 + rabbitMessageSender.send("itcast-auth", "DEL", jsonObject.toString(), 3);
  208 + }
  209 +
172 210 return this.success(orgs);
173 211 }
174 212  
... ...
itcast-auth/itcast-auth-server/src/main/resources/bootstrap.yml
... ... @@ -20,7 +20,7 @@ spring:
20 20 namespace: aa86e672-3e5a-4480-9910-dd70bb65a7bc
21 21 #支持多个共享 Data Id 的配置,多个之间用逗号隔开,多个共享配置间的一个优先级的关系我们约定:按照配置出现的先后顺序,即后面的优先级要高于前面
22 22 #Data Id 必须带文件扩展名,文件扩展名既可支持 properties,也可以支持 yaml/yml。 此时 spring.cloud.nacos.config.file-extension 的配置对自定义扩展配置的 Data Id 文件扩展名没有影响。
23   - shared-dataids: common.yml,redis.yml,mysql.yml
  23 + shared-dataids: common.yml,redis.yml,mysql.yml,rabbitmq.yml
24 24 #支持哪些共享配置的 Data Id 在配置变化时,应用中是否可动态刷新, 感知到最新的配置值,多个 Data Id 之间用逗号隔开。如果没有明确配置,默认情况下所有共享配置的 Data Id 都不支持动态刷新。‘
25 25 refreshable-dataids: common.yml
26 26 enabled: true
... ...