WebhookServiceImpl.java
6.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package com.diligrp.rider.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.diligrp.rider.entity.OpenApp;
import com.diligrp.rider.entity.Orders;
import com.diligrp.rider.entity.WebhookLog;
import com.diligrp.rider.mapper.OpenAppMapper;
import com.diligrp.rider.mapper.OrdersMapper;
import com.diligrp.rider.mapper.WebhookLogMapper;
import com.diligrp.rider.service.WebhookService;
import com.diligrp.rider.util.SignUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
@Slf4j
@Service
@RequiredArgsConstructor
public class WebhookServiceImpl implements WebhookService {
private final OpenAppMapper openAppMapper;
private final OrdersMapper ordersMapper;
private final WebhookLogMapper webhookLogMapper;
private final ObjectMapper objectMapper;
private static final HttpClient HTTP_CLIENT = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(5))
.build();
@Override
@Async
public void send(String event, Long bizId, String payload) {
Orders order = ordersMapper.selectById(bizId);
if (order == null) {
log.warn("Webhook 发送跳过,订单不存在 bizId={} event={}", bizId, event);
return;
}
sendOrderEvent(order, event, payload);
}
@Override
@Async
public void sendOrderEvent(Orders order, String event, String payload) {
if (order == null || order.getAppKey() == null || order.getAppKey().isBlank()) return;
OpenApp app = openAppMapper.selectOne(new LambdaQueryWrapper<OpenApp>()
.eq(OpenApp::getAppKey, order.getAppKey())
.eq(OpenApp::getStatus, 1)
.last("LIMIT 1"));
if (app == null) return;
if (!isSubscribed(app.getWebhookEvents(), event)) return;
String url = order.getCallbackUrl();
if (url == null || url.isBlank()) {
url = app.getWebhookUrl();
}
if (url == null || url.isBlank()) return;
doSend(app, url, event, order.getId(), payload, 0);
}
@Override
public void retry(Long logId) {
WebhookLog log = webhookLogMapper.selectById(logId);
if (log == null || log.getStatus() == 1) return;
retry(log);
}
@Scheduled(fixedDelay = 300_000)
public void retryFailedWebhooks() {
List<WebhookLog> logs = webhookLogMapper.selectList(new LambdaQueryWrapper<WebhookLog>()
.eq(WebhookLog::getStatus, 0)
.lt(WebhookLog::getRetryCount, 5)
.orderByAsc(WebhookLog::getCreateTime)
.last("LIMIT 20"));
for (WebhookLog log : logs) {
retry(log);
}
}
private void retry(WebhookLog webhookLog) {
OpenApp app = openAppMapper.selectById(webhookLog.getAppId());
if (app == null) return;
String url = webhookLog.getUrl();
if (url == null || url.isBlank()) {
url = app.getWebhookUrl();
}
if (url == null || url.isBlank()) return;
SendResult result = sendHttp(app, url, webhookLog.getEvent(), webhookLog.getPayload());
webhookLog.setUrl(url);
webhookLog.setResponseCode(result.responseCode());
webhookLog.setResponseBody(trimResponseBody(result.responseBody()));
webhookLog.setStatus(result.status());
webhookLog.setRetryCount((webhookLog.getRetryCount() == null ? 0 : webhookLog.getRetryCount()) + 1);
webhookLogMapper.updateById(webhookLog);
}
private void doSend(OpenApp app, String url, String event, Long bizId, String payload, int retryCount) {
SendResult result = sendHttp(app, url, event, payload);
WebhookLog webhookLog = new WebhookLog();
webhookLog.setAppId(app.getId());
webhookLog.setEvent(event);
webhookLog.setBizId(bizId);
webhookLog.setUrl(url);
webhookLog.setPayload(payload);
webhookLog.setRetryCount(retryCount);
webhookLog.setCreateTime(System.currentTimeMillis() / 1000);
webhookLog.setResponseCode(result.responseCode());
webhookLog.setResponseBody(trimResponseBody(result.responseBody()));
webhookLog.setStatus(result.status());
webhookLogMapper.insert(webhookLog);
}
private SendResult sendHttp(OpenApp app, String url, String event, String payload) {
int responseCode = 0;
String responseBody = "";
int status = 0;
try {
String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
String nonce = UUID.randomUUID().toString().replace("-", "");
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.header("Content-Type", "application/json")
.header("X-App-Key", app.getAppKey())
.header("X-Event", event)
.header("X-Timestamp", timestamp)
.header("X-Nonce", nonce)
.header("X-Sign", SignUtil.sign(app.getAppKey(), timestamp, nonce, app.getAppSecret()))
.POST(HttpRequest.BodyPublishers.ofString(payload))
.timeout(Duration.ofSeconds(10))
.build();
HttpResponse<String> response = HTTP_CLIENT.send(request,
HttpResponse.BodyHandlers.ofString());
responseCode = response.statusCode();
responseBody = response.body() == null ? "" : response.body();
if (responseCode == 200) status = 1;
} catch (Exception e) {
log.warn("Webhook 发送失败 appId={} event={} err={}", app.getId(), event, e.getMessage());
responseBody = e.getMessage() == null ? "" : e.getMessage();
}
return new SendResult(responseCode, responseBody, status);
}
private String trimResponseBody(String responseBody) {
if (responseBody == null) return "";
return responseBody.length() > 500 ? responseBody.substring(0, 500) : responseBody;
}
private record SendResult(int responseCode, String responseBody, int status) {}
/** 检查应用是否订阅了某事件 */
private boolean isSubscribed(String webhookEvents, String event) {
if (webhookEvents == null || webhookEvents.isBlank()) return false;
try {
List<String> events = objectMapper.readValue(webhookEvents,
objectMapper.getTypeFactory().constructCollectionType(List.class, String.class));
return events.contains(event) || events.contains("*");
} catch (Exception e) {
return false;
}
}
}