fix:数据预测修复用户租户id丢失
This commit is contained in:
968
data/WebSocket 配置与原理说明.md
Normal file
968
data/WebSocket 配置与原理说明.md
Normal file
@@ -0,0 +1,968 @@
|
||||
# WebSocket 配置与原理说明文档
|
||||
|
||||
## 目录
|
||||
- [1. 概述](#1-概述)
|
||||
- [2. 核心组件](#2-核心组件)
|
||||
- [3. 配置详解](#3-配置详解)
|
||||
- [4. 工作原理](#4-工作原理)
|
||||
- [5. 交互流程](#5-交互流程)
|
||||
- [6. 使用示例](#6-使用示例)
|
||||
- [7. 常见问题排查](#7-常见问题排查)
|
||||
|
||||
---
|
||||
|
||||
## 1. 概述
|
||||
|
||||
### 1.1 功能定位
|
||||
Data 模块的 WebSocket 功能主要用于**训练模型实时通知**,支持以下场景:
|
||||
- ✅ 模型数据处理完成通知
|
||||
- ✅ 模型训练进度通知
|
||||
- ✅ 模型预测结果通知
|
||||
|
||||
### 1.2 技术栈
|
||||
- **框架**: Spring WebSocket
|
||||
- **协议**: WebSocket (RFC 6455)
|
||||
- **消息格式**: JSON (FastJSON2)
|
||||
- **并发处理**: ConcurrentHashMap 线程安全
|
||||
|
||||
### 1.3 架构位置
|
||||
```
|
||||
前端页面 → Nginx(3001) → Gateway(7100) → Data Service(7104) → WebSocket 推送
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 2. 核心组件
|
||||
|
||||
### 2.1 组件关系图
|
||||
```mermaid
|
||||
graph TB
|
||||
A[WebSocketConfig] -->|注册 | B[WebSocketService]
|
||||
C[WebSocketHandshakeInterceptor] -->|拦截 | B
|
||||
B -->|管理 | D[WebSocketSession]
|
||||
```
|
||||
|
||||
### 2.2 组件职责
|
||||
|
||||
#### 2.2.1 WebSocketConfig
|
||||
**文件路径**: `data/src/main/java/com/sdm/data/config/WebSocketConfig.java`
|
||||
|
||||
**职责**:
|
||||
- 注册 WebSocket 处理器
|
||||
- 配置握手拦截器
|
||||
- 设置跨域访问规则
|
||||
|
||||
**核心代码**:
|
||||
```java
|
||||
@Configuration
|
||||
@EnableWebSocket
|
||||
public class WebSocketConfig implements WebSocketConfigurer {
|
||||
@Override
|
||||
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
|
||||
registry.addHandler(webSocketService, "/ws/data/modelTraining")
|
||||
.addInterceptors(webSocketHandshakeInterceptor)
|
||||
.setAllowedOrigins("*");
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### 2.2.2 WebSocketHandshakeInterceptor
|
||||
**文件路径**: `data/src/main/java/com/sdm/data/config/WebSocketHandshakeInterceptor.java`
|
||||
|
||||
**职责**:
|
||||
- 在握手前提取请求参数(userId, modelId)
|
||||
- 将参数存储到会话属性(attributes)中
|
||||
- 验证参数有效性
|
||||
|
||||
**关键方法**:
|
||||
```java
|
||||
@Override
|
||||
public boolean beforeHandshake(ServerHttpRequest request,
|
||||
ServerHttpResponse response,
|
||||
WebSocketHandler wsHandler,
|
||||
Map<String, Object> attributes) {
|
||||
// 1. 从请求参数中获取 userId 和 modelId
|
||||
String userIdStr = servletRequest.getServletRequest().getParameter("userId");
|
||||
String modelIdStr = servletRequest.getServletRequest().getParameter("modelId");
|
||||
|
||||
// 2. 类型转换并存储到 attributes
|
||||
attributes.put("userId", Long.valueOf(userIdStr));
|
||||
attributes.put("modelId", Long.valueOf(modelIdStr));
|
||||
|
||||
return true; // 允许握手
|
||||
}
|
||||
```
|
||||
|
||||
#### 2.2.3 WebSocketService
|
||||
**文件路径**: `data/src/main/java/com/sdm/data/service/WebSocketService.java`
|
||||
|
||||
**职责**:
|
||||
- 管理所有活跃的 WebSocket 会话
|
||||
- 建立用户模型与会话的映射关系
|
||||
- 提供消息发送方法(广播、单播、业务通知)
|
||||
|
||||
**核心数据结构**:
|
||||
```java
|
||||
// 存储所有活跃的 WebSocket 会话
|
||||
private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
|
||||
|
||||
// 存储用户模型 ID 与会话 ID 的映射关系 (userId_modelId -> sessionId)
|
||||
private final ConcurrentHashMap<String, String> userModelSessionMap = new ConcurrentHashMap<>();
|
||||
```
|
||||
|
||||
**主要方法**:
|
||||
| 方法名 | 功能 | 参数 |
|
||||
|--------|------|------|
|
||||
| `sendToAll()` | 广播消息给所有客户端 | message |
|
||||
| `sendToSession()` | 向指定会话发送消息 | sessionId, message |
|
||||
| `sendToUserModel()` | 向特定用户的特定模型发送消息 | userId, modelId, message |
|
||||
| `sendDataProcessingNotification()` | 发送数据处理完成通知 | userId, modelId, success, message |
|
||||
| `sendTrainingProcessingNotification()` | 发送训练处理通知 | userId, modelId, success, message |
|
||||
| `sendPredictionResultNotification()` | 发送预测结果通知 | userId, modelId, success, message |
|
||||
|
||||
---
|
||||
|
||||
## 3. 配置详解
|
||||
|
||||
### 3.1 后端配置
|
||||
|
||||
#### 3.1.1 Data 服务配置
|
||||
**文件**: `data/src/main/resources/application-local.yml`
|
||||
|
||||
```yaml
|
||||
server:
|
||||
port: 7104 # Data 服务端口
|
||||
|
||||
spring:
|
||||
application:
|
||||
name: data # 服务名(用于 Nacos 注册)
|
||||
```
|
||||
|
||||
#### 3.1.2 网关配置
|
||||
**文件**: `gateway2/src/main/resources/application-local.yml`
|
||||
|
||||
```yaml
|
||||
server:
|
||||
port: 7100 # 网关端口
|
||||
|
||||
spring:
|
||||
cloud:
|
||||
gateway:
|
||||
routes:
|
||||
- id: data-service
|
||||
uri: lb://data # 负载均衡到 data 服务
|
||||
predicates:
|
||||
- Path=/simulation/data/** # 路径匹配
|
||||
filters:
|
||||
- StripPrefix=2 # 去除 2 级前缀
|
||||
```
|
||||
|
||||
**路径转换说明**:
|
||||
```
|
||||
前端请求:/simulation/data/ws/data/modelTraining
|
||||
网关处理:去除 /simulation/data (2 级)
|
||||
转发到 Data: /ws/data/modelTraining
|
||||
```
|
||||
|
||||
### 3.2 Nginx 配置
|
||||
|
||||
**文件**: `/usr/local/nginx/conf/nginx.conf` 或独立配置文件
|
||||
|
||||
```nginx
|
||||
server {
|
||||
listen 3001; # Nginx 监听端口
|
||||
server_name localhost;
|
||||
|
||||
# WebSocket 代理配置
|
||||
location /wsApi/ {
|
||||
# 路径重写:去掉 /wsApi/ 前缀
|
||||
rewrite ^/wsApi/(.*)$ /$1 break;
|
||||
|
||||
# 转发到网关
|
||||
proxy_pass http://192.168.65.161:7100;
|
||||
|
||||
# ========== WebSocket 必需配置 ==========
|
||||
proxy_http_version 1.1;
|
||||
proxy_set_header Upgrade $http_upgrade;
|
||||
proxy_set_header Connection "upgrade";
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
|
||||
# 超时时间设置(长连接)
|
||||
proxy_read_timeout 86400;
|
||||
proxy_send_timeout 86400;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**配置说明**:
|
||||
| 配置项 | 作用 | 必要性 |
|
||||
|--------|------|--------|
|
||||
| `proxy_http_version 1.1` | 使用 HTTP/1.1 协议 | ⭐⭐⭐ 必需 |
|
||||
| `proxy_set_header Upgrade $http_upgrade` | 传递协议升级头 | ⭐⭐⭐ 必需 |
|
||||
| `proxy_set_header Connection "upgrade"` | 传递连接升级指令 | ⭐⭐⭐ 必需 |
|
||||
| `rewrite ^/wsApi/(.*)$ /$1 break` | 路径重写 | ⭐⭐⭐ 必需 |
|
||||
|
||||
### 3.3 前端配置
|
||||
|
||||
#### 3.3.1 连接 URL 格式
|
||||
```javascript
|
||||
const wsUrl = `ws://nginx-ip:3001/wsApi/simulation/data/ws/data/modelTraining?userId=${userId}&modelId=${modelId}`;
|
||||
```
|
||||
|
||||
**URL 组成解析**:
|
||||
```
|
||||
ws:// # WebSocket 协议
|
||||
192.168.65.57:3001 # Nginx 地址和端口
|
||||
/wsApi/ # Nginx WebSocket 代理前缀
|
||||
/simulation/data/ # 网关路由前缀
|
||||
/ws/data/modelTraining # Data 服务注册的 WebSocket 路径
|
||||
?userId=xxx&modelId=xxx # 业务参数
|
||||
```
|
||||
|
||||
#### 3.3.2 连接示例代码
|
||||
```javascript
|
||||
class ModelWebSocket {
|
||||
constructor(userId, modelId) {
|
||||
this.userId = userId;
|
||||
this.modelId = modelId;
|
||||
this.ws = null;
|
||||
this.reconnectTimer = null;
|
||||
this.heartbeatTimer = null;
|
||||
this.reconnectCount = 0;
|
||||
this.maxReconnectCount = 5;
|
||||
this.reconnectInterval = 3000;
|
||||
this.heartbeatInterval = 30000;
|
||||
}
|
||||
|
||||
// 建立连接
|
||||
connect() {
|
||||
const wsUrl = `ws://192.168.65.57:3001/wsApi/simulation/data/ws/data/modelTraining?userId=${this.userId}&modelId=${this.modelId}`;
|
||||
|
||||
try {
|
||||
this.ws = new WebSocket(wsUrl);
|
||||
this.ws.onopen = this.onOpen.bind(this);
|
||||
this.ws.onmessage = this.onMessage.bind(this);
|
||||
this.ws.onclose = this.onClose.bind(this);
|
||||
this.ws.onerror = this.onError.bind(this);
|
||||
} catch (error) {
|
||||
console.error('WebSocket 连接异常:', error);
|
||||
this.reconnect();
|
||||
}
|
||||
}
|
||||
|
||||
// 连接成功
|
||||
onOpen(event) {
|
||||
console.log('WebSocket 连接成功', event);
|
||||
this.reconnectCount = 0;
|
||||
this.startHeartbeat();
|
||||
}
|
||||
|
||||
// 接收消息
|
||||
onMessage(event) {
|
||||
const data = JSON.parse(event.data);
|
||||
console.log('收到消息:', data);
|
||||
|
||||
// 根据消息类型处理
|
||||
switch (data.type) {
|
||||
case 'connection':
|
||||
console.log('连接确认,会话 ID:', data.sessionId);
|
||||
break;
|
||||
case 'modelDataProcessing':
|
||||
this.handleDataProcessing(data);
|
||||
break;
|
||||
case 'modelTrainingProcessing':
|
||||
this.handleTrainingProcessing(data);
|
||||
break;
|
||||
case 'modelPredictionResult':
|
||||
this.handlePredictionResult(data);
|
||||
break;
|
||||
default:
|
||||
console.warn('未知消息类型:', data.type);
|
||||
}
|
||||
}
|
||||
|
||||
// 连接关闭
|
||||
onClose(event) {
|
||||
console.log('WebSocket 连接关闭', event);
|
||||
this.stopHeartbeat();
|
||||
|
||||
// 非正常关闭时重连
|
||||
if (event.code !== 1000) {
|
||||
this.reconnect();
|
||||
}
|
||||
}
|
||||
|
||||
// 连接错误
|
||||
onError(error) {
|
||||
console.error('WebSocket 错误:', error);
|
||||
}
|
||||
|
||||
// 重连机制
|
||||
reconnect() {
|
||||
if (this.reconnectCount >= this.maxReconnectCount) {
|
||||
console.error('重连次数已达上限');
|
||||
return;
|
||||
}
|
||||
|
||||
this.reconnectCount++;
|
||||
console.log(`第 ${this.reconnectCount} 次重连...`);
|
||||
|
||||
this.reconnectTimer = setTimeout(() => {
|
||||
this.connect();
|
||||
}, this.reconnectInterval);
|
||||
}
|
||||
|
||||
// 心跳机制
|
||||
startHeartbeat() {
|
||||
this.heartbeatTimer = setInterval(() => {
|
||||
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
|
||||
this.ws.send(JSON.stringify({ type: 'ping', timestamp: Date.now() }));
|
||||
}
|
||||
}, this.heartbeatInterval);
|
||||
}
|
||||
|
||||
stopHeartbeat() {
|
||||
if (this.heartbeatTimer) {
|
||||
clearInterval(this.heartbeatTimer);
|
||||
this.heartbeatTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
// 发送消息
|
||||
send(message) {
|
||||
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
|
||||
this.ws.send(JSON.stringify(message));
|
||||
} else {
|
||||
console.warn('WebSocket 未连接,无法发送消息');
|
||||
}
|
||||
}
|
||||
|
||||
// 关闭连接
|
||||
close() {
|
||||
this.reconnectTimer && clearTimeout(this.reconnectTimer);
|
||||
this.stopHeartbeat();
|
||||
|
||||
if (this.ws) {
|
||||
this.ws.close(1000, '正常关闭');
|
||||
this.ws = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 使用示例
|
||||
const wsClient = new ModelWebSocket(userId, modelId);
|
||||
wsClient.connect();
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 4. 工作原理
|
||||
|
||||
### 4.1 连接建立流程
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant 前端
|
||||
participant Nginx
|
||||
participant Gateway
|
||||
participant Data
|
||||
participant Interceptor
|
||||
participant Service
|
||||
|
||||
前端->>Nginx: ws://nginx:3001/wsApi/simulation/data/ws/data/modelTraining?userId=1&modelId=123
|
||||
Nginx->>Nginx: rewrite 去掉 /wsApi/
|
||||
Nginx->>Gateway: http://gateway:7100/simulation/data/ws/data/modelTraining?userId=1&modelId=123
|
||||
Gateway->>Gateway: 匹配路由 /simulation/data/**
|
||||
Gateway->>Gateway: StripPrefix=2 去除前缀
|
||||
Gateway->>Data: http://data:7104/ws/data/modelTraining?userId=1&modelId=123
|
||||
Data->>Interceptor: beforeHandshake()
|
||||
Interceptor->>Interceptor: 提取参数到 attributes
|
||||
Interceptor->>Data: 返回 true 允许握手
|
||||
Data->>Service: afterConnectionEstablished()
|
||||
Service->>Service: 存储 session 到 sessions map
|
||||
Service->>Service: 从 attributes 读取 userId, modelId
|
||||
Service->>Service: 建立 userModelSessionMap 映射
|
||||
Service->>前端:发送连接成功消息
|
||||
```
|
||||
|
||||
### 4.2 参数传递机制
|
||||
|
||||
#### 4.2.1 参数流转
|
||||
```
|
||||
URL 参数 → Interceptor → attributes → WebSocketSession → Service
|
||||
```
|
||||
|
||||
**详细说明**:
|
||||
|
||||
1. **URL 参数阶段**
|
||||
```
|
||||
/ws/data/modelTraining?userId=1&modelId=123
|
||||
```
|
||||
|
||||
2. **拦截器提取阶段**
|
||||
```java
|
||||
// WebSocketHandshakeInterceptor.java
|
||||
String userIdStr = servletRequest.getServletRequest().getParameter("userId");
|
||||
attributes.put("userId", Long.valueOf(userIdStr));
|
||||
```
|
||||
|
||||
3. **会话属性存储阶段**
|
||||
```java
|
||||
// WebSocketSession.attributes 中存储
|
||||
session.getAttributes().get("userId")
|
||||
```
|
||||
|
||||
4. **服务使用阶段**
|
||||
```java
|
||||
// WebSocketService.java
|
||||
Object userIdObj = session.getAttributes().get("userId");
|
||||
Long userId = (Long) userIdObj;
|
||||
```
|
||||
|
||||
#### 4.2.2 为什么使用 attributes?
|
||||
- ✅ **符合规范**: 遵循 WebSocket 参数传递规范
|
||||
- ✅ **类型安全**: 在拦截器中完成类型转换
|
||||
- ✅ **解耦设计**: Service 层不需要解析 URL 参数
|
||||
- ✅ **统一管理**: 所有会话参数集中管理
|
||||
|
||||
### 4.3 会话映射机制
|
||||
|
||||
#### 4.3.1 双 Map 结构
|
||||
```java
|
||||
// Map 1: sessionId -> WebSocketSession
|
||||
ConcurrentHashMap<String, WebSocketSession> sessions
|
||||
|
||||
// Map 2: userId_modelId -> sessionId
|
||||
ConcurrentHashMap<String, String> userModelSessionMap
|
||||
```
|
||||
|
||||
#### 4.3.2 映射关系建立
|
||||
```java
|
||||
// 连接建立时
|
||||
String key = userId + "_" + modelId; // 复合键
|
||||
userModelSessionMap.put(key, session.getId());
|
||||
sessions.put(session.getId(), session);
|
||||
```
|
||||
|
||||
#### 4.3.3 会话定位流程
|
||||
```mermaid
|
||||
graph LR
|
||||
A[需要发送消息] --> B{目标是谁?}
|
||||
B -->|特定用户模型 | C[构建复合键 userId_modelId]
|
||||
C --> D[查询 userModelSessionMap]
|
||||
D --> E[获取 sessionId]
|
||||
E --> F[查询 sessions]
|
||||
F --> G[获取 WebSocketSession]
|
||||
G --> H[发送消息]
|
||||
```
|
||||
|
||||
### 4.4 消息推送机制
|
||||
|
||||
#### 4.4.1 业务通知封装
|
||||
```java
|
||||
// 发送数据处理完成通知
|
||||
public void sendDataProcessingNotification(Long userId, Long modelId,
|
||||
boolean success, String message) {
|
||||
JSONObject notification = new JSONObject();
|
||||
notification.put("type", "modelDataProcessing");
|
||||
notification.put("modelId", modelId);
|
||||
notification.put("success", success);
|
||||
notification.put("message", message);
|
||||
|
||||
sendToUserModel(userId, modelId, notification.toJSONString());
|
||||
}
|
||||
```
|
||||
|
||||
#### 4.4.2 消息格式规范
|
||||
```json
|
||||
{
|
||||
"type": "modelDataProcessing",
|
||||
"modelId": 123,
|
||||
"success": true,
|
||||
"message": "数据处理完成"
|
||||
}
|
||||
```
|
||||
|
||||
**消息类型枚举**:
|
||||
| type | 说明 | 触发时机 |
|
||||
|------|------|----------|
|
||||
| `connection` | 连接确认 | 连接建立成功 |
|
||||
| `modelDataProcessing` | 数据处理通知 | 模型数据处理完成 |
|
||||
| `modelTrainingProcessing` | 训练处理通知 | 训练任务完成 |
|
||||
| `modelPredictionResult` | 预测结果通知 | 预测任务完成 |
|
||||
|
||||
---
|
||||
|
||||
## 5. 交互流程
|
||||
|
||||
### 5.1 完整生命周期
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
autonumber
|
||||
participant 前端
|
||||
participant Nginx
|
||||
participant Gateway
|
||||
participant Data
|
||||
participant 业务代码
|
||||
|
||||
前端->>Nginx: 1. WebSocket 连接请求
|
||||
Nginx->>Gateway: 2. 转发(路径重写后)
|
||||
Gateway->>Data: 3. 路由转发(去除前缀)
|
||||
Data->>Data: 4. 拦截器提取参数
|
||||
Data->>Data: 5. 建立 WebSocket 连接
|
||||
Data->>前端:6. 返回连接成功消息
|
||||
|
||||
Note over 前端,Data: 连接已建立,等待业务事件
|
||||
|
||||
业务代码->>Data: 7. 调用 sendXxxNotification()
|
||||
Data->>Data: 8. 查找目标会话
|
||||
Data->>前端:9. 推送业务通知消息
|
||||
|
||||
Note over 前端,Data: 业务处理中...
|
||||
|
||||
前端->>Data: 10. 发送心跳/业务消息
|
||||
Data->>Data: 11. handleTextMessage()
|
||||
|
||||
Note over 前端,Data: 页面关闭或网络断开
|
||||
|
||||
前端->>Data: 12. 关闭连接
|
||||
Data->>Data: 13. afterConnectionClosed()
|
||||
Data->>Data: 14. 清理会话映射
|
||||
```
|
||||
|
||||
### 5.2 典型业务场景
|
||||
|
||||
#### 场景 1: 模型数据处理完成
|
||||
```java
|
||||
// 业务代码调用
|
||||
@Autowired
|
||||
private WebSocketService webSocketService;
|
||||
|
||||
public void processDataModel(Long userId, Long modelId) {
|
||||
try {
|
||||
// 1. 执行业务逻辑
|
||||
// ... 数据处理代码 ...
|
||||
|
||||
// 2. 处理完成后推送通知
|
||||
webSocketService.sendDataProcessingNotification(
|
||||
userId,
|
||||
modelId,
|
||||
true,
|
||||
"数据处理成功"
|
||||
);
|
||||
} catch (Exception e) {
|
||||
// 3. 处理失败推送错误通知
|
||||
webSocketService.sendDataProcessingNotification(
|
||||
userId,
|
||||
modelId,
|
||||
false,
|
||||
"数据处理失败:" + e.getMessage()
|
||||
);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### 场景 2: 模型训练任务完成
|
||||
```java
|
||||
public void onTrainingComplete(Long userId, Long modelId, TrainingResult result) {
|
||||
boolean success = result.isSuccess();
|
||||
String message = success ? "训练完成" : "训练失败";
|
||||
|
||||
webSocketService.sendTrainingProcessingNotification(
|
||||
userId,
|
||||
modelId,
|
||||
success,
|
||||
message
|
||||
);
|
||||
}
|
||||
```
|
||||
|
||||
#### 场景 3: 预测结果返回
|
||||
```java
|
||||
public void onPredictionComplete(Long userId, Long modelId, PredictionResult result) {
|
||||
JSONObject notification = new JSONObject();
|
||||
notification.put("type", "modelPredictionResult");
|
||||
notification.put("modelId", modelId);
|
||||
notification.put("success", result.isSuccess());
|
||||
notification.put("data", result.getData());
|
||||
notification.put("message", result.getMessage());
|
||||
|
||||
webSocketService.sendToUserModel(userId, modelId, notification.toJSONString());
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 6. 使用示例
|
||||
|
||||
### 6.1 后端调用示例
|
||||
|
||||
#### 示例 1: 在 Service 中推送通知
|
||||
```java
|
||||
@Service
|
||||
public class ModelTrainingService {
|
||||
|
||||
@Autowired
|
||||
private WebSocketService webSocketService;
|
||||
|
||||
@Async
|
||||
public void trainModel(Long userId, Long modelId, TrainingParams params) {
|
||||
try {
|
||||
log.info("开始训练模型,userId={}, modelId={}", userId, modelId);
|
||||
|
||||
// 执行训练逻辑
|
||||
TrainingResult result = executeTraining(params);
|
||||
|
||||
// 推送成功通知
|
||||
webSocketService.sendTrainingProcessingNotification(
|
||||
userId,
|
||||
modelId,
|
||||
true,
|
||||
"模型训练完成,准确率:" + result.getAccuracy()
|
||||
);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("模型训练失败", e);
|
||||
|
||||
// 推送失败通知
|
||||
webSocketService.sendTrainingProcessingNotification(
|
||||
userId,
|
||||
modelId,
|
||||
false,
|
||||
"训练失败:" + e.getMessage()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### 示例 2: 在 Controller 中触发
|
||||
```java
|
||||
@RestController
|
||||
@RequestMapping("/data/model")
|
||||
public class ModelController {
|
||||
|
||||
@Autowired
|
||||
private WebSocketService webSocketService;
|
||||
|
||||
@PostMapping("/predict")
|
||||
public R<PredictionResult> predict(@RequestBody PredictionRequest request) {
|
||||
Long userId = request.getUserId();
|
||||
Long modelId = request.getModelId();
|
||||
|
||||
try {
|
||||
// 执行预测
|
||||
PredictionResult result = predictionService.predict(request);
|
||||
|
||||
// 推送预测结果
|
||||
webSocketService.sendPredictionResultNotification(
|
||||
userId,
|
||||
modelId,
|
||||
true,
|
||||
"预测完成"
|
||||
);
|
||||
|
||||
return R.ok(result);
|
||||
} catch (Exception e) {
|
||||
// 推送错误通知
|
||||
webSocketService.sendPredictionResultNotification(
|
||||
userId,
|
||||
modelId,
|
||||
false,
|
||||
"预测失败:" + e.getMessage()
|
||||
);
|
||||
|
||||
return R.fail(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 6.2 前端使用示例
|
||||
|
||||
#### Vue 3 示例
|
||||
```vue
|
||||
<template>
|
||||
<div>
|
||||
<div v-if="isConnected">✅ 已连接</div>
|
||||
<div v-else>❌ 未连接</div>
|
||||
|
||||
<div v-if="notifications.length">
|
||||
<h3>通知消息</h3>
|
||||
<ul>
|
||||
<li v-for="msg in notifications" :key="msg.id">
|
||||
{{ msg.message }}
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<script setup>
|
||||
import { ref, onMounted, onUnmounted } from 'vue';
|
||||
|
||||
const isConnected = ref(false);
|
||||
const notifications = ref([]);
|
||||
let wsClient = null;
|
||||
|
||||
onMounted(() => {
|
||||
const userId = 1979078323595476993;
|
||||
const modelId = 66;
|
||||
|
||||
wsClient = new ModelWebSocket(userId, modelId, {
|
||||
onConnect: () => {
|
||||
isConnected.value = true;
|
||||
},
|
||||
onDisconnect: () => {
|
||||
isConnected.value = false;
|
||||
},
|
||||
onMessage: (data) => {
|
||||
notifications.value.unshift({
|
||||
id: Date.now(),
|
||||
...data
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
wsClient.connect();
|
||||
});
|
||||
|
||||
onUnmounted(() => {
|
||||
wsClient?.close();
|
||||
});
|
||||
</script>
|
||||
```
|
||||
|
||||
#### React 示例
|
||||
```jsx
|
||||
import { useEffect, useState } from 'react';
|
||||
|
||||
function ModelNotification({ userId, modelId }) {
|
||||
const [isConnected, setIsConnected] = useState(false);
|
||||
const [notifications, setNotifications] = useState([]);
|
||||
|
||||
useEffect(() => {
|
||||
const wsClient = new ModelWebSocket(userId, modelId, {
|
||||
onConnect: () => setIsConnected(true),
|
||||
onDisconnect: () => setIsConnected(false),
|
||||
onMessage: (data) => {
|
||||
setNotifications(prev => [{
|
||||
id: Date.now(),
|
||||
...data
|
||||
}, ...prev]);
|
||||
}
|
||||
});
|
||||
|
||||
wsClient.connect();
|
||||
|
||||
return () => {
|
||||
wsClient.close();
|
||||
};
|
||||
}, [userId, modelId]);
|
||||
|
||||
return (
|
||||
<div>
|
||||
{isConnected ? <span>✅ 已连接</span> : <span>❌ 未连接</span>}
|
||||
<ul>
|
||||
{notifications.map(msg => (
|
||||
<li key={msg.id}>{msg.message}</li>
|
||||
))}
|
||||
</ul>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 7. 常见问题排查
|
||||
|
||||
### 7.1 连接问题
|
||||
|
||||
#### 问题 1: 断点不触发
|
||||
**现象**: 在 `WebSocketHandshakeInterceptor` 打断点,但不断入
|
||||
|
||||
**排查步骤**:
|
||||
1. ✅ 确认 Nginx 监听端口与前端连接端口一致(3001)
|
||||
2. ✅ 确认 Nginx 配置已重载:`nginx -s reload`
|
||||
3. ✅ 确认网关在 `192.168.65.161:7100` 运行
|
||||
4. ✅ 确认 Data 服务已注册到 Nacos
|
||||
5. ✅ 检查前端连接 URL 是否正确
|
||||
|
||||
**验证命令**:
|
||||
```bash
|
||||
# 检查 Nginx 端口
|
||||
netstat -an | grep 3001
|
||||
|
||||
# 检查网关端口
|
||||
netstat -an | grep 7100
|
||||
|
||||
# 检查 Data 服务端口
|
||||
netstat -an | grep 7104
|
||||
|
||||
# 测试 WebSocket 连接
|
||||
wscat -c "ws://192.168.65.57:3001/wsApi/simulation/data/ws/data/modelTraining?userId=1&modelId=123"
|
||||
```
|
||||
|
||||
#### 问题 2: 连接后立即断开
|
||||
**可能原因**:
|
||||
- ❌ Nginx 未配置 WebSocket 头部
|
||||
- ❌ 超时时间设置过短
|
||||
- ❌ 防火墙拦截
|
||||
|
||||
**解决方案**:
|
||||
```nginx
|
||||
location /wsApi/ {
|
||||
proxy_http_version 1.1;
|
||||
proxy_set_header Upgrade $http_upgrade;
|
||||
proxy_set_header Connection "upgrade";
|
||||
proxy_read_timeout 86400;
|
||||
proxy_send_timeout 86400;
|
||||
}
|
||||
```
|
||||
|
||||
### 7.2 消息推送问题
|
||||
|
||||
#### 问题 3: 消息发送失败
|
||||
**错误日志**:
|
||||
```
|
||||
WARN 未找到用户 123 的模型 456 的 WebSocket 会话
|
||||
```
|
||||
|
||||
**原因**:
|
||||
- 用户未连接或已断开连接
|
||||
- userId 或 modelId 不匹配
|
||||
|
||||
**解决方案**:
|
||||
1. 前端检查连接 URL 中的参数是否正确
|
||||
2. 后端检查发送消息时使用的 userId/modelId 是否正确
|
||||
3. 在 `WebSocketService` 中添加日志查看映射关系
|
||||
|
||||
#### 问题 4: 消息发送异常
|
||||
**错误日志**:
|
||||
```
|
||||
ERROR 发送 WebSocket 消息失败,会话 ID: xxx
|
||||
```
|
||||
|
||||
**原因**:
|
||||
- 会话已关闭但仍尝试发送
|
||||
|
||||
**解决方案**:
|
||||
代码已处理,会自动检查会话状态:
|
||||
```java
|
||||
if (session.isOpen()) {
|
||||
session.sendMessage(new TextMessage(message));
|
||||
}
|
||||
```
|
||||
|
||||
### 7.3 跨域问题
|
||||
|
||||
#### 问题 5: 跨域错误
|
||||
**浏览器错误**:
|
||||
```
|
||||
WebSocket connection to 'ws://...' failed:
|
||||
Error during WebSocket handshake:
|
||||
Unexpected response code: 200
|
||||
```
|
||||
|
||||
**解决方案**:
|
||||
```java
|
||||
// WebSocketConfig.java
|
||||
registry.addHandler(webSocketService, "/ws/data/modelTraining")
|
||||
.setAllowedOrigins("*");
|
||||
```
|
||||
|
||||
### 7.4 并发问题
|
||||
|
||||
#### 问题 6: 并发修改异常
|
||||
**错误**: `ConcurrentModificationException`
|
||||
|
||||
**原因**: 多线程同时修改 Map
|
||||
|
||||
**解决方案**:
|
||||
已使用 `ConcurrentHashMap`,线程安全:
|
||||
```java
|
||||
private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
|
||||
```
|
||||
|
||||
### 7.5 内存泄漏问题
|
||||
|
||||
#### 问题 7: 会话未清理
|
||||
**现象**: 运行一段时间后 OOM
|
||||
|
||||
**排查**:
|
||||
1. 检查 `afterConnectionClosed` 是否清理映射
|
||||
2. 检查是否有重复注册
|
||||
|
||||
**当前实现**:
|
||||
```java
|
||||
@Override
|
||||
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
|
||||
String sessionId = session.getId();
|
||||
sessions.remove(sessionId);
|
||||
userModelSessionMap.values().removeIf(id -> id.equals(sessionId));
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 附录
|
||||
|
||||
### A. 相关文件清单
|
||||
```
|
||||
data/
|
||||
├── src/main/java/com/sdm/data/
|
||||
│ ├── config/
|
||||
│ │ ├── WebSocketConfig.java # WebSocket 配置类
|
||||
│ │ └── WebSocketHandshakeInterceptor.java # 握手拦截器
|
||||
│ └── service/
|
||||
│ └── WebSocketService.java # WebSocket 服务
|
||||
└── src/main/resources/
|
||||
└── application-local.yml # Data 服务配置
|
||||
```
|
||||
|
||||
### B. 依赖配置
|
||||
```xml
|
||||
<!-- pom.xml -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-websocket</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.alibaba.fastjson2</groupId>
|
||||
<artifactId>fastjson2</artifactId>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
### C. 测试工具
|
||||
|
||||
#### wscat 安装
|
||||
```bash
|
||||
npm install -g wscat
|
||||
```
|
||||
|
||||
#### 测试命令
|
||||
```bash
|
||||
# 测试连接
|
||||
wscat -c "ws://localhost:7104/ws/data/modelTraining?userId=1&modelId=123"
|
||||
|
||||
# 发送消息
|
||||
> {"type": "ping"}
|
||||
```
|
||||
|
||||
### D. 参考资料
|
||||
- [Spring WebSocket 官方文档](https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#websocket)
|
||||
- [WebSocket RFC 6455](https://datatracker.ietf.org/doc/html/rfc6455)
|
||||
- [Nginx WebSocket 代理配置](https://www.nginx.com/blog/websocket-nginx/)
|
||||
|
||||
---
|
||||
|
||||
## 更新日志
|
||||
|
||||
| 日期 | 版本 | 说明 | 作者 |
|
||||
|------|------|------|------|
|
||||
| 2026-03-06 | v1.0 | 初始版本 | Gulongcheng |
|
||||
@@ -302,8 +302,11 @@ public class ModelServiceImpl implements IModelService {
|
||||
* @param modelId 模型ID
|
||||
*/
|
||||
private void processDataAsync(String pythonScriptPath, String paramJsonPath, Long trainingModelId, Long userId, Long modelId) {
|
||||
Long tenantId = ThreadLocalContext.getTenantId();
|
||||
new Thread(() -> {
|
||||
try {
|
||||
ThreadLocalContext.setTenantId(tenantId);
|
||||
ThreadLocalContext.setUserId(userId);
|
||||
// 调用Python脚本处理数据
|
||||
long startTime = System.currentTimeMillis();
|
||||
log.info("开始执行Python脚本, 训练模型ID: {}", trainingModelId);
|
||||
@@ -757,7 +760,10 @@ public class ModelServiceImpl implements IModelService {
|
||||
* @param userId 用户ID
|
||||
*/
|
||||
private void trainModelAsync(String paramJsonPath, Long modelId, Long userId, String exportFormat) {
|
||||
Long tenantId = ThreadLocalContext.getTenantId();
|
||||
new Thread(() -> {
|
||||
ThreadLocalContext.setTenantId(tenantId);
|
||||
ThreadLocalContext.setUserId(userId);
|
||||
Process process = null;
|
||||
// 用于实时读取日志的线程
|
||||
Thread logReaderThread = null;
|
||||
|
||||
@@ -8,9 +8,9 @@
|
||||
<h1>WebSocket 测试页面</h1>
|
||||
<div>
|
||||
<label for="userId">用户ID:</label>
|
||||
<input type="number" id="userId" value="1001" />
|
||||
<input type="number" id="userId" value="1980235559149838337" />
|
||||
<label for="modelId">模型ID:</label>
|
||||
<input type="number" id="modelId" value="2025" />
|
||||
<input type="number" id="modelId" value="68" />
|
||||
<button onclick="connect()">连接</button>
|
||||
<button onclick="sendPing()">发送 Ping</button>
|
||||
<button onclick="sendSubscribe()">发送订阅</button>
|
||||
|
||||
Reference in New Issue
Block a user