25 KiB
25 KiB
WebSocket 配置与原理说明文档
目录
1. 概述
1.1 功能定位
Data 模块的 WebSocket 功能主要用于训练模型实时通知,支持以下场景:
- ✅ 模型数据处理完成通知
- ✅ 模型训练进度通知
- ✅ 模型预测结果通知
1.2 技术栈
- 框架: Spring WebSocket
- 协议: WebSocket (RFC 6455)
- 消息格式: JSON (FastJSON2)
- 并发处理: ConcurrentHashMap 线程安全
1.3 架构位置
前端页面 → Nginx(3001) → Gateway(7100) → Data Service(7104) → WebSocket 推送
2. 核心组件
2.1 组件关系图
graph TB
A[WebSocketConfig] -->|注册 | B[WebSocketService]
C[WebSocketHandshakeInterceptor] -->|拦截 | B
B -->|管理 | D[WebSocketSession]
2.2 组件职责
2.2.1 WebSocketConfig
文件路径: data/src/main/java/com/sdm/data/config/WebSocketConfig.java
职责:
- 注册 WebSocket 处理器
- 配置握手拦截器
- 设置跨域访问规则
核心代码:
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketService, "/ws/data/modelTraining")
.addInterceptors(webSocketHandshakeInterceptor)
.setAllowedOrigins("*");
}
}
2.2.2 WebSocketHandshakeInterceptor
文件路径: data/src/main/java/com/sdm/data/config/WebSocketHandshakeInterceptor.java
职责:
- 在握手前提取请求参数(userId, modelId)
- 将参数存储到会话属性(attributes)中
- 验证参数有效性
关键方法:
@Override
public boolean beforeHandshake(ServerHttpRequest request,
ServerHttpResponse response,
WebSocketHandler wsHandler,
Map<String, Object> attributes) {
// 1. 从请求参数中获取 userId 和 modelId
String userIdStr = servletRequest.getServletRequest().getParameter("userId");
String modelIdStr = servletRequest.getServletRequest().getParameter("modelId");
// 2. 类型转换并存储到 attributes
attributes.put("userId", Long.valueOf(userIdStr));
attributes.put("modelId", Long.valueOf(modelIdStr));
return true; // 允许握手
}
2.2.3 WebSocketService
文件路径: data/src/main/java/com/sdm/data/service/WebSocketService.java
职责:
- 管理所有活跃的 WebSocket 会话
- 建立用户模型与会话的映射关系
- 提供消息发送方法(广播、单播、业务通知)
核心数据结构:
// 存储所有活跃的 WebSocket 会话
private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
// 存储用户模型 ID 与会话 ID 的映射关系 (userId_modelId -> sessionId)
private final ConcurrentHashMap<String, String> userModelSessionMap = new ConcurrentHashMap<>();
主要方法:
| 方法名 | 功能 | 参数 |
|---|---|---|
sendToAll() |
广播消息给所有客户端 | message |
sendToSession() |
向指定会话发送消息 | sessionId, message |
sendToUserModel() |
向特定用户的特定模型发送消息 | userId, modelId, message |
sendDataProcessingNotification() |
发送数据处理完成通知 | userId, modelId, success, message |
sendTrainingProcessingNotification() |
发送训练处理通知 | userId, modelId, success, message |
sendPredictionResultNotification() |
发送预测结果通知 | userId, modelId, success, message |
3. 配置详解
3.1 后端配置
3.1.1 Data 服务配置
文件: data/src/main/resources/application-local.yml
server:
port: 7104 # Data 服务端口
spring:
application:
name: data # 服务名(用于 Nacos 注册)
3.1.2 网关配置
文件: gateway2/src/main/resources/application-local.yml
server:
port: 7100 # 网关端口
spring:
cloud:
gateway:
routes:
- id: data-service
uri: lb://data # 负载均衡到 data 服务
predicates:
- Path=/simulation/data/** # 路径匹配
filters:
- StripPrefix=2 # 去除 2 级前缀
路径转换说明:
前端请求:/simulation/data/ws/data/modelTraining
网关处理:去除 /simulation/data (2 级)
转发到 Data: /ws/data/modelTraining
3.2 Nginx 配置
文件: /usr/local/nginx/conf/nginx.conf 或独立配置文件
server {
listen 3001; # Nginx 监听端口
server_name localhost;
# WebSocket 代理配置
location /wsApi/ {
# 路径重写:去掉 /wsApi/ 前缀
rewrite ^/wsApi/(.*)$ /$1 break;
# 转发到网关
proxy_pass http://192.168.65.161:7100;
# ========== WebSocket 必需配置 ==========
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# 超时时间设置(长连接)
proxy_read_timeout 86400;
proxy_send_timeout 86400;
}
}
配置说明:
| 配置项 | 作用 | 必要性 |
|---|---|---|
proxy_http_version 1.1 |
使用 HTTP/1.1 协议 | ⭐⭐⭐ 必需 |
proxy_set_header Upgrade $http_upgrade |
传递协议升级头 | ⭐⭐⭐ 必需 |
proxy_set_header Connection "upgrade" |
传递连接升级指令 | ⭐⭐⭐ 必需 |
rewrite ^/wsApi/(.*)$ /$1 break |
路径重写 | ⭐⭐⭐ 必需 |
3.3 前端配置
3.3.1 连接 URL 格式
const wsUrl = `ws://nginx-ip:3001/wsApi/simulation/data/ws/data/modelTraining?userId=${userId}&modelId=${modelId}`;
URL 组成解析:
ws:// # WebSocket 协议
192.168.65.57:3001 # Nginx 地址和端口
/wsApi/ # Nginx WebSocket 代理前缀
/simulation/data/ # 网关路由前缀
/ws/data/modelTraining # Data 服务注册的 WebSocket 路径
?userId=xxx&modelId=xxx # 业务参数
3.3.2 连接示例代码
class ModelWebSocket {
constructor(userId, modelId) {
this.userId = userId;
this.modelId = modelId;
this.ws = null;
this.reconnectTimer = null;
this.heartbeatTimer = null;
this.reconnectCount = 0;
this.maxReconnectCount = 5;
this.reconnectInterval = 3000;
this.heartbeatInterval = 30000;
}
// 建立连接
connect() {
const wsUrl = `ws://192.168.65.57:3001/wsApi/simulation/data/ws/data/modelTraining?userId=${this.userId}&modelId=${this.modelId}`;
try {
this.ws = new WebSocket(wsUrl);
this.ws.onopen = this.onOpen.bind(this);
this.ws.onmessage = this.onMessage.bind(this);
this.ws.onclose = this.onClose.bind(this);
this.ws.onerror = this.onError.bind(this);
} catch (error) {
console.error('WebSocket 连接异常:', error);
this.reconnect();
}
}
// 连接成功
onOpen(event) {
console.log('WebSocket 连接成功', event);
this.reconnectCount = 0;
this.startHeartbeat();
}
// 接收消息
onMessage(event) {
const data = JSON.parse(event.data);
console.log('收到消息:', data);
// 根据消息类型处理
switch (data.type) {
case 'connection':
console.log('连接确认,会话 ID:', data.sessionId);
break;
case 'modelDataProcessing':
this.handleDataProcessing(data);
break;
case 'modelTrainingProcessing':
this.handleTrainingProcessing(data);
break;
case 'modelPredictionResult':
this.handlePredictionResult(data);
break;
default:
console.warn('未知消息类型:', data.type);
}
}
// 连接关闭
onClose(event) {
console.log('WebSocket 连接关闭', event);
this.stopHeartbeat();
// 非正常关闭时重连
if (event.code !== 1000) {
this.reconnect();
}
}
// 连接错误
onError(error) {
console.error('WebSocket 错误:', error);
}
// 重连机制
reconnect() {
if (this.reconnectCount >= this.maxReconnectCount) {
console.error('重连次数已达上限');
return;
}
this.reconnectCount++;
console.log(`第 ${this.reconnectCount} 次重连...`);
this.reconnectTimer = setTimeout(() => {
this.connect();
}, this.reconnectInterval);
}
// 心跳机制
startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: 'ping', timestamp: Date.now() }));
}
}, this.heartbeatInterval);
}
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
// 发送消息
send(message) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
} else {
console.warn('WebSocket 未连接,无法发送消息');
}
}
// 关闭连接
close() {
this.reconnectTimer && clearTimeout(this.reconnectTimer);
this.stopHeartbeat();
if (this.ws) {
this.ws.close(1000, '正常关闭');
this.ws = null;
}
}
}
// 使用示例
const wsClient = new ModelWebSocket(userId, modelId);
wsClient.connect();
4. 工作原理
4.1 连接建立流程
sequenceDiagram
participant 前端
participant Nginx
participant Gateway
participant Data
participant Interceptor
participant Service
前端->>Nginx: ws://nginx:3001/wsApi/simulation/data/ws/data/modelTraining?userId=1&modelId=123
Nginx->>Nginx: rewrite 去掉 /wsApi/
Nginx->>Gateway: http://gateway:7100/simulation/data/ws/data/modelTraining?userId=1&modelId=123
Gateway->>Gateway: 匹配路由 /simulation/data/**
Gateway->>Gateway: StripPrefix=2 去除前缀
Gateway->>Data: http://data:7104/ws/data/modelTraining?userId=1&modelId=123
Data->>Interceptor: beforeHandshake()
Interceptor->>Interceptor: 提取参数到 attributes
Interceptor->>Data: 返回 true 允许握手
Data->>Service: afterConnectionEstablished()
Service->>Service: 存储 session 到 sessions map
Service->>Service: 从 attributes 读取 userId, modelId
Service->>Service: 建立 userModelSessionMap 映射
Service->>前端:发送连接成功消息
4.2 参数传递机制
4.2.1 参数流转
URL 参数 → Interceptor → attributes → WebSocketSession → Service
详细说明:
-
URL 参数阶段
/ws/data/modelTraining?userId=1&modelId=123 -
拦截器提取阶段
// WebSocketHandshakeInterceptor.java String userIdStr = servletRequest.getServletRequest().getParameter("userId"); attributes.put("userId", Long.valueOf(userIdStr)); -
会话属性存储阶段
// WebSocketSession.attributes 中存储 session.getAttributes().get("userId") -
服务使用阶段
// WebSocketService.java Object userIdObj = session.getAttributes().get("userId"); Long userId = (Long) userIdObj;
4.2.2 为什么使用 attributes?
- ✅ 符合规范: 遵循 WebSocket 参数传递规范
- ✅ 类型安全: 在拦截器中完成类型转换
- ✅ 解耦设计: Service 层不需要解析 URL 参数
- ✅ 统一管理: 所有会话参数集中管理
4.3 会话映射机制
4.3.1 双 Map 结构
// Map 1: sessionId -> WebSocketSession
ConcurrentHashMap<String, WebSocketSession> sessions
// Map 2: userId_modelId -> sessionId
ConcurrentHashMap<String, String> userModelSessionMap
4.3.2 映射关系建立
// 连接建立时
String key = userId + "_" + modelId; // 复合键
userModelSessionMap.put(key, session.getId());
sessions.put(session.getId(), session);
4.3.3 会话定位流程
graph LR
A[需要发送消息] --> B{目标是谁?}
B -->|特定用户模型 | C[构建复合键 userId_modelId]
C --> D[查询 userModelSessionMap]
D --> E[获取 sessionId]
E --> F[查询 sessions]
F --> G[获取 WebSocketSession]
G --> H[发送消息]
4.4 消息推送机制
4.4.1 业务通知封装
// 发送数据处理完成通知
public void sendDataProcessingNotification(Long userId, Long modelId,
boolean success, String message) {
JSONObject notification = new JSONObject();
notification.put("type", "modelDataProcessing");
notification.put("modelId", modelId);
notification.put("success", success);
notification.put("message", message);
sendToUserModel(userId, modelId, notification.toJSONString());
}
4.4.2 消息格式规范
{
"type": "modelDataProcessing",
"modelId": 123,
"success": true,
"message": "数据处理完成"
}
消息类型枚举:
| type | 说明 | 触发时机 |
|---|---|---|
connection |
连接确认 | 连接建立成功 |
modelDataProcessing |
数据处理通知 | 模型数据处理完成 |
modelTrainingProcessing |
训练处理通知 | 训练任务完成 |
modelPredictionResult |
预测结果通知 | 预测任务完成 |
5. 交互流程
5.1 完整生命周期
sequenceDiagram
autonumber
participant 前端
participant Nginx
participant Gateway
participant Data
participant 业务代码
前端->>Nginx: 1. WebSocket 连接请求
Nginx->>Gateway: 2. 转发(路径重写后)
Gateway->>Data: 3. 路由转发(去除前缀)
Data->>Data: 4. 拦截器提取参数
Data->>Data: 5. 建立 WebSocket 连接
Data->>前端:6. 返回连接成功消息
Note over 前端,Data: 连接已建立,等待业务事件
业务代码->>Data: 7. 调用 sendXxxNotification()
Data->>Data: 8. 查找目标会话
Data->>前端:9. 推送业务通知消息
Note over 前端,Data: 业务处理中...
前端->>Data: 10. 发送心跳/业务消息
Data->>Data: 11. handleTextMessage()
Note over 前端,Data: 页面关闭或网络断开
前端->>Data: 12. 关闭连接
Data->>Data: 13. afterConnectionClosed()
Data->>Data: 14. 清理会话映射
5.2 典型业务场景
场景 1: 模型数据处理完成
// 业务代码调用
@Autowired
private WebSocketService webSocketService;
public void processDataModel(Long userId, Long modelId) {
try {
// 1. 执行业务逻辑
// ... 数据处理代码 ...
// 2. 处理完成后推送通知
webSocketService.sendDataProcessingNotification(
userId,
modelId,
true,
"数据处理成功"
);
} catch (Exception e) {
// 3. 处理失败推送错误通知
webSocketService.sendDataProcessingNotification(
userId,
modelId,
false,
"数据处理失败:" + e.getMessage()
);
}
}
场景 2: 模型训练任务完成
public void onTrainingComplete(Long userId, Long modelId, TrainingResult result) {
boolean success = result.isSuccess();
String message = success ? "训练完成" : "训练失败";
webSocketService.sendTrainingProcessingNotification(
userId,
modelId,
success,
message
);
}
场景 3: 预测结果返回
public void onPredictionComplete(Long userId, Long modelId, PredictionResult result) {
JSONObject notification = new JSONObject();
notification.put("type", "modelPredictionResult");
notification.put("modelId", modelId);
notification.put("success", result.isSuccess());
notification.put("data", result.getData());
notification.put("message", result.getMessage());
webSocketService.sendToUserModel(userId, modelId, notification.toJSONString());
}
6. 使用示例
6.1 后端调用示例
示例 1: 在 Service 中推送通知
@Service
public class ModelTrainingService {
@Autowired
private WebSocketService webSocketService;
@Async
public void trainModel(Long userId, Long modelId, TrainingParams params) {
try {
log.info("开始训练模型,userId={}, modelId={}", userId, modelId);
// 执行训练逻辑
TrainingResult result = executeTraining(params);
// 推送成功通知
webSocketService.sendTrainingProcessingNotification(
userId,
modelId,
true,
"模型训练完成,准确率:" + result.getAccuracy()
);
} catch (Exception e) {
log.error("模型训练失败", e);
// 推送失败通知
webSocketService.sendTrainingProcessingNotification(
userId,
modelId,
false,
"训练失败:" + e.getMessage()
);
}
}
}
示例 2: 在 Controller 中触发
@RestController
@RequestMapping("/data/model")
public class ModelController {
@Autowired
private WebSocketService webSocketService;
@PostMapping("/predict")
public R<PredictionResult> predict(@RequestBody PredictionRequest request) {
Long userId = request.getUserId();
Long modelId = request.getModelId();
try {
// 执行预测
PredictionResult result = predictionService.predict(request);
// 推送预测结果
webSocketService.sendPredictionResultNotification(
userId,
modelId,
true,
"预测完成"
);
return R.ok(result);
} catch (Exception e) {
// 推送错误通知
webSocketService.sendPredictionResultNotification(
userId,
modelId,
false,
"预测失败:" + e.getMessage()
);
return R.fail(e.getMessage());
}
}
}
6.2 前端使用示例
Vue 3 示例
<template>
<div>
<div v-if="isConnected">✅ 已连接</div>
<div v-else>❌ 未连接</div>
<div v-if="notifications.length">
<h3>通知消息</h3>
<ul>
<li v-for="msg in notifications" :key="msg.id">
{{ msg.message }}
</li>
</ul>
</div>
</div>
</template>
<script setup>
import { ref, onMounted, onUnmounted } from 'vue';
const isConnected = ref(false);
const notifications = ref([]);
let wsClient = null;
onMounted(() => {
const userId = 1979078323595476993;
const modelId = 66;
wsClient = new ModelWebSocket(userId, modelId, {
onConnect: () => {
isConnected.value = true;
},
onDisconnect: () => {
isConnected.value = false;
},
onMessage: (data) => {
notifications.value.unshift({
id: Date.now(),
...data
});
}
});
wsClient.connect();
});
onUnmounted(() => {
wsClient?.close();
});
</script>
React 示例
import { useEffect, useState } from 'react';
function ModelNotification({ userId, modelId }) {
const [isConnected, setIsConnected] = useState(false);
const [notifications, setNotifications] = useState([]);
useEffect(() => {
const wsClient = new ModelWebSocket(userId, modelId, {
onConnect: () => setIsConnected(true),
onDisconnect: () => setIsConnected(false),
onMessage: (data) => {
setNotifications(prev => [{
id: Date.now(),
...data
}, ...prev]);
}
});
wsClient.connect();
return () => {
wsClient.close();
};
}, [userId, modelId]);
return (
<div>
{isConnected ? <span>✅ 已连接</span> : <span>❌ 未连接</span>}
<ul>
{notifications.map(msg => (
<li key={msg.id}>{msg.message}</li>
))}
</ul>
</div>
);
}
7. 常见问题排查
7.1 连接问题
问题 1: 断点不触发
现象: 在 WebSocketHandshakeInterceptor 打断点,但不断入
排查步骤:
- ✅ 确认 Nginx 监听端口与前端连接端口一致(3001)
- ✅ 确认 Nginx 配置已重载:
nginx -s reload - ✅ 确认网关在
192.168.65.161:7100运行 - ✅ 确认 Data 服务已注册到 Nacos
- ✅ 检查前端连接 URL 是否正确
验证命令:
# 检查 Nginx 端口
netstat -an | grep 3001
# 检查网关端口
netstat -an | grep 7100
# 检查 Data 服务端口
netstat -an | grep 7104
# 测试 WebSocket 连接
wscat -c "ws://192.168.65.57:3001/wsApi/simulation/data/ws/data/modelTraining?userId=1&modelId=123"
问题 2: 连接后立即断开
可能原因:
- ❌ Nginx 未配置 WebSocket 头部
- ❌ 超时时间设置过短
- ❌ 防火墙拦截
解决方案:
location /wsApi/ {
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_read_timeout 86400;
proxy_send_timeout 86400;
}
7.2 消息推送问题
问题 3: 消息发送失败
错误日志:
WARN 未找到用户 123 的模型 456 的 WebSocket 会话
原因:
- 用户未连接或已断开连接
- userId 或 modelId 不匹配
解决方案:
- 前端检查连接 URL 中的参数是否正确
- 后端检查发送消息时使用的 userId/modelId 是否正确
- 在
WebSocketService中添加日志查看映射关系
问题 4: 消息发送异常
错误日志:
ERROR 发送 WebSocket 消息失败,会话 ID: xxx
原因:
- 会话已关闭但仍尝试发送
解决方案: 代码已处理,会自动检查会话状态:
if (session.isOpen()) {
session.sendMessage(new TextMessage(message));
}
7.3 跨域问题
问题 5: 跨域错误
浏览器错误:
WebSocket connection to 'ws://...' failed:
Error during WebSocket handshake:
Unexpected response code: 200
解决方案:
// WebSocketConfig.java
registry.addHandler(webSocketService, "/ws/data/modelTraining")
.setAllowedOrigins("*");
7.4 并发问题
问题 6: 并发修改异常
错误: ConcurrentModificationException
原因: 多线程同时修改 Map
解决方案:
已使用 ConcurrentHashMap,线程安全:
private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
7.5 内存泄漏问题
问题 7: 会话未清理
现象: 运行一段时间后 OOM
排查:
- 检查
afterConnectionClosed是否清理映射 - 检查是否有重复注册
当前实现:
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
String sessionId = session.getId();
sessions.remove(sessionId);
userModelSessionMap.values().removeIf(id -> id.equals(sessionId));
}
附录
A. 相关文件清单
data/
├── src/main/java/com/sdm/data/
│ ├── config/
│ │ ├── WebSocketConfig.java # WebSocket 配置类
│ │ └── WebSocketHandshakeInterceptor.java # 握手拦截器
│ └── service/
│ └── WebSocketService.java # WebSocket 服务
└── src/main/resources/
└── application-local.yml # Data 服务配置
B. 依赖配置
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
</dependency>
C. 测试工具
wscat 安装
npm install -g wscat
测试命令
# 测试连接
wscat -c "ws://localhost:7104/ws/data/modelTraining?userId=1&modelId=123"
# 发送消息
> {"type": "ping"}
D. 参考资料
更新日志
| 日期 | 版本 | 说明 | 作者 |
|---|---|---|---|
| 2026-03-06 | v1.0 | 初始版本 | Gulongcheng |