新增:增加公共webscoket模块

This commit is contained in:
2026-04-01 17:56:10 +08:00
parent e84a453b19
commit b982eb80d2
10 changed files with 449 additions and 0 deletions

View File

@@ -0,0 +1,25 @@
package com.sdm.common.common;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
// 公共websocke消息实体类
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WsMessage<T> {
// 场景:定义在 WsSceneEnum 维护,前端根据这个字段处理业务
private String scene;
// 单一推送接收者的用户id; 群发推送发送人的推送id
private Long userId;
// 推送的时间戳
private Long timestamp;
// 推送的数据
private T data;
}

View File

@@ -0,0 +1,22 @@
package com.sdm.common.common;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum WsSceneEnum {
BIG_FILE_CHUNK("BIG_FILE_CHUNK","大文件分片上传结果通知" );
/**
* scene webscoket 推送的业务场景
*/
private final String scene;
/**
* scene webscoket 推送的业务场景名称
*/
private final String sceneName ;
}

View File

@@ -0,0 +1,47 @@
package com.sdm.common.feign.impl.system;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.common.WsMessage;
import com.sdm.common.feign.inter.system.IWsPushToolFeignClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class WsPushToolFeignClientImpl implements IWsPushToolFeignClient {
@Autowired
private IWsPushToolFeignClient wsPushToolFeignClient;
@Override
public SdmResponse wsPushOne(WsMessage wsMessage) {
SdmResponse sdmResponse;
try {
sdmResponse = wsPushToolFeignClient.wsPushOne(wsMessage);
if (!sdmResponse.isSuccess() ){
return SdmResponse.failed("推送单一用户失败");
}
} catch (Exception e) {
log.error("推送单一用户失败", e);
return SdmResponse.failed("推送单一用户失败");
}
return sdmResponse;
}
@Override
public SdmResponse wsPushAll(WsMessage wsMessage) {
SdmResponse sdmResponse;
try {
sdmResponse = wsPushToolFeignClient.wsPushAll(wsMessage);
if (!sdmResponse.isSuccess() ){
return SdmResponse.failed("推送全量用户失败");
}
} catch (Exception e) {
log.error("推送全量用户失败", e);
return SdmResponse.failed("推送全量用户失败");
}
return sdmResponse;
}
}

View File

@@ -0,0 +1,18 @@
package com.sdm.common.feign.inter.system;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.common.WsMessage;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@FeignClient(name = "system",contextId = "IWsPushToolFeignClient")
public interface IWsPushToolFeignClient {
@PostMapping("/inWs/wsPushOne")
SdmResponse wsPushOne(@RequestBody WsMessage wsMessage);
@PostMapping("/inWs/wsPushAll")
SdmResponse wsPushAll( @RequestBody WsMessage wsMessage);
}

View File

@@ -0,0 +1,52 @@
package com.sdm.data.controller;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.common.ThreadLocalContext;
import com.sdm.common.common.WsMessage;
import com.sdm.common.common.WsSceneEnum;
import com.sdm.common.feign.inter.system.IWsPushToolFeignClient;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
@Tag(name = "测试websocket", description = "测试websocket")
@RestController
@RequestMapping("/test")
public class MockWebScoketController {
@Autowired
private IWsPushToolFeignClient wsPushToolFeignClient;
@Operation(summary = "模拟发送单一用户消息", description = "推送单一用户")
@PostMapping("/wsPushOne")
public SdmResponse wsPushOne(@RequestBody Map<String, Object> map) {
WsMessage<Map<String, Object>> message = new WsMessage<>();
message.setScene(WsSceneEnum.BIG_FILE_CHUNK.getScene());
message.setUserId(ThreadLocalContext.getUserId());
message.setTimestamp(System.currentTimeMillis());
message.setData(map);
SdmResponse sdmResponse = wsPushToolFeignClient.wsPushOne(message);
return sdmResponse;
}
@Operation(summary = "模拟发送所有用户消息", description = "推送所有用户")
@PostMapping("/wsPushAll")
public SdmResponse wsPushAll(@RequestBody Map<String, Object> map) {
WsMessage<Map<String, Object>> message = new WsMessage<>();
message.setScene(WsSceneEnum.BIG_FILE_CHUNK.getScene());
message.setUserId(ThreadLocalContext.getUserId());
message.setTimestamp(System.currentTimeMillis());
message.setData(map);
SdmResponse sdmResponse = wsPushToolFeignClient.wsPushAll(message);
return sdmResponse;
}
}

View File

@@ -95,6 +95,11 @@
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies> </dependencies>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>

View File

@@ -0,0 +1,14 @@
package com.sdm.system.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

View File

@@ -0,0 +1,79 @@
package com.sdm.system.controller;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.common.WsMessage;
import com.sdm.common.feign.inter.system.IWsPushToolFeignClient;
import com.sdm.system.service.impl.WsPushTool;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@Tag(name = "内部服务之间websocket消息推送至前端", description = "内部服务之间websocket消息推送至前端")
@RestController
@RequestMapping("/inWs")
public class WebSocketController implements IWsPushToolFeignClient {
@Autowired
private WsPushTool wsPushTool;
@Operation(summary = "推送单一用户", description = "推送单一用户")
@PostMapping("/wsPushOne")
public SdmResponse wsPushOne( @RequestBody WsMessage wsMessage) {
validateWsMessage(wsMessage);
wsPushTool.pushOne(wsMessage);
return SdmResponse.success();
}
@Operation(summary = "推送所有用户", description = "推送所有用户")
@PostMapping("/wsPushAll")
public SdmResponse wsPushAll( @RequestBody WsMessage wsMessage) {
validateWsMessage(wsMessage);
wsPushTool.pushAll(wsMessage);
return SdmResponse.success();
}
@Operation(summary = "获取所有在线用户", description = "获取所有在线用户")
@GetMapping("/getAllWsUsers")
public SdmResponse<List<Long>> getAllWsUsers() {
SdmResponse<List<Long>> resp = wsPushTool.getAllWsUsers();
return resp;
}
/**
* 校验 WsMessage 所有字段不能为空
* 为空则抛出 IllegalArgumentException 异常
*/
private <T> void validateWsMessage(WsMessage<T> message) {
// 1. 整个消息对象不能为 null
if (message == null) {
throw new IllegalArgumentException("WebSocket消息体不能为null");
}
// 2. scene 场景不能为空
if (message.getScene() == null || message.getScene().isBlank()) {
throw new IllegalArgumentException("WebSocket消息scene不能为空");
}
// 3. userId 接收人ID不能为空
if (message.getUserId() == null) {
throw new IllegalArgumentException("WebSocket消息userId不能为空");
}
// 4. 时间戳不能为空
if (message.getTimestamp() == null) {
throw new IllegalArgumentException("WebSocket消息timestamp不能为空");
}
// 5. 消息数据 data 不能为空(业务数据必须有)
if (message.getData() == null) {
throw new IllegalArgumentException("WebSocket消息data不能为空");
}
}
}

View File

@@ -0,0 +1,122 @@
package com.sdm.system.controller;
import com.sdm.common.common.ThreadLocalContext;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
@ServerEndpoint("/sysWs")
public class WebSocketServer {
// 在线会话池(安全)
public static final Map<Long, Session> SESSION_POOL = new ConcurrentHashMap<>();
// ==================== 连接建立 ====================
@OnOpen
public void onOpen(Session session) {
try {
// 获取 userId
Long userId = ThreadLocalContext.getUserId();
if(Objects.isNull(userId)){
throw new RuntimeException("userId不能是null");
}
// ==========================================
// userId 存入 Session
// ==========================================
session.getUserProperties().put("userid", userId);
SESSION_POOL.put(userId, session);
log.info("用户[{}]连接成功,当前在线:{}", userId, SESSION_POOL.size());
} catch (Exception e) {
log.error("WebSocket 连接失败", e);
}
}
// ==================== 关闭连接 ====================
@OnClose
public void onClose(Session session) {
// 从 Session 取 userId绝对安全
Long userId = (Long) session.getUserProperties().get("userid");
if (userId != null) {
SESSION_POOL.remove(userId);
log.info("用户[{}]断开连接,当前在线:{}", userId, SESSION_POOL.size());
}
try {
if (session.isOpen()) session.close();
} catch (Exception ignored) {
log.info("用户[{}]断开连接异常-->", userId, ignored);
}
}
// ==================== 接收消息 ====================
@OnMessage
public void onMessage(String message, Session session) {
Long userId = (Long) session.getUserProperties().get("userid");
log.info("收到用户[{}]消息:{}", userId, message);
}
// ==================== 异常 ====================
@OnError
public void onError(Session session, Throwable error) {
Long userId = (Long) session.getUserProperties().get("userid");
log.error("用户[{}]异常:", userId, error);
}
/**
* 推送消息给单一用户
*/
public static void sendToUser(Long userId, String message) {
Session session = SESSION_POOL.get(userId);
if (session != null && session.isOpen()) {
try {
session.getBasicRemote().sendText(message);
} catch (Exception e) {
log.error("用户[{}]消息发送失败-->", userId,e);
throw new RuntimeException("消息发送异常"+e.getMessage());
}
}else {
log.warn("用户[{}]消息发送失败,session null or close", userId);
throw new RuntimeException("消息发送失败,session关闭");
}
}
/**
* 广播消息 -> 给所有在线用户发送
*/
public static void sendToAllUser(String message) {
// 遍历所有在线用户
for (Map.Entry<Long, Session> entry : SESSION_POOL.entrySet()) {
Long userId = entry.getKey();
Session session = entry.getValue();
try {
if (session != null && session.isOpen()) {
session.getBasicRemote().sendText(message);
log.info("广播消息发送给用户[{}]成功", userId);
}
} catch (Exception e) {
// 发送失败就移除无效连接
log.error("广播消息发送给用户[{}]失败-->", userId, e);
}
}
}
public static List<Long> getAllWsUsers() {
List<Long> userIds = new ArrayList<>();
for (Map.Entry<Long, Session> entry : SESSION_POOL.entrySet()) {
Long userId = entry.getKey();
if(!Objects.isNull(userId)){
userIds.add(userId);
}
}
return userIds;
}
}

View File

@@ -0,0 +1,65 @@
package com.sdm.system.service.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.common.WsMessage;
import com.sdm.system.controller.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class WsPushTool {
@Autowired
private ObjectMapper objectMapper;
/**
* 按用户ID + 场景推送到前端
*/
public <T> void pushOne(WsMessage<T> message) {
String json="";
try {
// 1. 用Jackson序列化泛型对象支持泛型
json = objectMapper.writeValueAsString(message);
} catch (Exception e) {
log.error("WebSocket消息序列化失败userId:{}scene:{}.-->", message.getUserId(),message.getScene(), e);
throw new RuntimeException("WebSocket消息序列化失败");
}
if(StringUtils.isBlank(json)){
log.warn("WebSocket消息序推送失败userId:{}message是空", message.getUserId());
throw new RuntimeException("WebSocket消息序推送失败message是空");
}
WebSocketServer.sendToUser( message.getUserId(), json);
}
/**
* 场景全量用户推送到前端
*/
public <T> void pushAll(WsMessage<T> message) {
String json="";
try {
// 1. 用Jackson序列化泛型对象支持泛型
json = objectMapper.writeValueAsString(message);
} catch (Exception e) {
log.error("推送所有用户WebSocket消息序列化失败userId:{}scene:{}.-->", message.getUserId(),message.getScene(), e);
return;
}
if(StringUtils.isBlank(json)){
log.warn("推送所有用户WebSocket消息序推送失败userId:{}message是空", message.getUserId());
return;
}
WebSocketServer.sendToAllUser(json);
}
public SdmResponse<List<Long>> getAllWsUsers() {
List<Long> allWsUsers = WebSocketServer.getAllWsUsers();
return SdmResponse.success(allWsUsers);
}
}