From 9187b85e3becaa8275db5114e43fe3a9ed64da47 Mon Sep 17 00:00:00 2001 From: gulongcheng <474084054@qq.com> Date: Fri, 6 Mar 2026 11:45:29 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E6=95=B0=E6=8D=AE=E9=A2=84=E6=B5=8B?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=94=A8=E6=88=B7=E7=A7=9F=E6=88=B7id?= =?UTF-8?q?=E4=B8=A2=E5=A4=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- data/WebSocket 配置与原理说明.md | 968 ++++++++++++++++++ .../data/service/impl/ModelServiceImpl.java | 6 + .../main/resources/static/websocketTest.html | 4 +- 3 files changed, 976 insertions(+), 2 deletions(-) create mode 100644 data/WebSocket 配置与原理说明.md diff --git a/data/WebSocket 配置与原理说明.md b/data/WebSocket 配置与原理说明.md new file mode 100644 index 00000000..f8a338a1 --- /dev/null +++ b/data/WebSocket 配置与原理说明.md @@ -0,0 +1,968 @@ +# WebSocket 配置与原理说明文档 + +## 目录 +- [1. 概述](#1-概述) +- [2. 核心组件](#2-核心组件) +- [3. 配置详解](#3-配置详解) +- [4. 工作原理](#4-工作原理) +- [5. 交互流程](#5-交互流程) +- [6. 使用示例](#6-使用示例) +- [7. 常见问题排查](#7-常见问题排查) + +--- + +## 1. 概述 + +### 1.1 功能定位 +Data 模块的 WebSocket 功能主要用于**训练模型实时通知**,支持以下场景: +- ✅ 模型数据处理完成通知 +- ✅ 模型训练进度通知 +- ✅ 模型预测结果通知 + +### 1.2 技术栈 +- **框架**: Spring WebSocket +- **协议**: WebSocket (RFC 6455) +- **消息格式**: JSON (FastJSON2) +- **并发处理**: ConcurrentHashMap 线程安全 + +### 1.3 架构位置 +``` +前端页面 → Nginx(3001) → Gateway(7100) → Data Service(7104) → WebSocket 推送 +``` + +--- + +## 2. 核心组件 + +### 2.1 组件关系图 +```mermaid +graph TB + A[WebSocketConfig] -->|注册 | B[WebSocketService] + C[WebSocketHandshakeInterceptor] -->|拦截 | B + B -->|管理 | D[WebSocketSession] +``` + +### 2.2 组件职责 + +#### 2.2.1 WebSocketConfig +**文件路径**: `data/src/main/java/com/sdm/data/config/WebSocketConfig.java` + +**职责**: +- 注册 WebSocket 处理器 +- 配置握手拦截器 +- 设置跨域访问规则 + +**核心代码**: +```java +@Configuration +@EnableWebSocket +public class WebSocketConfig implements WebSocketConfigurer { + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + registry.addHandler(webSocketService, "/ws/data/modelTraining") + .addInterceptors(webSocketHandshakeInterceptor) + .setAllowedOrigins("*"); + } +} +``` + +#### 2.2.2 WebSocketHandshakeInterceptor +**文件路径**: `data/src/main/java/com/sdm/data/config/WebSocketHandshakeInterceptor.java` + +**职责**: +- 在握手前提取请求参数(userId, modelId) +- 将参数存储到会话属性(attributes)中 +- 验证参数有效性 + +**关键方法**: +```java +@Override +public boolean beforeHandshake(ServerHttpRequest request, + ServerHttpResponse response, + WebSocketHandler wsHandler, + Map attributes) { + // 1. 从请求参数中获取 userId 和 modelId + String userIdStr = servletRequest.getServletRequest().getParameter("userId"); + String modelIdStr = servletRequest.getServletRequest().getParameter("modelId"); + + // 2. 类型转换并存储到 attributes + attributes.put("userId", Long.valueOf(userIdStr)); + attributes.put("modelId", Long.valueOf(modelIdStr)); + + return true; // 允许握手 +} +``` + +#### 2.2.3 WebSocketService +**文件路径**: `data/src/main/java/com/sdm/data/service/WebSocketService.java` + +**职责**: +- 管理所有活跃的 WebSocket 会话 +- 建立用户模型与会话的映射关系 +- 提供消息发送方法(广播、单播、业务通知) + +**核心数据结构**: +```java +// 存储所有活跃的 WebSocket 会话 +private final ConcurrentHashMap sessions = new ConcurrentHashMap<>(); + +// 存储用户模型 ID 与会话 ID 的映射关系 (userId_modelId -> sessionId) +private final ConcurrentHashMap userModelSessionMap = new ConcurrentHashMap<>(); +``` + +**主要方法**: +| 方法名 | 功能 | 参数 | +|--------|------|------| +| `sendToAll()` | 广播消息给所有客户端 | message | +| `sendToSession()` | 向指定会话发送消息 | sessionId, message | +| `sendToUserModel()` | 向特定用户的特定模型发送消息 | userId, modelId, message | +| `sendDataProcessingNotification()` | 发送数据处理完成通知 | userId, modelId, success, message | +| `sendTrainingProcessingNotification()` | 发送训练处理通知 | userId, modelId, success, message | +| `sendPredictionResultNotification()` | 发送预测结果通知 | userId, modelId, success, message | + +--- + +## 3. 配置详解 + +### 3.1 后端配置 + +#### 3.1.1 Data 服务配置 +**文件**: `data/src/main/resources/application-local.yml` + +```yaml +server: + port: 7104 # Data 服务端口 + +spring: + application: + name: data # 服务名(用于 Nacos 注册) +``` + +#### 3.1.2 网关配置 +**文件**: `gateway2/src/main/resources/application-local.yml` + +```yaml +server: + port: 7100 # 网关端口 + +spring: + cloud: + gateway: + routes: + - id: data-service + uri: lb://data # 负载均衡到 data 服务 + predicates: + - Path=/simulation/data/** # 路径匹配 + filters: + - StripPrefix=2 # 去除 2 级前缀 +``` + +**路径转换说明**: +``` +前端请求:/simulation/data/ws/data/modelTraining +网关处理:去除 /simulation/data (2 级) +转发到 Data: /ws/data/modelTraining +``` + +### 3.2 Nginx 配置 + +**文件**: `/usr/local/nginx/conf/nginx.conf` 或独立配置文件 + +```nginx +server { + listen 3001; # Nginx 监听端口 + server_name localhost; + + # WebSocket 代理配置 + location /wsApi/ { + # 路径重写:去掉 /wsApi/ 前缀 + rewrite ^/wsApi/(.*)$ /$1 break; + + # 转发到网关 + proxy_pass http://192.168.65.161:7100; + + # ========== WebSocket 必需配置 ========== + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + + # 超时时间设置(长连接) + proxy_read_timeout 86400; + proxy_send_timeout 86400; + } +} +``` + +**配置说明**: +| 配置项 | 作用 | 必要性 | +|--------|------|--------| +| `proxy_http_version 1.1` | 使用 HTTP/1.1 协议 | ⭐⭐⭐ 必需 | +| `proxy_set_header Upgrade $http_upgrade` | 传递协议升级头 | ⭐⭐⭐ 必需 | +| `proxy_set_header Connection "upgrade"` | 传递连接升级指令 | ⭐⭐⭐ 必需 | +| `rewrite ^/wsApi/(.*)$ /$1 break` | 路径重写 | ⭐⭐⭐ 必需 | + +### 3.3 前端配置 + +#### 3.3.1 连接 URL 格式 +```javascript +const wsUrl = `ws://nginx-ip:3001/wsApi/simulation/data/ws/data/modelTraining?userId=${userId}&modelId=${modelId}`; +``` + +**URL 组成解析**: +``` +ws:// # WebSocket 协议 +192.168.65.57:3001 # Nginx 地址和端口 +/wsApi/ # Nginx WebSocket 代理前缀 +/simulation/data/ # 网关路由前缀 +/ws/data/modelTraining # Data 服务注册的 WebSocket 路径 +?userId=xxx&modelId=xxx # 业务参数 +``` + +#### 3.3.2 连接示例代码 +```javascript +class ModelWebSocket { + constructor(userId, modelId) { + this.userId = userId; + this.modelId = modelId; + this.ws = null; + this.reconnectTimer = null; + this.heartbeatTimer = null; + this.reconnectCount = 0; + this.maxReconnectCount = 5; + this.reconnectInterval = 3000; + this.heartbeatInterval = 30000; + } + + // 建立连接 + connect() { + const wsUrl = `ws://192.168.65.57:3001/wsApi/simulation/data/ws/data/modelTraining?userId=${this.userId}&modelId=${this.modelId}`; + + try { + this.ws = new WebSocket(wsUrl); + this.ws.onopen = this.onOpen.bind(this); + this.ws.onmessage = this.onMessage.bind(this); + this.ws.onclose = this.onClose.bind(this); + this.ws.onerror = this.onError.bind(this); + } catch (error) { + console.error('WebSocket 连接异常:', error); + this.reconnect(); + } + } + + // 连接成功 + onOpen(event) { + console.log('WebSocket 连接成功', event); + this.reconnectCount = 0; + this.startHeartbeat(); + } + + // 接收消息 + onMessage(event) { + const data = JSON.parse(event.data); + console.log('收到消息:', data); + + // 根据消息类型处理 + switch (data.type) { + case 'connection': + console.log('连接确认,会话 ID:', data.sessionId); + break; + case 'modelDataProcessing': + this.handleDataProcessing(data); + break; + case 'modelTrainingProcessing': + this.handleTrainingProcessing(data); + break; + case 'modelPredictionResult': + this.handlePredictionResult(data); + break; + default: + console.warn('未知消息类型:', data.type); + } + } + + // 连接关闭 + onClose(event) { + console.log('WebSocket 连接关闭', event); + this.stopHeartbeat(); + + // 非正常关闭时重连 + if (event.code !== 1000) { + this.reconnect(); + } + } + + // 连接错误 + onError(error) { + console.error('WebSocket 错误:', error); + } + + // 重连机制 + reconnect() { + if (this.reconnectCount >= this.maxReconnectCount) { + console.error('重连次数已达上限'); + return; + } + + this.reconnectCount++; + console.log(`第 ${this.reconnectCount} 次重连...`); + + this.reconnectTimer = setTimeout(() => { + this.connect(); + }, this.reconnectInterval); + } + + // 心跳机制 + startHeartbeat() { + this.heartbeatTimer = setInterval(() => { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.send(JSON.stringify({ type: 'ping', timestamp: Date.now() })); + } + }, this.heartbeatInterval); + } + + stopHeartbeat() { + if (this.heartbeatTimer) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = null; + } + } + + // 发送消息 + send(message) { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.send(JSON.stringify(message)); + } else { + console.warn('WebSocket 未连接,无法发送消息'); + } + } + + // 关闭连接 + close() { + this.reconnectTimer && clearTimeout(this.reconnectTimer); + this.stopHeartbeat(); + + if (this.ws) { + this.ws.close(1000, '正常关闭'); + this.ws = null; + } + } +} + +// 使用示例 +const wsClient = new ModelWebSocket(userId, modelId); +wsClient.connect(); +``` + +--- + +## 4. 工作原理 + +### 4.1 连接建立流程 + +```mermaid +sequenceDiagram + participant 前端 + participant Nginx + participant Gateway + participant Data + participant Interceptor + participant Service + + 前端->>Nginx: ws://nginx:3001/wsApi/simulation/data/ws/data/modelTraining?userId=1&modelId=123 + Nginx->>Nginx: rewrite 去掉 /wsApi/ + Nginx->>Gateway: http://gateway:7100/simulation/data/ws/data/modelTraining?userId=1&modelId=123 + Gateway->>Gateway: 匹配路由 /simulation/data/** + Gateway->>Gateway: StripPrefix=2 去除前缀 + Gateway->>Data: http://data:7104/ws/data/modelTraining?userId=1&modelId=123 + Data->>Interceptor: beforeHandshake() + Interceptor->>Interceptor: 提取参数到 attributes + Interceptor->>Data: 返回 true 允许握手 + Data->>Service: afterConnectionEstablished() + Service->>Service: 存储 session 到 sessions map + Service->>Service: 从 attributes 读取 userId, modelId + Service->>Service: 建立 userModelSessionMap 映射 + Service->>前端:发送连接成功消息 +``` + +### 4.2 参数传递机制 + +#### 4.2.1 参数流转 +``` +URL 参数 → Interceptor → attributes → WebSocketSession → Service +``` + +**详细说明**: + +1. **URL 参数阶段** + ``` + /ws/data/modelTraining?userId=1&modelId=123 + ``` + +2. **拦截器提取阶段** + ```java + // WebSocketHandshakeInterceptor.java + String userIdStr = servletRequest.getServletRequest().getParameter("userId"); + attributes.put("userId", Long.valueOf(userIdStr)); + ``` + +3. **会话属性存储阶段** + ```java + // WebSocketSession.attributes 中存储 + session.getAttributes().get("userId") + ``` + +4. **服务使用阶段** + ```java + // WebSocketService.java + Object userIdObj = session.getAttributes().get("userId"); + Long userId = (Long) userIdObj; + ``` + +#### 4.2.2 为什么使用 attributes? +- ✅ **符合规范**: 遵循 WebSocket 参数传递规范 +- ✅ **类型安全**: 在拦截器中完成类型转换 +- ✅ **解耦设计**: Service 层不需要解析 URL 参数 +- ✅ **统一管理**: 所有会话参数集中管理 + +### 4.3 会话映射机制 + +#### 4.3.1 双 Map 结构 +```java +// Map 1: sessionId -> WebSocketSession +ConcurrentHashMap sessions + +// Map 2: userId_modelId -> sessionId +ConcurrentHashMap userModelSessionMap +``` + +#### 4.3.2 映射关系建立 +```java +// 连接建立时 +String key = userId + "_" + modelId; // 复合键 +userModelSessionMap.put(key, session.getId()); +sessions.put(session.getId(), session); +``` + +#### 4.3.3 会话定位流程 +```mermaid +graph LR + A[需要发送消息] --> B{目标是谁?} + B -->|特定用户模型 | C[构建复合键 userId_modelId] + C --> D[查询 userModelSessionMap] + D --> E[获取 sessionId] + E --> F[查询 sessions] + F --> G[获取 WebSocketSession] + G --> H[发送消息] +``` + +### 4.4 消息推送机制 + +#### 4.4.1 业务通知封装 +```java +// 发送数据处理完成通知 +public void sendDataProcessingNotification(Long userId, Long modelId, + boolean success, String message) { + JSONObject notification = new JSONObject(); + notification.put("type", "modelDataProcessing"); + notification.put("modelId", modelId); + notification.put("success", success); + notification.put("message", message); + + sendToUserModel(userId, modelId, notification.toJSONString()); +} +``` + +#### 4.4.2 消息格式规范 +```json +{ + "type": "modelDataProcessing", + "modelId": 123, + "success": true, + "message": "数据处理完成" +} +``` + +**消息类型枚举**: +| type | 说明 | 触发时机 | +|------|------|----------| +| `connection` | 连接确认 | 连接建立成功 | +| `modelDataProcessing` | 数据处理通知 | 模型数据处理完成 | +| `modelTrainingProcessing` | 训练处理通知 | 训练任务完成 | +| `modelPredictionResult` | 预测结果通知 | 预测任务完成 | + +--- + +## 5. 交互流程 + +### 5.1 完整生命周期 + +```mermaid +sequenceDiagram + autonumber + participant 前端 + participant Nginx + participant Gateway + participant Data + participant 业务代码 + + 前端->>Nginx: 1. WebSocket 连接请求 + Nginx->>Gateway: 2. 转发(路径重写后) + Gateway->>Data: 3. 路由转发(去除前缀) + Data->>Data: 4. 拦截器提取参数 + Data->>Data: 5. 建立 WebSocket 连接 + Data->>前端:6. 返回连接成功消息 + + Note over 前端,Data: 连接已建立,等待业务事件 + + 业务代码->>Data: 7. 调用 sendXxxNotification() + Data->>Data: 8. 查找目标会话 + Data->>前端:9. 推送业务通知消息 + + Note over 前端,Data: 业务处理中... + + 前端->>Data: 10. 发送心跳/业务消息 + Data->>Data: 11. handleTextMessage() + + Note over 前端,Data: 页面关闭或网络断开 + + 前端->>Data: 12. 关闭连接 + Data->>Data: 13. afterConnectionClosed() + Data->>Data: 14. 清理会话映射 +``` + +### 5.2 典型业务场景 + +#### 场景 1: 模型数据处理完成 +```java +// 业务代码调用 +@Autowired +private WebSocketService webSocketService; + +public void processDataModel(Long userId, Long modelId) { + try { + // 1. 执行业务逻辑 + // ... 数据处理代码 ... + + // 2. 处理完成后推送通知 + webSocketService.sendDataProcessingNotification( + userId, + modelId, + true, + "数据处理成功" + ); + } catch (Exception e) { + // 3. 处理失败推送错误通知 + webSocketService.sendDataProcessingNotification( + userId, + modelId, + false, + "数据处理失败:" + e.getMessage() + ); + } +} +``` + +#### 场景 2: 模型训练任务完成 +```java +public void onTrainingComplete(Long userId, Long modelId, TrainingResult result) { + boolean success = result.isSuccess(); + String message = success ? "训练完成" : "训练失败"; + + webSocketService.sendTrainingProcessingNotification( + userId, + modelId, + success, + message + ); +} +``` + +#### 场景 3: 预测结果返回 +```java +public void onPredictionComplete(Long userId, Long modelId, PredictionResult result) { + JSONObject notification = new JSONObject(); + notification.put("type", "modelPredictionResult"); + notification.put("modelId", modelId); + notification.put("success", result.isSuccess()); + notification.put("data", result.getData()); + notification.put("message", result.getMessage()); + + webSocketService.sendToUserModel(userId, modelId, notification.toJSONString()); +} +``` + +--- + +## 6. 使用示例 + +### 6.1 后端调用示例 + +#### 示例 1: 在 Service 中推送通知 +```java +@Service +public class ModelTrainingService { + + @Autowired + private WebSocketService webSocketService; + + @Async + public void trainModel(Long userId, Long modelId, TrainingParams params) { + try { + log.info("开始训练模型,userId={}, modelId={}", userId, modelId); + + // 执行训练逻辑 + TrainingResult result = executeTraining(params); + + // 推送成功通知 + webSocketService.sendTrainingProcessingNotification( + userId, + modelId, + true, + "模型训练完成,准确率:" + result.getAccuracy() + ); + + } catch (Exception e) { + log.error("模型训练失败", e); + + // 推送失败通知 + webSocketService.sendTrainingProcessingNotification( + userId, + modelId, + false, + "训练失败:" + e.getMessage() + ); + } + } +} +``` + +#### 示例 2: 在 Controller 中触发 +```java +@RestController +@RequestMapping("/data/model") +public class ModelController { + + @Autowired + private WebSocketService webSocketService; + + @PostMapping("/predict") + public R predict(@RequestBody PredictionRequest request) { + Long userId = request.getUserId(); + Long modelId = request.getModelId(); + + try { + // 执行预测 + PredictionResult result = predictionService.predict(request); + + // 推送预测结果 + webSocketService.sendPredictionResultNotification( + userId, + modelId, + true, + "预测完成" + ); + + return R.ok(result); + } catch (Exception e) { + // 推送错误通知 + webSocketService.sendPredictionResultNotification( + userId, + modelId, + false, + "预测失败:" + e.getMessage() + ); + + return R.fail(e.getMessage()); + } + } +} +``` + +### 6.2 前端使用示例 + +#### Vue 3 示例 +```vue + + + +``` + +#### React 示例 +```jsx +import { useEffect, useState } from 'react'; + +function ModelNotification({ userId, modelId }) { + const [isConnected, setIsConnected] = useState(false); + const [notifications, setNotifications] = useState([]); + + useEffect(() => { + const wsClient = new ModelWebSocket(userId, modelId, { + onConnect: () => setIsConnected(true), + onDisconnect: () => setIsConnected(false), + onMessage: (data) => { + setNotifications(prev => [{ + id: Date.now(), + ...data + }, ...prev]); + } + }); + + wsClient.connect(); + + return () => { + wsClient.close(); + }; + }, [userId, modelId]); + + return ( +
+ {isConnected ? ✅ 已连接 : ❌ 未连接} +
    + {notifications.map(msg => ( +
  • {msg.message}
  • + ))} +
+
+ ); +} +``` + +--- + +## 7. 常见问题排查 + +### 7.1 连接问题 + +#### 问题 1: 断点不触发 +**现象**: 在 `WebSocketHandshakeInterceptor` 打断点,但不断入 + +**排查步骤**: +1. ✅ 确认 Nginx 监听端口与前端连接端口一致(3001) +2. ✅ 确认 Nginx 配置已重载:`nginx -s reload` +3. ✅ 确认网关在 `192.168.65.161:7100` 运行 +4. ✅ 确认 Data 服务已注册到 Nacos +5. ✅ 检查前端连接 URL 是否正确 + +**验证命令**: +```bash +# 检查 Nginx 端口 +netstat -an | grep 3001 + +# 检查网关端口 +netstat -an | grep 7100 + +# 检查 Data 服务端口 +netstat -an | grep 7104 + +# 测试 WebSocket 连接 +wscat -c "ws://192.168.65.57:3001/wsApi/simulation/data/ws/data/modelTraining?userId=1&modelId=123" +``` + +#### 问题 2: 连接后立即断开 +**可能原因**: +- ❌ Nginx 未配置 WebSocket 头部 +- ❌ 超时时间设置过短 +- ❌ 防火墙拦截 + +**解决方案**: +```nginx +location /wsApi/ { + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_read_timeout 86400; + proxy_send_timeout 86400; +} +``` + +### 7.2 消息推送问题 + +#### 问题 3: 消息发送失败 +**错误日志**: +``` +WARN 未找到用户 123 的模型 456 的 WebSocket 会话 +``` + +**原因**: +- 用户未连接或已断开连接 +- userId 或 modelId 不匹配 + +**解决方案**: +1. 前端检查连接 URL 中的参数是否正确 +2. 后端检查发送消息时使用的 userId/modelId 是否正确 +3. 在 `WebSocketService` 中添加日志查看映射关系 + +#### 问题 4: 消息发送异常 +**错误日志**: +``` +ERROR 发送 WebSocket 消息失败,会话 ID: xxx +``` + +**原因**: +- 会话已关闭但仍尝试发送 + +**解决方案**: +代码已处理,会自动检查会话状态: +```java +if (session.isOpen()) { + session.sendMessage(new TextMessage(message)); +} +``` + +### 7.3 跨域问题 + +#### 问题 5: 跨域错误 +**浏览器错误**: +``` +WebSocket connection to 'ws://...' failed: +Error during WebSocket handshake: +Unexpected response code: 200 +``` + +**解决方案**: +```java +// WebSocketConfig.java +registry.addHandler(webSocketService, "/ws/data/modelTraining") + .setAllowedOrigins("*"); +``` + +### 7.4 并发问题 + +#### 问题 6: 并发修改异常 +**错误**: `ConcurrentModificationException` + +**原因**: 多线程同时修改 Map + +**解决方案**: +已使用 `ConcurrentHashMap`,线程安全: +```java +private final ConcurrentHashMap sessions = new ConcurrentHashMap<>(); +``` + +### 7.5 内存泄漏问题 + +#### 问题 7: 会话未清理 +**现象**: 运行一段时间后 OOM + +**排查**: +1. 检查 `afterConnectionClosed` 是否清理映射 +2. 检查是否有重复注册 + +**当前实现**: +```java +@Override +public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { + String sessionId = session.getId(); + sessions.remove(sessionId); + userModelSessionMap.values().removeIf(id -> id.equals(sessionId)); +} +``` + +--- + +## 附录 + +### A. 相关文件清单 +``` +data/ +├── src/main/java/com/sdm/data/ +│ ├── config/ +│ │ ├── WebSocketConfig.java # WebSocket 配置类 +│ │ └── WebSocketHandshakeInterceptor.java # 握手拦截器 +│ └── service/ +│ └── WebSocketService.java # WebSocket 服务 +└── src/main/resources/ + └── application-local.yml # Data 服务配置 +``` + +### B. 依赖配置 +```xml + + + org.springframework.boot + spring-boot-starter-websocket + + + + com.alibaba.fastjson2 + fastjson2 + +``` + +### C. 测试工具 + +#### wscat 安装 +```bash +npm install -g wscat +``` + +#### 测试命令 +```bash +# 测试连接 +wscat -c "ws://localhost:7104/ws/data/modelTraining?userId=1&modelId=123" + +# 发送消息 +> {"type": "ping"} +``` + +### D. 参考资料 +- [Spring WebSocket 官方文档](https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#websocket) +- [WebSocket RFC 6455](https://datatracker.ietf.org/doc/html/rfc6455) +- [Nginx WebSocket 代理配置](https://www.nginx.com/blog/websocket-nginx/) + +--- + +## 更新日志 + +| 日期 | 版本 | 说明 | 作者 | +|------|------|------|------| +| 2026-03-06 | v1.0 | 初始版本 | Gulongcheng | diff --git a/data/src/main/java/com/sdm/data/service/impl/ModelServiceImpl.java b/data/src/main/java/com/sdm/data/service/impl/ModelServiceImpl.java index 4cdfd0d4..dfa6d6c8 100644 --- a/data/src/main/java/com/sdm/data/service/impl/ModelServiceImpl.java +++ b/data/src/main/java/com/sdm/data/service/impl/ModelServiceImpl.java @@ -302,8 +302,11 @@ public class ModelServiceImpl implements IModelService { * @param modelId 模型ID */ private void processDataAsync(String pythonScriptPath, String paramJsonPath, Long trainingModelId, Long userId, Long modelId) { + Long tenantId = ThreadLocalContext.getTenantId(); new Thread(() -> { try { + ThreadLocalContext.setTenantId(tenantId); + ThreadLocalContext.setUserId(userId); // 调用Python脚本处理数据 long startTime = System.currentTimeMillis(); log.info("开始执行Python脚本, 训练模型ID: {}", trainingModelId); @@ -757,7 +760,10 @@ public class ModelServiceImpl implements IModelService { * @param userId 用户ID */ private void trainModelAsync(String paramJsonPath, Long modelId, Long userId, String exportFormat) { + Long tenantId = ThreadLocalContext.getTenantId(); new Thread(() -> { + ThreadLocalContext.setTenantId(tenantId); + ThreadLocalContext.setUserId(userId); Process process = null; // 用于实时读取日志的线程 Thread logReaderThread = null; diff --git a/data/src/main/resources/static/websocketTest.html b/data/src/main/resources/static/websocketTest.html index 6c4bd63f..13e61860 100644 --- a/data/src/main/resources/static/websocketTest.html +++ b/data/src/main/resources/static/websocketTest.html @@ -8,9 +8,9 @@

WebSocket 测试页面

- + - +