// WebhookServiceImpl.java (4.29 KB)
package com.diligrp.rider.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.diligrp.rider.entity.OpenApp;
import com.diligrp.rider.entity.WebhookLog;
import com.diligrp.rider.mapper.OpenAppMapper;
import com.diligrp.rider.mapper.WebhookLogMapper;
import com.diligrp.rider.service.WebhookService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
@Slf4j
@Service
@RequiredArgsConstructor
public class WebhookServiceImpl implements WebhookService {
private final OpenAppMapper openAppMapper;
private final WebhookLogMapper webhookLogMapper;
private final ObjectMapper objectMapper;
private static final HttpClient HTTP_CLIENT = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(5))
.build();
@Override
@Async
public void send(String event, Long bizId, String payload) {
// 查找订阅该事件的所有应用
List<OpenApp> apps = openAppMapper.selectList(
new LambdaQueryWrapper<OpenApp>().eq(OpenApp::getStatus, 1));
for (OpenApp app : apps) {
if (app.getWebhookUrl() == null || app.getWebhookUrl().isBlank()) continue;
if (!isSubscribed(app.getWebhookEvents(), event)) continue;
doSend(app, event, bizId, payload, 0);
}
}
@Override
public void retry(Long logId) {
WebhookLog log = webhookLogMapper.selectById(logId);
if (log == null || log.getStatus() == 1) return;
OpenApp app = openAppMapper.selectById(log.getAppId());
if (app == null) return;
doSend(app, log.getEvent(), log.getBizId(), log.getPayload(), log.getRetryCount() + 1);
}
private void doSend(OpenApp app, String event, Long bizId, String payload, int retryCount) {
WebhookLog webhookLog = new WebhookLog();
webhookLog.setAppId(app.getId());
webhookLog.setEvent(event);
webhookLog.setBizId(bizId);
webhookLog.setUrl(app.getWebhookUrl());
webhookLog.setPayload(payload);
webhookLog.setRetryCount(retryCount);
webhookLog.setCreateTime(System.currentTimeMillis() / 1000);
int responseCode = 0;
String responseBody = "";
int status = 0;
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(app.getWebhookUrl()))
.header("Content-Type", "application/json")
.header("X-App-Key", app.getAppKey())
.header("X-Event", event)
.header("X-Timestamp", String.valueOf(System.currentTimeMillis() / 1000))
.POST(HttpRequest.BodyPublishers.ofString(payload))
.timeout(Duration.ofSeconds(10))
.build();
HttpResponse<String> response = HTTP_CLIENT.send(request,
HttpResponse.BodyHandlers.ofString());
responseCode = response.statusCode();
responseBody = response.body();
if (responseCode == 200) status = 1;
} catch (Exception e) {
log.warn("Webhook 发送失败 appId={} event={} err={}", app.getId(), event, e.getMessage());
responseBody = e.getMessage();
}
webhookLog.setResponseCode(responseCode);
webhookLog.setResponseBody(responseBody.length() > 500 ? responseBody.substring(0, 500) : responseBody);
webhookLog.setStatus(status);
webhookLogMapper.insert(webhookLog);
}
/** 检查应用是否订阅了某事件 */
private boolean isSubscribed(String webhookEvents, String event) {
if (webhookEvents == null || webhookEvents.isBlank()) return false;
try {
List<String> events = objectMapper.readValue(webhookEvents,
objectMapper.getTypeFactory().constructCollectionType(List.class, String.class));
return events.contains(event) || events.contains("*");
} catch (Exception e) {
return false;
}
}
}