From b982eb80d211bd7db0f0050ac51c770583ad99c4 Mon Sep 17 00:00:00 2001 From: yangyang Date: Wed, 1 Apr 2026 17:56:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E5=85=AC=E5=85=B1webscoket=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/sdm/common/common/WsMessage.java | 25 ++++ .../com/sdm/common/common/WsSceneEnum.java | 22 ++++ .../system/WsPushToolFeignClientImpl.java | 47 +++++++ .../inter/system/IWsPushToolFeignClient.java | 18 +++ .../controller/MockWebScoketController.java | 52 ++++++++ system/pom.xml | 5 + .../sdm/system/config/WebSocketConfig.java | 14 ++ .../controller/WebSocketController.java | 79 ++++++++++++ .../system/controller/WebSocketServer.java | 122 ++++++++++++++++++ .../sdm/system/service/impl/WsPushTool.java | 65 ++++++++++ 10 files changed, 449 insertions(+) create mode 100644 common/src/main/java/com/sdm/common/common/WsMessage.java create mode 100644 common/src/main/java/com/sdm/common/common/WsSceneEnum.java create mode 100644 common/src/main/java/com/sdm/common/feign/impl/system/WsPushToolFeignClientImpl.java create mode 100644 common/src/main/java/com/sdm/common/feign/inter/system/IWsPushToolFeignClient.java create mode 100644 data/src/main/java/com/sdm/data/controller/MockWebScoketController.java create mode 100644 system/src/main/java/com/sdm/system/config/WebSocketConfig.java create mode 100644 system/src/main/java/com/sdm/system/controller/WebSocketController.java create mode 100644 system/src/main/java/com/sdm/system/controller/WebSocketServer.java create mode 100644 system/src/main/java/com/sdm/system/service/impl/WsPushTool.java diff --git a/common/src/main/java/com/sdm/common/common/WsMessage.java b/common/src/main/java/com/sdm/common/common/WsMessage.java new file mode 100644 index 00000000..e7a26915 --- /dev/null +++ b/common/src/main/java/com/sdm/common/common/WsMessage.java @@ -0,0 +1,25 @@ +package com.sdm.common.common; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +// 公共websocke消息实体类 +@Data +@NoArgsConstructor +@AllArgsConstructor +public class WsMessage { + + // 场景:定义在 WsSceneEnum 维护,前端根据这个字段处理业务 + private String scene; + + // 单一推送:接收者的用户id; 群发推送:发送人的推送id + private Long userId; + + // 推送的时间戳 + private Long timestamp; + + // 推送的数据 + private T data; + +} \ No newline at end of file diff --git a/common/src/main/java/com/sdm/common/common/WsSceneEnum.java b/common/src/main/java/com/sdm/common/common/WsSceneEnum.java new file mode 100644 index 00000000..40862e05 --- /dev/null +++ b/common/src/main/java/com/sdm/common/common/WsSceneEnum.java @@ -0,0 +1,22 @@ +package com.sdm.common.common; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public enum WsSceneEnum { + + BIG_FILE_CHUNK("BIG_FILE_CHUNK","大文件分片上传结果通知" ); + + /** + * scene webscoket 推送的业务场景 + */ + private final String scene; + + /** + * scene webscoket 推送的业务场景名称 + */ + private final String sceneName ; + +} diff --git a/common/src/main/java/com/sdm/common/feign/impl/system/WsPushToolFeignClientImpl.java b/common/src/main/java/com/sdm/common/feign/impl/system/WsPushToolFeignClientImpl.java new file mode 100644 index 00000000..bab04a5e --- /dev/null +++ b/common/src/main/java/com/sdm/common/feign/impl/system/WsPushToolFeignClientImpl.java @@ -0,0 +1,47 @@ +package com.sdm.common.feign.impl.system; + +import com.sdm.common.common.SdmResponse; +import com.sdm.common.common.WsMessage; +import com.sdm.common.feign.inter.system.IWsPushToolFeignClient; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WsPushToolFeignClientImpl implements IWsPushToolFeignClient { + + @Autowired + private IWsPushToolFeignClient wsPushToolFeignClient; + + @Override + public SdmResponse wsPushOne(WsMessage wsMessage) { + SdmResponse sdmResponse; + try { + sdmResponse = wsPushToolFeignClient.wsPushOne(wsMessage); + if (!sdmResponse.isSuccess() ){ + return SdmResponse.failed("推送单一用户失败"); + } + } catch (Exception e) { + log.error("推送单一用户失败", e); + return SdmResponse.failed("推送单一用户失败"); + } + return sdmResponse; + } + + @Override + public SdmResponse wsPushAll(WsMessage wsMessage) { + SdmResponse sdmResponse; + try { + sdmResponse = wsPushToolFeignClient.wsPushAll(wsMessage); + if (!sdmResponse.isSuccess() ){ + return SdmResponse.failed("推送全量用户失败"); + } + } catch (Exception e) { + log.error("推送全量用户失败", e); + return SdmResponse.failed("推送全量用户失败"); + } + return sdmResponse; + } + +} diff --git a/common/src/main/java/com/sdm/common/feign/inter/system/IWsPushToolFeignClient.java b/common/src/main/java/com/sdm/common/feign/inter/system/IWsPushToolFeignClient.java new file mode 100644 index 00000000..373b2a18 --- /dev/null +++ b/common/src/main/java/com/sdm/common/feign/inter/system/IWsPushToolFeignClient.java @@ -0,0 +1,18 @@ +package com.sdm.common.feign.inter.system; + +import com.sdm.common.common.SdmResponse; +import com.sdm.common.common.WsMessage; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; + +@FeignClient(name = "system",contextId = "IWsPushToolFeignClient") +public interface IWsPushToolFeignClient { + + @PostMapping("/inWs/wsPushOne") + SdmResponse wsPushOne(@RequestBody WsMessage wsMessage); + + @PostMapping("/inWs/wsPushAll") + SdmResponse wsPushAll( @RequestBody WsMessage wsMessage); + +} diff --git a/data/src/main/java/com/sdm/data/controller/MockWebScoketController.java b/data/src/main/java/com/sdm/data/controller/MockWebScoketController.java new file mode 100644 index 00000000..2b390238 --- /dev/null +++ b/data/src/main/java/com/sdm/data/controller/MockWebScoketController.java @@ -0,0 +1,52 @@ +package com.sdm.data.controller; + +import com.sdm.common.common.SdmResponse; +import com.sdm.common.common.ThreadLocalContext; +import com.sdm.common.common.WsMessage; +import com.sdm.common.common.WsSceneEnum; +import com.sdm.common.feign.inter.system.IWsPushToolFeignClient; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Map; + +@Tag(name = "测试websocket", description = "测试websocket") +@RestController +@RequestMapping("/test") +public class MockWebScoketController { + + @Autowired + private IWsPushToolFeignClient wsPushToolFeignClient; + + @Operation(summary = "模拟发送单一用户消息", description = "推送单一用户") + @PostMapping("/wsPushOne") + public SdmResponse wsPushOne(@RequestBody Map map) { + WsMessage> message = new WsMessage<>(); + message.setScene(WsSceneEnum.BIG_FILE_CHUNK.getScene()); + message.setUserId(ThreadLocalContext.getUserId()); + message.setTimestamp(System.currentTimeMillis()); + message.setData(map); + SdmResponse sdmResponse = wsPushToolFeignClient.wsPushOne(message); + return sdmResponse; + } + + + + @Operation(summary = "模拟发送所有用户消息", description = "推送所有用户") + @PostMapping("/wsPushAll") + public SdmResponse wsPushAll(@RequestBody Map map) { + WsMessage> message = new WsMessage<>(); + message.setScene(WsSceneEnum.BIG_FILE_CHUNK.getScene()); + message.setUserId(ThreadLocalContext.getUserId()); + message.setTimestamp(System.currentTimeMillis()); + message.setData(map); + SdmResponse sdmResponse = wsPushToolFeignClient.wsPushAll(message); + return sdmResponse; + } + +} \ No newline at end of file diff --git a/system/pom.xml b/system/pom.xml index 0e84680c..15b75dcd 100644 --- a/system/pom.xml +++ b/system/pom.xml @@ -95,6 +95,11 @@ + + org.springframework.boot + spring-boot-starter-websocket + + diff --git a/system/src/main/java/com/sdm/system/config/WebSocketConfig.java b/system/src/main/java/com/sdm/system/config/WebSocketConfig.java new file mode 100644 index 00000000..21b47850 --- /dev/null +++ b/system/src/main/java/com/sdm/system/config/WebSocketConfig.java @@ -0,0 +1,14 @@ +package com.sdm.system.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +@Configuration +public class WebSocketConfig { + + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} \ No newline at end of file diff --git a/system/src/main/java/com/sdm/system/controller/WebSocketController.java b/system/src/main/java/com/sdm/system/controller/WebSocketController.java new file mode 100644 index 00000000..dfdd73a1 --- /dev/null +++ b/system/src/main/java/com/sdm/system/controller/WebSocketController.java @@ -0,0 +1,79 @@ +package com.sdm.system.controller; + +import com.sdm.common.common.SdmResponse; +import com.sdm.common.common.WsMessage; +import com.sdm.common.feign.inter.system.IWsPushToolFeignClient; +import com.sdm.system.service.impl.WsPushTool; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + +@Tag(name = "内部服务之间websocket消息推送至前端", description = "内部服务之间websocket消息推送至前端") +@RestController +@RequestMapping("/inWs") +public class WebSocketController implements IWsPushToolFeignClient { + + @Autowired + private WsPushTool wsPushTool; + + @Operation(summary = "推送单一用户", description = "推送单一用户") + @PostMapping("/wsPushOne") + public SdmResponse wsPushOne( @RequestBody WsMessage wsMessage) { + validateWsMessage(wsMessage); + wsPushTool.pushOne(wsMessage); + return SdmResponse.success(); + } + + @Operation(summary = "推送所有用户", description = "推送所有用户") + @PostMapping("/wsPushAll") + public SdmResponse wsPushAll( @RequestBody WsMessage wsMessage) { + validateWsMessage(wsMessage); + wsPushTool.pushAll(wsMessage); + return SdmResponse.success(); + } + + @Operation(summary = "获取所有在线用户", description = "获取所有在线用户") + @GetMapping("/getAllWsUsers") + public SdmResponse> getAllWsUsers() { + SdmResponse> resp = wsPushTool.getAllWsUsers(); + return resp; + } + + /** + * 校验 WsMessage 所有字段不能为空 + * 为空则抛出 IllegalArgumentException 异常 + */ + private void validateWsMessage(WsMessage message) { + // 1. 整个消息对象不能为 null + if (message == null) { + throw new IllegalArgumentException("WebSocket消息体不能为null"); + } + + // 2. scene 场景不能为空 + if (message.getScene() == null || message.getScene().isBlank()) { + throw new IllegalArgumentException("WebSocket消息scene不能为空"); + } + + // 3. userId 接收人ID不能为空 + if (message.getUserId() == null) { + throw new IllegalArgumentException("WebSocket消息userId不能为空"); + } + + // 4. 时间戳不能为空 + if (message.getTimestamp() == null) { + throw new IllegalArgumentException("WebSocket消息timestamp不能为空"); + } + + // 5. 消息数据 data 不能为空(业务数据必须有) + if (message.getData() == null) { + throw new IllegalArgumentException("WebSocket消息data不能为空"); + } + } + + + + +} \ No newline at end of file diff --git a/system/src/main/java/com/sdm/system/controller/WebSocketServer.java b/system/src/main/java/com/sdm/system/controller/WebSocketServer.java new file mode 100644 index 00000000..c4dbf51c --- /dev/null +++ b/system/src/main/java/com/sdm/system/controller/WebSocketServer.java @@ -0,0 +1,122 @@ +package com.sdm.system.controller; + +import com.sdm.common.common.ThreadLocalContext; +import jakarta.websocket.*; +import jakarta.websocket.server.ServerEndpoint; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +@Component +@ServerEndpoint("/sysWs") +public class WebSocketServer { + + // 在线会话池(安全) + public static final Map SESSION_POOL = new ConcurrentHashMap<>(); + + // ==================== 连接建立 ==================== + @OnOpen + public void onOpen(Session session) { + try { + // 获取 userId + Long userId = ThreadLocalContext.getUserId(); + if(Objects.isNull(userId)){ + throw new RuntimeException("userId不能是null"); + } + // ========================================== + // userId 存入 Session + // ========================================== + session.getUserProperties().put("userid", userId); + SESSION_POOL.put(userId, session); + log.info("用户[{}]连接成功,当前在线:{}", userId, SESSION_POOL.size()); + } catch (Exception e) { + log.error("WebSocket 连接失败", e); + } + } + + // ==================== 关闭连接 ==================== + @OnClose + public void onClose(Session session) { + // 从 Session 取 userId,绝对安全 + Long userId = (Long) session.getUserProperties().get("userid"); + if (userId != null) { + SESSION_POOL.remove(userId); + log.info("用户[{}]断开连接,当前在线:{}", userId, SESSION_POOL.size()); + } + try { + if (session.isOpen()) session.close(); + } catch (Exception ignored) { + log.info("用户[{}]断开连接异常-->", userId, ignored); + } + } + + // ==================== 接收消息 ==================== + @OnMessage + public void onMessage(String message, Session session) { + Long userId = (Long) session.getUserProperties().get("userid"); + log.info("收到用户[{}]消息:{}", userId, message); + } + + // ==================== 异常 ==================== + @OnError + public void onError(Session session, Throwable error) { + Long userId = (Long) session.getUserProperties().get("userid"); + log.error("用户[{}]异常:", userId, error); + } + + /** + * 推送消息给单一用户 + */ + public static void sendToUser(Long userId, String message) { + Session session = SESSION_POOL.get(userId); + if (session != null && session.isOpen()) { + try { + session.getBasicRemote().sendText(message); + } catch (Exception e) { + log.error("用户[{}]消息发送失败-->", userId,e); + throw new RuntimeException("消息发送异常"+e.getMessage()); + } + }else { + log.warn("用户[{}]消息发送失败,session null or close", userId); + throw new RuntimeException("消息发送失败,session关闭"); + } + } + + /** + * 广播消息 -> 给所有在线用户发送 + */ + public static void sendToAllUser(String message) { + // 遍历所有在线用户 + for (Map.Entry entry : SESSION_POOL.entrySet()) { + Long userId = entry.getKey(); + Session session = entry.getValue(); + try { + if (session != null && session.isOpen()) { + session.getBasicRemote().sendText(message); + log.info("广播消息发送给用户[{}]成功", userId); + } + } catch (Exception e) { + // 发送失败就移除无效连接 + log.error("广播消息发送给用户[{}]失败-->", userId, e); + } + } + } + + public static List getAllWsUsers() { + List userIds = new ArrayList<>(); + for (Map.Entry entry : SESSION_POOL.entrySet()) { + Long userId = entry.getKey(); + if(!Objects.isNull(userId)){ + userIds.add(userId); + } + } + return userIds; + } + +} \ No newline at end of file diff --git a/system/src/main/java/com/sdm/system/service/impl/WsPushTool.java b/system/src/main/java/com/sdm/system/service/impl/WsPushTool.java new file mode 100644 index 00000000..8608f162 --- /dev/null +++ b/system/src/main/java/com/sdm/system/service/impl/WsPushTool.java @@ -0,0 +1,65 @@ +package com.sdm.system.service.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.sdm.common.common.SdmResponse; +import com.sdm.common.common.WsMessage; +import com.sdm.system.controller.WebSocketServer; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Slf4j +@Component +public class WsPushTool { + + @Autowired + private ObjectMapper objectMapper; + + /** + * 按用户ID + 场景推送到前端 + */ + public void pushOne(WsMessage message) { + String json=""; + try { + // 1. 用Jackson序列化泛型对象,支持泛型 + json = objectMapper.writeValueAsString(message); + } catch (Exception e) { + log.error("WebSocket消息序列化失败,userId:{},scene:{}.-->", message.getUserId(),message.getScene(), e); + throw new RuntimeException("WebSocket消息序列化失败"); + } + if(StringUtils.isBlank(json)){ + log.warn("WebSocket消息序推送失败,userId:{},message是空", message.getUserId()); + throw new RuntimeException("WebSocket消息序推送失败,message是空"); + } + WebSocketServer.sendToUser( message.getUserId(), json); + } + + /** + * 场景全量用户推送到前端 + */ + public void pushAll(WsMessage message) { + String json=""; + try { + // 1. 用Jackson序列化泛型对象,支持泛型 + json = objectMapper.writeValueAsString(message); + } catch (Exception e) { + log.error("推送所有用户,WebSocket消息序列化失败,userId:{},scene:{}.-->", message.getUserId(),message.getScene(), e); + return; + } + if(StringUtils.isBlank(json)){ + log.warn("推送所有用户,WebSocket消息序推送失败,userId:{},message是空", message.getUserId()); + return; + } + WebSocketServer.sendToAllUser(json); + } + + + public SdmResponse> getAllWsUsers() { + List allWsUsers = WebSocketServer.getAllWsUsers(); + return SdmResponse.success(allWsUsers); + } + +}