Commit 2dfdc025a35902e578e012b6006c8a7f3d4a005a

Authored by shaofan
1 parent 815273bb

新增 XXL-Job 分布式调度支持,完善订单自动派单、超时取消和 Webhook 重试任务逻辑

... ... @@ -92,6 +92,13 @@
92 92 <version>3.6.6</version>
93 93 </dependency>
94 94  
  95 + <!-- XXL-Job 分布式调度 -->
  96 + <dependency>
  97 + <groupId>com.xuxueli</groupId>
  98 + <artifactId>xxl-job-core</artifactId>
  99 + <version>2.4.1</version>
  100 + </dependency>
  101 +
95 102 <!-- Test -->
96 103 <dependency>
97 104 <groupId>org.springframework.boot</groupId>
... ...
src/main/java/com/diligrp/rider/config/XxlJobConfig.java 0 → 100644
  1 +package com.diligrp.rider.config;
  2 +
  3 +import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
  4 +import lombok.extern.slf4j.Slf4j;
  5 +import org.springframework.beans.factory.annotation.Value;
  6 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  7 +import org.springframework.context.annotation.Bean;
  8 +import org.springframework.context.annotation.Configuration;
  9 +
  10 +@Slf4j
  11 +@Configuration
  12 +@ConditionalOnProperty(prefix = "xxl.job", name = "enabled", havingValue = "true", matchIfMissing = true)
  13 +public class XxlJobConfig {
  14 +
  15 + @Value("${xxl.job.admin.addresses:}")
  16 + private String adminAddresses;
  17 +
  18 + @Value("${xxl.job.accessToken:default_token}")
  19 + private String accessToken;
  20 +
  21 + @Value("${xxl.job.executor.appname:rider-service-executor}")
  22 + private String appname;
  23 +
  24 + @Value("${xxl.job.executor.address:}")
  25 + private String address;
  26 +
  27 + @Value("${xxl.job.executor.ip:}")
  28 + private String ip;
  29 +
  30 + @Value("${xxl.job.executor.port:9999}")
  31 + private int port;
  32 +
  33 + @Value("${xxl.job.executor.logpath:./logs/xxl-job/jobhandler}")
  34 + private String logPath;
  35 +
  36 + @Value("${xxl.job.executor.logretentiondays:30}")
  37 + private int logRetentionDays;
  38 +
  39 + @Bean(initMethod = "start", destroyMethod = "destroy")
  40 + public XxlJobSpringExecutor xxlJobExecutor() {
  41 + log.info("XXL-Job executor 启动 adminAddresses={} appname={} port={}", adminAddresses, appname, port);
  42 + XxlJobSpringExecutor executor = new XxlJobSpringExecutor();
  43 + executor.setAdminAddresses(adminAddresses);
  44 + executor.setAccessToken(accessToken);
  45 + executor.setAppname(appname);
  46 + executor.setAddress(address);
  47 + executor.setIp(ip);
  48 + executor.setPort(port);
  49 + executor.setLogPath(logPath);
  50 + executor.setLogRetentionDays(logRetentionDays);
  51 + return executor;
  52 + }
  53 +}
... ...
src/main/java/com/diligrp/rider/service/impl/DispatchServiceImpl.java
... ... @@ -14,10 +14,12 @@ import com.diligrp.rider.vo.DispatchRuleTemplateVO;
14 14 import com.fasterxml.jackson.databind.ObjectMapper;
15 15 import lombok.RequiredArgsConstructor;
16 16 import lombok.extern.slf4j.Slf4j;
  17 +import org.springframework.data.redis.core.StringRedisTemplate;
17 18 import org.springframework.stereotype.Service;
18 19 import org.springframework.transaction.annotation.Transactional;
19 20  
20 21 import java.math.BigDecimal;
  22 +import java.time.Duration;
21 23 import java.time.LocalDate;
22 24 import java.time.ZoneId;
23 25 import java.time.format.DateTimeFormatter;
... ... @@ -28,6 +30,9 @@ import java.util.*;
28 30 @RequiredArgsConstructor
29 31 public class DispatchServiceImpl implements DispatchService {
30 32  
  33 + private static final String DISPATCH_LOCK_PREFIX = "dispatch:lock:order:";
  34 + private static final Duration DISPATCH_LOCK_TTL = Duration.ofSeconds(10);
  35 +
31 36 private final DispatchRuleService dispatchRuleService;
32 37 private final RiderMapper riderMapper;
33 38 private final RiderLocationMapper locationMapper;
... ... @@ -37,10 +42,26 @@ public class DispatchServiceImpl implements DispatchService {
37 42 private final RiderHoldLimitService riderHoldLimitService;
38 43 private final WebhookService webhookService;
39 44 private final ObjectMapper objectMapper;
  45 + private final StringRedisTemplate redisTemplate;
40 46  
41 47 @Override
42 48 @Transactional
43 49 public Long dispatch(Orders order) {
  50 + String lockKey = DISPATCH_LOCK_PREFIX + order.getId();
  51 + Boolean locked = redisTemplate.opsForValue()
  52 + .setIfAbsent(lockKey, "1", DISPATCH_LOCK_TTL);
  53 + if (!Boolean.TRUE.equals(locked)) {
  54 + log.debug("订单 {} 派单锁被占用,跳过", order.getId());
  55 + return null;
  56 + }
  57 + try {
  58 + return doDispatch(order);
  59 + } finally {
  60 + redisTemplate.delete(lockKey);
  61 + }
  62 + }
  63 +
  64 + private Long doDispatch(Orders order) {
44 65 DispatchRuleTemplateVO rule = dispatchRuleService.getActiveRule(order.getCityId());
45 66 if (rule == null) {
46 67 log.debug("城市 {} 无生效调度规则,跳过派单", order.getCityId());
... ...
src/main/java/com/diligrp/rider/service/impl/WebhookServiceImpl.java
1 1 package com.diligrp.rider.service.impl;
2 2  
3 3 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  4 +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
4 5 import com.fasterxml.jackson.databind.ObjectMapper;
5 6 import com.diligrp.rider.entity.OpenApp;
6 7 import com.diligrp.rider.entity.Orders;
... ... @@ -70,9 +71,21 @@ public class WebhookServiceImpl implements WebhookService {
70 71  
71 72 @Override
72 73 public void retry(Long logId) {
73   - WebhookLog log = webhookLogMapper.selectById(logId);
74   - if (log == null || log.getStatus() == 1) return;
75   - retry(log);
  74 + WebhookLog webhookLog = webhookLogMapper.selectById(logId);
  75 + if (webhookLog == null || webhookLog.getStatus() == 1) return;
  76 +
  77 + Integer prevRetryCount = webhookLog.getRetryCount() == null ? 0 : webhookLog.getRetryCount();
  78 + int updated = webhookLogMapper.update(null, new LambdaUpdateWrapper<WebhookLog>()
  79 + .eq(WebhookLog::getId, logId)
  80 + .eq(WebhookLog::getStatus, 0)
  81 + .eq(WebhookLog::getRetryCount, prevRetryCount)
  82 + .setSql("retry_count = retry_count + 1"));
  83 + if (updated == 0) {
  84 + log.debug("Webhook 重试抢占失败 logId={}(可能被其他实例处理或状态已变)", logId);
  85 + return;
  86 + }
  87 + webhookLog.setRetryCount(prevRetryCount + 1);
  88 + retry(webhookLog);
76 89 }
77 90  
78 91 private void retry(WebhookLog webhookLog) {
... ... @@ -84,12 +97,13 @@ public class WebhookServiceImpl implements WebhookService {
84 97 }
85 98 if (url == null || url.isBlank()) return;
86 99 SendResult result = sendHttp(app, url, webhookLog.getEvent(), webhookLog.getPayload());
87   - webhookLog.setUrl(url);
88   - webhookLog.setResponseCode(result.responseCode());
89   - webhookLog.setResponseBody(trimResponseBody(result.responseBody()));
90   - webhookLog.setStatus(result.status());
91   - webhookLog.setRetryCount((webhookLog.getRetryCount() == null ? 0 : webhookLog.getRetryCount()) + 1);
92   - webhookLogMapper.updateById(webhookLog);
  100 + // retry_count 已在 retry(Long) 入口的 CAS 中递增,这里只更新发送结果字段
  101 + webhookLogMapper.update(null, new LambdaUpdateWrapper<WebhookLog>()
  102 + .eq(WebhookLog::getId, webhookLog.getId())
  103 + .set(WebhookLog::getUrl, url)
  104 + .set(WebhookLog::getResponseCode, result.responseCode())
  105 + .set(WebhookLog::getResponseBody, trimResponseBody(result.responseBody()))
  106 + .set(WebhookLog::getStatus, result.status()));
93 107 }
94 108  
95 109 private void doSend(OpenApp app, String url, String event, Long bizId, String payload, int retryCount) {
... ...
src/main/java/com/diligrp/rider/task/DispatchScheduleTask.java
... ... @@ -8,17 +8,15 @@ import com.diligrp.rider.mapper.OrdersMapper;
8 8 import com.diligrp.rider.service.DispatchRuleService;
9 9 import com.diligrp.rider.service.DispatchService;
10 10 import com.diligrp.rider.vo.DispatchRuleTemplateVO;
  11 +import com.xxl.job.core.handler.annotation.XxlJob;
11 12 import lombok.RequiredArgsConstructor;
12 13 import lombok.extern.slf4j.Slf4j;
13   -import org.springframework.scheduling.annotation.EnableScheduling;
14   -import org.springframework.scheduling.annotation.Scheduled;
15 14 import org.springframework.stereotype.Component;
16 15  
17 16 import java.util.List;
18 17  
19 18 @Slf4j
20 19 @Component
21   -@EnableScheduling
22 20 @RequiredArgsConstructor
23 21 public class DispatchScheduleTask {
24 22  
... ... @@ -29,9 +27,8 @@ public class DispatchScheduleTask {
29 27  
30 28 /**
31 29 * 抢单超时后自动派单
32   - * 每3秒执行一次
33 30 */
34   - @Scheduled(fixedDelay = 3_000)
  31 + @XxlJob("autoDispatchTimeoutOrderHandler")
35 32 public void autoDispatchTimeoutOrders() {
36 33 try {
37 34 List<City> cities = cityMapper.selectList(new LambdaQueryWrapper<City>()
... ... @@ -69,6 +66,7 @@ public class DispatchScheduleTask {
69 66 }
70 67 } catch (Exception e) {
71 68 log.error("自动派单任务异常", e);
  69 + throw new RuntimeException(e);
72 70 }
73 71 }
74 72 }
... ...
src/main/java/com/diligrp/rider/task/OrderScheduleTask.java
... ... @@ -7,10 +7,9 @@ import com.diligrp.rider.entity.Orders;
7 7 import com.diligrp.rider.mapper.OrdersMapper;
8 8 import com.diligrp.rider.service.WebhookService;
9 9 import com.diligrp.rider.service.AdminMessageService;
  10 +import com.xxl.job.core.handler.annotation.XxlJob;
10 11 import lombok.RequiredArgsConstructor;
11 12 import lombok.extern.slf4j.Slf4j;
12   -import org.springframework.scheduling.annotation.EnableScheduling;
13   -import org.springframework.scheduling.annotation.Scheduled;
14 13 import org.springframework.stereotype.Component;
15 14  
16 15 import java.util.HashMap;
... ... @@ -18,12 +17,10 @@ import java.util.List;
18 17 import java.util.Map;
19 18  
20 19 /**
21   - * 订单定时任务
22   - * OrderhandleCron(每3秒执行)
  20 + * 订单定时任务(XXL-Job handler)
23 21 */
24 22 @Slf4j
25 23 @Component
26   -@EnableScheduling
27 24 @RequiredArgsConstructor
28 25 public class OrderScheduleTask {
29 26  
... ... @@ -34,10 +31,8 @@ public class OrderScheduleTask {
34 31  
35 32 /**
36 33 * 超时未接单订单自动取消(30分钟)
37   - * Orders::cancel()
38   - * 每分钟执行一次
39 34 */
40   - @Scheduled(fixedDelay = 60_000)
  35 + @XxlJob("autoCancelTimeoutOrderHandler")
41 36 public void autoCancelTimeout() {
42 37 try {
43 38 long expireTime = System.currentTimeMillis() / 1000 - 30 * 60;
... ... @@ -70,14 +65,14 @@ public class OrderScheduleTask {
70 65 }
71 66 } catch (Exception e) {
72 67 log.error("超时取消任务异常", e);
  68 + throw new RuntimeException(e);
73 69 }
74 70 }
75 71  
76 72 /**
77 73 * 订单超时提醒(配送中的订单,距离预计送达时间还有5分钟)
78   - * 每分钟执行一次
79 74 */
80   - @Scheduled(fixedDelay = 60_000)
  75 + @XxlJob("orderTimeoutReminderHandler")
81 76 public void timeoutReminder() {
82 77 try {
83 78 long now = System.currentTimeMillis() / 1000;
... ... @@ -107,7 +102,8 @@ public class OrderScheduleTask {
107 102 }
108 103 }
109 104 } catch (Exception e) {
110   - log.error("超时取消任务异常", e);
  105 + log.error("订单超时提醒任务异常", e);
  106 + throw new RuntimeException(e);
111 107 }
112 108 }
113 109  
... ...
src/main/java/com/diligrp/rider/task/WebhookRetryTask.java
... ... @@ -4,9 +4,9 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
4 4 import com.diligrp.rider.entity.WebhookLog;
5 5 import com.diligrp.rider.mapper.WebhookLogMapper;
6 6 import com.diligrp.rider.service.WebhookService;
  7 +import com.xxl.job.core.handler.annotation.XxlJob;
7 8 import lombok.RequiredArgsConstructor;
8 9 import lombok.extern.slf4j.Slf4j;
9   -import org.springframework.scheduling.annotation.Scheduled;
10 10 import org.springframework.stereotype.Component;
11 11  
12 12 import java.util.List;
... ... @@ -19,19 +19,24 @@ public class WebhookRetryTask {
19 19 private final WebhookLogMapper webhookLogMapper;
20 20 private final WebhookService webhookService;
21 21  
22   - @Scheduled(fixedDelay = 300_000)
  22 + @XxlJob("webhookRetryHandler")
23 23 public void retryFailedWebhooks() {
24   - List<WebhookLog> logs = webhookLogMapper.selectList(new LambdaQueryWrapper<WebhookLog>()
25   - .eq(WebhookLog::getStatus, 0)
26   - .lt(WebhookLog::getRetryCount, 5)
27   - .orderByAsc(WebhookLog::getCreateTime)
28   - .last("LIMIT 20"));
29   - for (WebhookLog webhookLog : logs) {
30   - try {
31   - webhookService.retry(webhookLog.getId());
32   - } catch (Exception e) {
33   - log.warn("Webhook 自动重试失败 logId={}", webhookLog.getId(), e);
  24 + try {
  25 + List<WebhookLog> logs = webhookLogMapper.selectList(new LambdaQueryWrapper<WebhookLog>()
  26 + .eq(WebhookLog::getStatus, 0)
  27 + .lt(WebhookLog::getRetryCount, 5)
  28 + .orderByAsc(WebhookLog::getCreateTime)
  29 + .last("LIMIT 20"));
  30 + for (WebhookLog webhookLog : logs) {
  31 + try {
  32 + webhookService.retry(webhookLog.getId());
  33 + } catch (Exception e) {
  34 + log.warn("Webhook 自动重试失败 logId={}", webhookLog.getId(), e);
  35 + }
34 36 }
  37 + } catch (Exception e) {
  38 + log.error("Webhook 重试任务异常", e);
  39 + throw new RuntimeException(e);
35 40 }
36 41 }
37 42 }
... ...
src/main/resources/application.yml
... ... @@ -2,6 +2,8 @@ server:
2 2 port: 8080
3 3  
4 4 spring:
  5 + application:
  6 + name: rider-service
5 7 datasource:
6 8 driver-class-name: com.mysql.cj.jdbc.Driver
7 9 url: jdbc:mysql://mysql.diligrp.com:3306/dili_rider?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
... ... @@ -45,6 +47,20 @@ amap:
45 47 connect-timeout-millis: 3000
46 48 read-timeout-millis: 3000
47 49  
  50 +xxl:
  51 + job:
  52 + enabled: false # 本地开发可用 -Dxxl.job.enabled=false 关闭 executor 自启
  53 + accessToken: default_token # 调度中心与执行器通信 token,生产环境务必修改
  54 + admin:
  55 + addresses: http://localhost:8081/xxl-job-admin # 调度中心地址,多个用逗号分隔
  56 + executor:
  57 + appname: rider-service-executor # 执行器 AppName,调度中心需要匹配
  58 + address: # 留空时由 ip+port 自动拼接
  59 + ip: # 留空时自动获取本机 IP(多网卡需显式指定)
  60 + port: 9999 # 执行器内嵌服务端口
  61 + logpath: ./logs/xxl-job/jobhandler # 任务日志本地落盘路径
  62 + logretentiondays: 30 # 日志保留天数
  63 +
48 64 logging:
49 65 level:
50 66 com.diligrp.rider: debug
... ...