Commit 1a1b19a7c0156741498b4d4f40cb0c6d06057c86

Authored by 蒋勇
1 parent dff9b9d0

feat(location): 添加骑手位置实时推送功能

- 新增 AdminLocationController 提供管理端位置接口
- 创建 AdminRiderLocationVO 和 LocationPushMessage 数据对象
- 实现 LocationPushService 和 LocationSessionRegistry 推送服务
- 集成 WebSocket 配置支持实时位置更新
- 在 RiderLocationService 中添加按城市查询活跃骑手功能
- 实现位置更新时自动推送给对应城市的 WebSocket 客户端
- 添加 WebSocket 握手拦截器验证管理员身份和权限
@@ -28,6 +28,10 @@ @@ -28,6 +28,10 @@
28 <groupId>org.springframework.boot</groupId> 28 <groupId>org.springframework.boot</groupId>
29 <artifactId>spring-boot-starter-web</artifactId> 29 <artifactId>spring-boot-starter-web</artifactId>
30 </dependency> 30 </dependency>
  31 + <dependency>
  32 + <groupId>org.springframework.boot</groupId>
  33 + <artifactId>spring-boot-starter-websocket</artifactId>
  34 + </dependency>
31 35
32 <!-- MyBatis-Plus (Spring Boot 3 专用 starter) --> 36 <!-- MyBatis-Plus (Spring Boot 3 专用 starter) -->
33 <dependency> 37 <dependency>
src/main/java/com/diligrp/rider/config/LocationWebSocketConfig.java 0 → 100644
  1 +package com.diligrp.rider.config;
  2 +
  3 +import com.diligrp.rider.websocket.LocationWebSocketHandshakeInterceptor;
  4 +import com.diligrp.rider.websocket.LocationWebSocketHandler;
  5 +import lombok.RequiredArgsConstructor;
  6 +import org.springframework.context.annotation.Configuration;
  7 +import org.springframework.web.socket.config.annotation.EnableWebSocket;
  8 +import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
  9 +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
  10 +
  11 +@Configuration
  12 +@EnableWebSocket
  13 +@RequiredArgsConstructor
  14 +public class LocationWebSocketConfig implements WebSocketConfigurer {
  15 +
  16 + private final LocationWebSocketHandler locationWebSocketHandler;
  17 + private final LocationWebSocketHandshakeInterceptor locationWebSocketHandshakeInterceptor;
  18 +
  19 + @Override
  20 + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
  21 + registry.addHandler(locationWebSocketHandler, "/ws/location")
  22 + .addInterceptors(locationWebSocketHandshakeInterceptor)
  23 + .setAllowedOriginPatterns("*");
  24 + }
  25 +}
src/main/java/com/diligrp/rider/controller/AdminLocationController.java 0 → 100644
  1 +package com.diligrp.rider.controller;
  2 +
  3 +import com.diligrp.rider.common.exception.BizException;
  4 +import com.diligrp.rider.common.result.Result;
  5 +import com.diligrp.rider.service.RiderLocationService;
  6 +import com.diligrp.rider.vo.AdminRiderLocationVO;
  7 +import jakarta.servlet.http.HttpServletRequest;
  8 +import lombok.RequiredArgsConstructor;
  9 +import org.springframework.web.bind.annotation.GetMapping;
  10 +import org.springframework.web.bind.annotation.RequestMapping;
  11 +import org.springframework.web.bind.annotation.RequestParam;
  12 +import org.springframework.web.bind.annotation.RestController;
  13 +
  14 +import java.util.List;
  15 +
  16 +@RestController
  17 +@RequestMapping("/api/admin/location")
  18 +@RequiredArgsConstructor
  19 +public class AdminLocationController {
  20 +
  21 + private final RiderLocationService riderLocationService;
  22 +
  23 + @GetMapping("/active")
  24 + public Result<List<AdminRiderLocationVO>> active(@RequestParam(required = false) Long cityId,
  25 + HttpServletRequest request) {
  26 + String role = (String) request.getAttribute("role");
  27 + Long resolvedCityId = "substation".equals(role)
  28 + ? (Long) request.getAttribute("cityId")
  29 + : cityId;
  30 + if (resolvedCityId == null || resolvedCityId < 1) {
  31 + throw new BizException(400, "请选择城市");
  32 + }
  33 + return Result.success(riderLocationService.listActiveByCity(resolvedCityId));
  34 + }
  35 +}
src/main/java/com/diligrp/rider/service/RiderLocationService.java
1 package com.diligrp.rider.service; 1 package com.diligrp.rider.service;
2 2
3 import com.diligrp.rider.dto.LocationDTO; 3 import com.diligrp.rider.dto.LocationDTO;
  4 +import com.diligrp.rider.vo.AdminRiderLocationVO;
4 import com.diligrp.rider.vo.NearbyRiderVO; 5 import com.diligrp.rider.vo.NearbyRiderVO;
5 6
6 import java.math.BigDecimal; 7 import java.math.BigDecimal;
@@ -13,6 +14,8 @@ public interface RiderLocationService { @@ -13,6 +14,8 @@ public interface RiderLocationService {
13 /** 获取骑手位置 */ 14 /** 获取骑手位置 */
14 LocationDTO getLocation(Long riderId); 15 LocationDTO getLocation(Long riderId);
15 16
  17 + List<AdminRiderLocationVO> listActiveByCity(Long cityId);
  18 +
16 /** 19 /**
17 * 获取附近在线骑手列表 20 * 获取附近在线骑手列表
18 * 21 *
src/main/java/com/diligrp/rider/service/impl/RiderLocationServiceImpl.java
@@ -10,7 +10,9 @@ import com.diligrp.rider.mapper.RiderMapper; @@ -10,7 +10,9 @@ import com.diligrp.rider.mapper.RiderMapper;
10 import com.diligrp.rider.service.CityService; 10 import com.diligrp.rider.service.CityService;
11 import com.diligrp.rider.service.RiderLocationService; 11 import com.diligrp.rider.service.RiderLocationService;
12 import com.diligrp.rider.util.GeoUtil; 12 import com.diligrp.rider.util.GeoUtil;
  13 +import com.diligrp.rider.vo.AdminRiderLocationVO;
13 import com.diligrp.rider.vo.NearbyRiderVO; 14 import com.diligrp.rider.vo.NearbyRiderVO;
  15 +import com.diligrp.rider.websocket.LocationPushService;
14 import lombok.RequiredArgsConstructor; 16 import lombok.RequiredArgsConstructor;
15 import lombok.extern.slf4j.Slf4j; 17 import lombok.extern.slf4j.Slf4j;
16 import org.springframework.stereotype.Service; 18 import org.springframework.stereotype.Service;
@@ -29,6 +31,7 @@ public class RiderLocationServiceImpl implements RiderLocationService { @@ -29,6 +31,7 @@ public class RiderLocationServiceImpl implements RiderLocationService {
29 private final RiderLocationMapper locationMapper; 31 private final RiderLocationMapper locationMapper;
30 private final RiderMapper riderMapper; 32 private final RiderMapper riderMapper;
31 private final CityService cityService; 33 private final CityService cityService;
  34 + private final LocationPushService locationPushService;
32 35
33 /** 36 /**
34 * 按骑手维度更新最新位置。 37 * 按骑手维度更新最新位置。
@@ -38,11 +41,22 @@ public class RiderLocationServiceImpl implements RiderLocationService { @@ -38,11 +41,22 @@ public class RiderLocationServiceImpl implements RiderLocationService {
38 long now = System.currentTimeMillis() / 1000; 41 long now = System.currentTimeMillis() / 1000;
39 log.debug("更新骑手位置,参数 riderId={} lng={} lat={} updateTime={}", 42 log.debug("更新骑手位置,参数 riderId={} lng={} lat={} updateTime={}",
40 riderId, dto.getLng(), dto.getLat(), now); 43 riderId, dto.getLng(), dto.getLat(), now);
  44 + Rider rider = riderMapper.selectById(riderId);
41 locationMapper.upsertLocation( 45 locationMapper.upsertLocation(
42 riderId, 46 riderId,
43 dto.getLng().stripTrailingZeros().toPlainString(), 47 dto.getLng().stripTrailingZeros().toPlainString(),
44 dto.getLat().stripTrailingZeros().toPlainString(), 48 dto.getLat().stripTrailingZeros().toPlainString(),
45 now); 49 now);
  50 + if (rider != null && rider.getCityId() != null) {
  51 + AdminRiderLocationVO location = new AdminRiderLocationVO();
  52 + location.setRiderId(riderId);
  53 + location.setRiderName(rider.getUserNickname());
  54 + location.setCityId(rider.getCityId());
  55 + location.setLng(dto.getLng());
  56 + location.setLat(dto.getLat());
  57 + location.setUpdateTime(now);
  58 + locationPushService.pushRiderLocation(location);
  59 + }
46 } 60 }
47 61
48 /** 62 /**
@@ -63,6 +77,54 @@ public class RiderLocationServiceImpl implements RiderLocationService { @@ -63,6 +77,54 @@ public class RiderLocationServiceImpl implements RiderLocationService {
63 return dto; 77 return dto;
64 } 78 }
65 79
  80 + @Override
  81 + public List<AdminRiderLocationVO> listActiveByCity(Long cityId) {
  82 + List<AdminRiderLocationVO> result = new ArrayList<>();
  83 + if (cityId == null || cityId < 1) {
  84 + return result;
  85 + }
  86 +
  87 + List<Rider> riders = riderMapper.selectList(new LambdaQueryWrapper<Rider>()
  88 + .eq(Rider::getCityId, cityId)
  89 + .eq(Rider::getIsRest, 0)
  90 + .eq(Rider::getUserStatus, 1));
  91 + if (riders.isEmpty()) {
  92 + return result;
  93 + }
  94 +
  95 + List<Long> riderIds = riders.stream().map(Rider::getId).toList();
  96 + long minUpdateTime = System.currentTimeMillis() / 1000 - ACTIVE_WINDOW_SECONDS;
  97 + List<RiderLocation> locations = locationMapper.selectList(
  98 + new LambdaQueryWrapper<RiderLocation>()
  99 + .in(RiderLocation::getUid, riderIds)
  100 + .ge(RiderLocation::getUpdateTime, minUpdateTime));
  101 + if (locations.isEmpty()) {
  102 + return result;
  103 + }
  104 +
  105 + java.util.Map<Long, Rider> riderMap = riders.stream()
  106 + .collect(java.util.stream.Collectors.toMap(Rider::getId, rider -> rider));
  107 + for (RiderLocation location : locations) {
  108 + Rider rider = riderMap.get(location.getUid());
  109 + if (rider == null) {
  110 + continue;
  111 + }
  112 + try {
  113 + AdminRiderLocationVO vo = new AdminRiderLocationVO();
  114 + vo.setRiderId(rider.getId());
  115 + vo.setRiderName(rider.getUserNickname());
  116 + vo.setCityId(rider.getCityId());
  117 + vo.setLng(new BigDecimal(location.getLng()));
  118 + vo.setLat(new BigDecimal(location.getLat()));
  119 + vo.setUpdateTime(location.getUpdateTime());
  120 + result.add(vo);
  121 + } catch (Exception ignored) {
  122 + }
  123 + }
  124 + log.debug("查询城市活跃骑手位置完成,cityId={} count={}", cityId, result.size());
  125 + return result;
  126 + }
  127 +
66 /** 128 /**
67 * 查询城市范围内最近活跃的附近骑手。 129 * 查询城市范围内最近活跃的附近骑手。
68 */ 130 */
src/main/java/com/diligrp/rider/vo/AdminRiderLocationVO.java 0 → 100644
  1 +package com.diligrp.rider.vo;
  2 +
  3 +import lombok.Data;
  4 +
  5 +import java.math.BigDecimal;
  6 +
  7 +@Data
  8 +public class AdminRiderLocationVO {
  9 + private Long riderId;
  10 + private String riderName;
  11 + private BigDecimal lng;
  12 + private BigDecimal lat;
  13 + private Long cityId;
  14 + private Long updateTime;
  15 +}
src/main/java/com/diligrp/rider/websocket/LocationPushMessage.java 0 → 100644
  1 +package com.diligrp.rider.websocket;
  2 +
  3 +import lombok.Data;
  4 +
  5 +import java.math.BigDecimal;
  6 +
  7 +@Data
  8 +public class LocationPushMessage {
  9 + private String type;
  10 + private Long riderId;
  11 + private String riderName;
  12 + private Long cityId;
  13 + private BigDecimal lng;
  14 + private BigDecimal lat;
  15 + private Long updateTime;
  16 +}
src/main/java/com/diligrp/rider/websocket/LocationPushService.java 0 → 100644
  1 +package com.diligrp.rider.websocket;
  2 +
  3 +import com.fasterxml.jackson.databind.ObjectMapper;
  4 +import com.diligrp.rider.vo.AdminRiderLocationVO;
  5 +import lombok.RequiredArgsConstructor;
  6 +import lombok.extern.slf4j.Slf4j;
  7 +import org.springframework.stereotype.Component;
  8 +import org.springframework.web.socket.TextMessage;
  9 +import org.springframework.web.socket.WebSocketSession;
  10 +
  11 +@Component
  12 +@RequiredArgsConstructor
  13 +@Slf4j
  14 +public class LocationPushService {
  15 +
  16 + private final LocationSessionRegistry sessionRegistry;
  17 + private final ObjectMapper objectMapper;
  18 +
  19 + public void pushRiderLocation(AdminRiderLocationVO location) {
  20 + if (location == null || location.getCityId() == null) {
  21 + return;
  22 + }
  23 + LocationPushMessage message = new LocationPushMessage();
  24 + message.setType("rider_location_update");
  25 + message.setRiderId(location.getRiderId());
  26 + message.setRiderName(location.getRiderName());
  27 + message.setCityId(location.getCityId());
  28 + message.setLng(location.getLng());
  29 + message.setLat(location.getLat());
  30 + message.setUpdateTime(location.getUpdateTime());
  31 + broadcast(location.getCityId(), message);
  32 + }
  33 +
  34 + public void pushConnected(Long cityId) {
  35 + if (cityId == null) {
  36 + return;
  37 + }
  38 + LocationPushMessage message = new LocationPushMessage();
  39 + message.setType("connected");
  40 + message.setCityId(cityId);
  41 + broadcast(cityId, message);
  42 + }
  43 +
  44 + private void broadcast(Long cityId, LocationPushMessage message) {
  45 + try {
  46 + String payload = objectMapper.writeValueAsString(message);
  47 + for (WebSocketSession session : sessionRegistry.getSessions(cityId)) {
  48 + if (!session.isOpen()) {
  49 + sessionRegistry.unregister(session);
  50 + continue;
  51 + }
  52 + synchronized (session) {
  53 + session.sendMessage(new TextMessage(payload));
  54 + }
  55 + }
  56 + log.debug("骑手位置推送完成,cityId={} type={}", cityId, message.getType());
  57 + } catch (Exception e) {
  58 + log.warn("骑手位置推送失败,cityId={} type={}", cityId, message.getType(), e);
  59 + }
  60 + }
  61 +}
src/main/java/com/diligrp/rider/websocket/LocationSessionRegistry.java 0 → 100644
  1 +package com.diligrp.rider.websocket;
  2 +
  3 +import org.springframework.stereotype.Component;
  4 +import org.springframework.web.socket.WebSocketSession;
  5 +
  6 +import java.util.Collections;
  7 +import java.util.Set;
  8 +import java.util.concurrent.ConcurrentHashMap;
  9 +
  10 +@Component
  11 +public class LocationSessionRegistry {
  12 +
  13 + private final ConcurrentHashMap<Long, Set<WebSocketSession>> citySessions = new ConcurrentHashMap<>();
  14 + private final ConcurrentHashMap<String, Long> sessionCities = new ConcurrentHashMap<>();
  15 +
  16 + public void register(Long cityId, WebSocketSession session) {
  17 + citySessions.computeIfAbsent(cityId, key -> ConcurrentHashMap.newKeySet()).add(session);
  18 + sessionCities.put(session.getId(), cityId);
  19 + }
  20 +
  21 + public void unregister(WebSocketSession session) {
  22 + Long cityId = sessionCities.remove(session.getId());
  23 + if (cityId == null) {
  24 + return;
  25 + }
  26 + Set<WebSocketSession> sessions = citySessions.get(cityId);
  27 + if (sessions == null) {
  28 + return;
  29 + }
  30 + sessions.remove(session);
  31 + if (sessions.isEmpty()) {
  32 + citySessions.remove(cityId, sessions);
  33 + }
  34 + }
  35 +
  36 + public Set<WebSocketSession> getSessions(Long cityId) {
  37 + return citySessions.getOrDefault(cityId, Collections.emptySet());
  38 + }
  39 +}
src/main/java/com/diligrp/rider/websocket/LocationWebSocketHandler.java 0 → 100644
  1 +package com.diligrp.rider.websocket;
  2 +
  3 +import lombok.RequiredArgsConstructor;
  4 +import lombok.extern.slf4j.Slf4j;
  5 +import org.springframework.stereotype.Component;
  6 +import org.springframework.web.socket.CloseStatus;
  7 +import org.springframework.web.socket.TextMessage;
  8 +import org.springframework.web.socket.WebSocketSession;
  9 +import org.springframework.web.socket.handler.TextWebSocketHandler;
  10 +
  11 +@Component
  12 +@RequiredArgsConstructor
  13 +@Slf4j
  14 +public class LocationWebSocketHandler extends TextWebSocketHandler {
  15 +
  16 + private final LocationSessionRegistry sessionRegistry;
  17 +
  18 + @Override
  19 + public void afterConnectionEstablished(WebSocketSession session) throws Exception {
  20 + Object cityIdObj = session.getAttributes().get("cityId");
  21 + if (!(cityIdObj instanceof Long cityId)) {
  22 + session.close(CloseStatus.NOT_ACCEPTABLE.withReason("缺少城市信息"));
  23 + return;
  24 + }
  25 + sessionRegistry.register(cityId, session);
  26 + log.debug("位置推送连接建立,sessionId={} cityId={}", session.getId(), cityId);
  27 + }
  28 +
  29 + @Override
  30 + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
  31 + if ("ping".equalsIgnoreCase(message.getPayload())) {
  32 + session.sendMessage(new TextMessage("pong"));
  33 + }
  34 + }
  35 +
  36 + @Override
  37 + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
  38 + sessionRegistry.unregister(session);
  39 + log.debug("位置推送连接关闭,sessionId={} code={}", session.getId(), status.getCode());
  40 + }
  41 +
  42 + @Override
  43 + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
  44 + sessionRegistry.unregister(session);
  45 + if (session.isOpen()) {
  46 + session.close(CloseStatus.SERVER_ERROR);
  47 + }
  48 + log.warn("位置推送连接异常,sessionId={}", session.getId(), exception);
  49 + }
  50 +}
src/main/java/com/diligrp/rider/websocket/LocationWebSocketHandshakeInterceptor.java 0 → 100644
  1 +package com.diligrp.rider.websocket;
  2 +
  3 +import com.diligrp.rider.common.exception.BizException;
  4 +import com.diligrp.rider.config.JwtUtil;
  5 +import com.diligrp.rider.entity.Substation;
  6 +import com.diligrp.rider.mapper.SubstationMapper;
  7 +import io.jsonwebtoken.Claims;
  8 +import lombok.RequiredArgsConstructor;
  9 +import org.springframework.http.server.ServerHttpRequest;
  10 +import org.springframework.http.server.ServerHttpResponse;
  11 +import org.springframework.http.server.ServletServerHttpRequest;
  12 +import org.springframework.stereotype.Component;
  13 +import org.springframework.util.StringUtils;
  14 +import org.springframework.web.socket.WebSocketHandler;
  15 +import org.springframework.web.socket.server.HandshakeInterceptor;
  16 +
  17 +import java.util.Map;
  18 +
  19 +@Component
  20 +@RequiredArgsConstructor
  21 +public class LocationWebSocketHandshakeInterceptor implements HandshakeInterceptor {
  22 +
  23 + private final JwtUtil jwtUtil;
  24 + private final SubstationMapper substationMapper;
  25 +
  26 + @Override
  27 + public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
  28 + WebSocketHandler wsHandler, Map<String, Object> attributes) {
  29 + if (!(request instanceof ServletServerHttpRequest servletRequest)) {
  30 + return false;
  31 + }
  32 + String token = servletRequest.getServletRequest().getParameter("token");
  33 + if (!StringUtils.hasText(token)) {
  34 + token = request.getHeaders().getFirst("Authorization");
  35 + }
  36 + if (!StringUtils.hasText(token)) {
  37 + throw new BizException(700, "请先登录");
  38 + }
  39 + if (token.startsWith("Bearer ")) {
  40 + token = token.substring(7);
  41 + }
  42 +
  43 + Claims claims = jwtUtil.getAdminClaims(token);
  44 + Object adminIdObj = claims.get("adminId");
  45 + String role = claims.get("role", String.class);
  46 + if (!(adminIdObj instanceof Number) || !StringUtils.hasText(role)) {
  47 + throw new BizException(700, "登录状态失效,请重新登录");
  48 + }
  49 +
  50 + Long cityId;
  51 + Long adminId = ((Number) adminIdObj).longValue();
  52 + if ("substation".equals(role)) {
  53 + Substation substation = substationMapper.selectById(adminId);
  54 + if (substation == null || substation.getCityId() == null) {
  55 + throw new BizException(700, "分站信息不存在");
  56 + }
  57 + cityId = substation.getCityId();
  58 + } else if ("admin".equals(role)) {
  59 + String cityIdText = servletRequest.getServletRequest().getParameter("cityId");
  60 + if (!StringUtils.hasText(cityIdText)) {
  61 + throw new BizException(400, "管理员连接WebSocket时必须传cityId");
  62 + }
  63 + cityId = Long.parseLong(cityIdText);
  64 + } else {
  65 + throw new BizException(403, "当前角色无权订阅位置推送");
  66 + }
  67 +
  68 + attributes.put("adminId", adminId);
  69 + attributes.put("role", role);
  70 + attributes.put("cityId", cityId);
  71 + return true;
  72 + }
  73 +
  74 + @Override
  75 + public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
  76 + WebSocketHandler wsHandler, Exception exception) {
  77 + }
  78 +}