Files
spdm-backend/data/WebSocket 配置与原理说明.md

25 KiB
Raw Permalink Blame History

WebSocket 配置与原理说明文档

目录


1. 概述

1.1 功能定位

Data 模块的 WebSocket 功能主要用于训练模型实时通知,支持以下场景:

  • 模型数据处理完成通知
  • 模型训练进度通知
  • 模型预测结果通知

1.2 技术栈

  • 框架: Spring WebSocket
  • 协议: WebSocket (RFC 6455)
  • 消息格式: JSON (FastJSON2)
  • 并发处理: ConcurrentHashMap 线程安全

1.3 架构位置

前端页面 → Nginx(3001) → Gateway(7100) → Data Service(7104) → WebSocket 推送

2. 核心组件

2.1 组件关系图

graph TB
    A[WebSocketConfig] -->|注册 | B[WebSocketService]
    C[WebSocketHandshakeInterceptor] -->|拦截 | B
    B -->|管理 | D[WebSocketSession]

2.2 组件职责

2.2.1 WebSocketConfig

文件路径: data/src/main/java/com/sdm/data/config/WebSocketConfig.java

职责:

  • 注册 WebSocket 处理器
  • 配置握手拦截器
  • 设置跨域访问规则

核心代码:

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(webSocketService, "/ws/data/modelTraining")
                .addInterceptors(webSocketHandshakeInterceptor)
                .setAllowedOrigins("*");
    }
}

2.2.2 WebSocketHandshakeInterceptor

文件路径: data/src/main/java/com/sdm/data/config/WebSocketHandshakeInterceptor.java

职责:

  • 在握手前提取请求参数userId, modelId
  • 将参数存储到会话属性attributes
  • 验证参数有效性

关键方法:

@Override
public boolean beforeHandshake(ServerHttpRequest request, 
                               ServerHttpResponse response,
                               WebSocketHandler wsHandler, 
                               Map<String, Object> attributes) {
    // 1. 从请求参数中获取 userId 和 modelId
    String userIdStr = servletRequest.getServletRequest().getParameter("userId");
    String modelIdStr = servletRequest.getServletRequest().getParameter("modelId");
    
    // 2. 类型转换并存储到 attributes
    attributes.put("userId", Long.valueOf(userIdStr));
    attributes.put("modelId", Long.valueOf(modelIdStr));
    
    return true; // 允许握手
}

2.2.3 WebSocketService

文件路径: data/src/main/java/com/sdm/data/service/WebSocketService.java

职责:

  • 管理所有活跃的 WebSocket 会话
  • 建立用户模型与会话的映射关系
  • 提供消息发送方法(广播、单播、业务通知)

核心数据结构:

// 存储所有活跃的 WebSocket 会话
private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

// 存储用户模型 ID 与会话 ID 的映射关系 (userId_modelId -> sessionId)
private final ConcurrentHashMap<String, String> userModelSessionMap = new ConcurrentHashMap<>();

主要方法:

方法名 功能 参数
sendToAll() 广播消息给所有客户端 message
sendToSession() 向指定会话发送消息 sessionId, message
sendToUserModel() 向特定用户的特定模型发送消息 userId, modelId, message
sendDataProcessingNotification() 发送数据处理完成通知 userId, modelId, success, message
sendTrainingProcessingNotification() 发送训练处理通知 userId, modelId, success, message
sendPredictionResultNotification() 发送预测结果通知 userId, modelId, success, message

3. 配置详解

3.1 后端配置

3.1.1 Data 服务配置

文件: data/src/main/resources/application-local.yml

server:
  port: 7104  # Data 服务端口

spring:
  application:
    name: data  # 服务名(用于 Nacos 注册)

3.1.2 网关配置

文件: gateway2/src/main/resources/application-local.yml

server:
  port: 7100  # 网关端口

spring:
  cloud:
    gateway:
      routes:
        - id: data-service
          uri: lb://data  # 负载均衡到 data 服务
          predicates:
            - Path=/simulation/data/**  # 路径匹配
          filters:
            - StripPrefix=2  # 去除 2 级前缀

路径转换说明:

前端请求:/simulation/data/ws/data/modelTraining
网关处理:去除 /simulation/data (2 级)
转发到 Data: /ws/data/modelTraining

3.2 Nginx 配置

文件: /usr/local/nginx/conf/nginx.conf 或独立配置文件

server {
  listen       3001;  # Nginx 监听端口
  server_name  localhost;
  
  # WebSocket 代理配置
  location /wsApi/ {
    # 路径重写:去掉 /wsApi/ 前缀
    rewrite ^/wsApi/(.*)$ /$1 break;
    
    # 转发到网关
    proxy_pass http://192.168.65.161:7100;
    
    # ========== WebSocket 必需配置 ==========
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    
    # 超时时间设置(长连接)
    proxy_read_timeout 86400;
    proxy_send_timeout 86400;
  }
}

配置说明:

配置项 作用 必要性
proxy_http_version 1.1 使用 HTTP/1.1 协议 必需
proxy_set_header Upgrade $http_upgrade 传递协议升级头 必需
proxy_set_header Connection "upgrade" 传递连接升级指令 必需
rewrite ^/wsApi/(.*)$ /$1 break 路径重写 必需

3.3 前端配置

3.3.1 连接 URL 格式

const wsUrl = `ws://nginx-ip:3001/wsApi/simulation/data/ws/data/modelTraining?userId=${userId}&modelId=${modelId}`;

URL 组成解析:

ws://                              # WebSocket 协议
192.168.65.57:3001                # Nginx 地址和端口
/wsApi/                           # Nginx WebSocket 代理前缀
/simulation/data/                 # 网关路由前缀
/ws/data/modelTraining            # Data 服务注册的 WebSocket 路径
?userId=xxx&modelId=xxx          # 业务参数

3.3.2 连接示例代码

class ModelWebSocket {
  constructor(userId, modelId) {
    this.userId = userId;
    this.modelId = modelId;
    this.ws = null;
    this.reconnectTimer = null;
    this.heartbeatTimer = null;
    this.reconnectCount = 0;
    this.maxReconnectCount = 5;
    this.reconnectInterval = 3000;
    this.heartbeatInterval = 30000;
  }
  
  // 建立连接
  connect() {
    const wsUrl = `ws://192.168.65.57:3001/wsApi/simulation/data/ws/data/modelTraining?userId=${this.userId}&modelId=${this.modelId}`;
    
    try {
      this.ws = new WebSocket(wsUrl);
      this.ws.onopen = this.onOpen.bind(this);
      this.ws.onmessage = this.onMessage.bind(this);
      this.ws.onclose = this.onClose.bind(this);
      this.ws.onerror = this.onError.bind(this);
    } catch (error) {
      console.error('WebSocket 连接异常:', error);
      this.reconnect();
    }
  }
  
  // 连接成功
  onOpen(event) {
    console.log('WebSocket 连接成功', event);
    this.reconnectCount = 0;
    this.startHeartbeat();
  }
  
  // 接收消息
  onMessage(event) {
    const data = JSON.parse(event.data);
    console.log('收到消息:', data);
    
    // 根据消息类型处理
    switch (data.type) {
      case 'connection':
        console.log('连接确认,会话 ID:', data.sessionId);
        break;
      case 'modelDataProcessing':
        this.handleDataProcessing(data);
        break;
      case 'modelTrainingProcessing':
        this.handleTrainingProcessing(data);
        break;
      case 'modelPredictionResult':
        this.handlePredictionResult(data);
        break;
      default:
        console.warn('未知消息类型:', data.type);
    }
  }
  
  // 连接关闭
  onClose(event) {
    console.log('WebSocket 连接关闭', event);
    this.stopHeartbeat();
    
    // 非正常关闭时重连
    if (event.code !== 1000) {
      this.reconnect();
    }
  }
  
  // 连接错误
  onError(error) {
    console.error('WebSocket 错误:', error);
  }
  
  // 重连机制
  reconnect() {
    if (this.reconnectCount >= this.maxReconnectCount) {
      console.error('重连次数已达上限');
      return;
    }
    
    this.reconnectCount++;
    console.log(`第 ${this.reconnectCount} 次重连...`);
    
    this.reconnectTimer = setTimeout(() => {
      this.connect();
    }, this.reconnectInterval);
  }
  
  // 心跳机制
  startHeartbeat() {
    this.heartbeatTimer = setInterval(() => {
      if (this.ws && this.ws.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: 'ping', timestamp: Date.now() }));
      }
    }, this.heartbeatInterval);
  }
  
  stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }
  
  // 发送消息
  send(message) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    } else {
      console.warn('WebSocket 未连接,无法发送消息');
    }
  }
  
  // 关闭连接
  close() {
    this.reconnectTimer && clearTimeout(this.reconnectTimer);
    this.stopHeartbeat();
    
    if (this.ws) {
      this.ws.close(1000, '正常关闭');
      this.ws = null;
    }
  }
}

// 使用示例
const wsClient = new ModelWebSocket(userId, modelId);
wsClient.connect();

4. 工作原理

4.1 连接建立流程

sequenceDiagram
    participant 前端
    participant Nginx
    participant Gateway
    participant Data
    participant Interceptor
    participant Service
    
    前端->>Nginx: ws://nginx:3001/wsApi/simulation/data/ws/data/modelTraining?userId=1&modelId=123
    Nginx->>Nginx: rewrite 去掉 /wsApi/
    Nginx->>Gateway: http://gateway:7100/simulation/data/ws/data/modelTraining?userId=1&modelId=123
    Gateway->>Gateway: 匹配路由 /simulation/data/**
    Gateway->>Gateway: StripPrefix=2 去除前缀
    Gateway->>Data: http://data:7104/ws/data/modelTraining?userId=1&modelId=123
    Data->>Interceptor: beforeHandshake()
    Interceptor->>Interceptor: 提取参数到 attributes
    Interceptor->>Data: 返回 true 允许握手
    Data->>Service: afterConnectionEstablished()
    Service->>Service: 存储 session 到 sessions map
    Service->>Service: 从 attributes 读取 userId, modelId
    Service->>Service: 建立 userModelSessionMap 映射
    Service->>前端:发送连接成功消息

4.2 参数传递机制

4.2.1 参数流转

URL 参数 → Interceptor → attributes → WebSocketSession → Service

详细说明:

  1. URL 参数阶段

    /ws/data/modelTraining?userId=1&modelId=123
    
  2. 拦截器提取阶段

    // WebSocketHandshakeInterceptor.java
    String userIdStr = servletRequest.getServletRequest().getParameter("userId");
    attributes.put("userId", Long.valueOf(userIdStr));
    
  3. 会话属性存储阶段

    // WebSocketSession.attributes 中存储
    session.getAttributes().get("userId")
    
  4. 服务使用阶段

    // WebSocketService.java
    Object userIdObj = session.getAttributes().get("userId");
    Long userId = (Long) userIdObj;
    

4.2.2 为什么使用 attributes

  • 符合规范: 遵循 WebSocket 参数传递规范
  • 类型安全: 在拦截器中完成类型转换
  • 解耦设计: Service 层不需要解析 URL 参数
  • 统一管理: 所有会话参数集中管理

4.3 会话映射机制

4.3.1 双 Map 结构

// Map 1: sessionId -> WebSocketSession
ConcurrentHashMap<String, WebSocketSession> sessions

// Map 2: userId_modelId -> sessionId
ConcurrentHashMap<String, String> userModelSessionMap

4.3.2 映射关系建立

// 连接建立时
String key = userId + "_" + modelId;  // 复合键
userModelSessionMap.put(key, session.getId());
sessions.put(session.getId(), session);

4.3.3 会话定位流程

graph LR
    A[需要发送消息] --> B{目标是谁?}
    B -->|特定用户模型 | C[构建复合键 userId_modelId]
    C --> D[查询 userModelSessionMap]
    D --> E[获取 sessionId]
    E --> F[查询 sessions]
    F --> G[获取 WebSocketSession]
    G --> H[发送消息]

4.4 消息推送机制

4.4.1 业务通知封装

// 发送数据处理完成通知
public void sendDataProcessingNotification(Long userId, Long modelId, 
                                          boolean success, String message) {
    JSONObject notification = new JSONObject();
    notification.put("type", "modelDataProcessing");
    notification.put("modelId", modelId);
    notification.put("success", success);
    notification.put("message", message);
    
    sendToUserModel(userId, modelId, notification.toJSONString());
}

4.4.2 消息格式规范

{
  "type": "modelDataProcessing",
  "modelId": 123,
  "success": true,
  "message": "数据处理完成"
}

消息类型枚举:

type 说明 触发时机
connection 连接确认 连接建立成功
modelDataProcessing 数据处理通知 模型数据处理完成
modelTrainingProcessing 训练处理通知 训练任务完成
modelPredictionResult 预测结果通知 预测任务完成

5. 交互流程

5.1 完整生命周期

sequenceDiagram
    autonumber
    participant 前端
    participant Nginx
    participant Gateway
    participant Data
    participant 业务代码
    
    前端->>Nginx: 1. WebSocket 连接请求
    Nginx->>Gateway: 2. 转发(路径重写后)
    Gateway->>Data: 3. 路由转发(去除前缀)
    Data->>Data: 4. 拦截器提取参数
    Data->>Data: 5. 建立 WebSocket 连接
    Data->>前端6. 返回连接成功消息
    
    Note over 前端Data: 连接已建立,等待业务事件
    
    业务代码->>Data: 7. 调用 sendXxxNotification()
    Data->>Data: 8. 查找目标会话
    Data->>前端9. 推送业务通知消息
    
    Note over 前端Data: 业务处理中...
    
    前端->>Data: 10. 发送心跳/业务消息
    Data->>Data: 11. handleTextMessage()
    
    Note over 前端Data: 页面关闭或网络断开
    
    前端->>Data: 12. 关闭连接
    Data->>Data: 13. afterConnectionClosed()
    Data->>Data: 14. 清理会话映射

5.2 典型业务场景

场景 1: 模型数据处理完成

// 业务代码调用
@Autowired
private WebSocketService webSocketService;

public void processDataModel(Long userId, Long modelId) {
    try {
        // 1. 执行业务逻辑
        // ... 数据处理代码 ...
        
        // 2. 处理完成后推送通知
        webSocketService.sendDataProcessingNotification(
            userId, 
            modelId, 
            true, 
            "数据处理成功"
        );
    } catch (Exception e) {
        // 3. 处理失败推送错误通知
        webSocketService.sendDataProcessingNotification(
            userId, 
            modelId, 
            false, 
            "数据处理失败:" + e.getMessage()
        );
    }
}

场景 2: 模型训练任务完成

public void onTrainingComplete(Long userId, Long modelId, TrainingResult result) {
    boolean success = result.isSuccess();
    String message = success ? "训练完成" : "训练失败";
    
    webSocketService.sendTrainingProcessingNotification(
        userId, 
        modelId, 
        success, 
        message
    );
}

场景 3: 预测结果返回

public void onPredictionComplete(Long userId, Long modelId, PredictionResult result) {
    JSONObject notification = new JSONObject();
    notification.put("type", "modelPredictionResult");
    notification.put("modelId", modelId);
    notification.put("success", result.isSuccess());
    notification.put("data", result.getData());
    notification.put("message", result.getMessage());
    
    webSocketService.sendToUserModel(userId, modelId, notification.toJSONString());
}

6. 使用示例

6.1 后端调用示例

示例 1: 在 Service 中推送通知

@Service
public class ModelTrainingService {
    
    @Autowired
    private WebSocketService webSocketService;
    
    @Async
    public void trainModel(Long userId, Long modelId, TrainingParams params) {
        try {
            log.info("开始训练模型userId={}, modelId={}", userId, modelId);
            
            // 执行训练逻辑
            TrainingResult result = executeTraining(params);
            
            // 推送成功通知
            webSocketService.sendTrainingProcessingNotification(
                userId, 
                modelId, 
                true, 
                "模型训练完成,准确率:" + result.getAccuracy()
            );
            
        } catch (Exception e) {
            log.error("模型训练失败", e);
            
            // 推送失败通知
            webSocketService.sendTrainingProcessingNotification(
                userId, 
                modelId, 
                false, 
                "训练失败:" + e.getMessage()
            );
        }
    }
}

示例 2: 在 Controller 中触发

@RestController
@RequestMapping("/data/model")
public class ModelController {
    
    @Autowired
    private WebSocketService webSocketService;
    
    @PostMapping("/predict")
    public R<PredictionResult> predict(@RequestBody PredictionRequest request) {
        Long userId = request.getUserId();
        Long modelId = request.getModelId();
        
        try {
            // 执行预测
            PredictionResult result = predictionService.predict(request);
            
            // 推送预测结果
            webSocketService.sendPredictionResultNotification(
                userId, 
                modelId, 
                true, 
                "预测完成"
            );
            
            return R.ok(result);
        } catch (Exception e) {
            // 推送错误通知
            webSocketService.sendPredictionResultNotification(
                userId, 
                modelId, 
                false, 
                "预测失败:" + e.getMessage()
            );
            
            return R.fail(e.getMessage());
        }
    }
}

6.2 前端使用示例

Vue 3 示例

<template>
  <div>
    <div v-if="isConnected"> 已连接</div>
    <div v-else> 未连接</div>
    
    <div v-if="notifications.length">
      <h3>通知消息</h3>
      <ul>
        <li v-for="msg in notifications" :key="msg.id">
          {{ msg.message }}
        </li>
      </ul>
    </div>
  </div>
</template>

<script setup>
import { ref, onMounted, onUnmounted } from 'vue';

const isConnected = ref(false);
const notifications = ref([]);
let wsClient = null;

onMounted(() => {
  const userId = 1979078323595476993;
  const modelId = 66;
  
  wsClient = new ModelWebSocket(userId, modelId, {
    onConnect: () => {
      isConnected.value = true;
    },
    onDisconnect: () => {
      isConnected.value = false;
    },
    onMessage: (data) => {
      notifications.value.unshift({
        id: Date.now(),
        ...data
      });
    }
  });
  
  wsClient.connect();
});

onUnmounted(() => {
  wsClient?.close();
});
</script>

React 示例

import { useEffect, useState } from 'react';

function ModelNotification({ userId, modelId }) {
  const [isConnected, setIsConnected] = useState(false);
  const [notifications, setNotifications] = useState([]);
  
  useEffect(() => {
    const wsClient = new ModelWebSocket(userId, modelId, {
      onConnect: () => setIsConnected(true),
      onDisconnect: () => setIsConnected(false),
      onMessage: (data) => {
        setNotifications(prev => [{
          id: Date.now(),
          ...data
        }, ...prev]);
      }
    });
    
    wsClient.connect();
    
    return () => {
      wsClient.close();
    };
  }, [userId, modelId]);
  
  return (
    <div>
      {isConnected ? <span> 已连接</span> : <span> 未连接</span>}
      <ul>
        {notifications.map(msg => (
          <li key={msg.id}>{msg.message}</li>
        ))}
      </ul>
    </div>
  );
}

7. 常见问题排查

7.1 连接问题

问题 1: 断点不触发

现象: 在 WebSocketHandshakeInterceptor 打断点,但不断入

排查步骤:

  1. 确认 Nginx 监听端口与前端连接端口一致3001
  2. 确认 Nginx 配置已重载:nginx -s reload
  3. 确认网关在 192.168.65.161:7100 运行
  4. 确认 Data 服务已注册到 Nacos
  5. 检查前端连接 URL 是否正确

验证命令:

# 检查 Nginx 端口
netstat -an | grep 3001

# 检查网关端口
netstat -an | grep 7100

# 检查 Data 服务端口
netstat -an | grep 7104

# 测试 WebSocket 连接
wscat -c "ws://192.168.65.57:3001/wsApi/simulation/data/ws/data/modelTraining?userId=1&modelId=123"

问题 2: 连接后立即断开

可能原因:

  • Nginx 未配置 WebSocket 头部
  • 超时时间设置过短
  • 防火墙拦截

解决方案:

location /wsApi/ {
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_read_timeout 86400;
    proxy_send_timeout 86400;
}

7.2 消息推送问题

问题 3: 消息发送失败

错误日志:

WARN 未找到用户 123 的模型 456 的 WebSocket 会话

原因:

  • 用户未连接或已断开连接
  • userId 或 modelId 不匹配

解决方案:

  1. 前端检查连接 URL 中的参数是否正确
  2. 后端检查发送消息时使用的 userId/modelId 是否正确
  3. WebSocketService 中添加日志查看映射关系

问题 4: 消息发送异常

错误日志:

ERROR 发送 WebSocket 消息失败,会话 ID: xxx

原因:

  • 会话已关闭但仍尝试发送

解决方案: 代码已处理,会自动检查会话状态:

if (session.isOpen()) {
    session.sendMessage(new TextMessage(message));
}

7.3 跨域问题

问题 5: 跨域错误

浏览器错误:

WebSocket connection to 'ws://...' failed: 
Error during WebSocket handshake: 
Unexpected response code: 200

解决方案:

// WebSocketConfig.java
registry.addHandler(webSocketService, "/ws/data/modelTraining")
        .setAllowedOrigins("*");

7.4 并发问题

问题 6: 并发修改异常

错误: ConcurrentModificationException

原因: 多线程同时修改 Map

解决方案: 已使用 ConcurrentHashMap,线程安全:

private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

7.5 内存泄漏问题

问题 7: 会话未清理

现象: 运行一段时间后 OOM

排查:

  1. 检查 afterConnectionClosed 是否清理映射
  2. 检查是否有重复注册

当前实现:

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
    String sessionId = session.getId();
    sessions.remove(sessionId);
    userModelSessionMap.values().removeIf(id -> id.equals(sessionId));
}

附录

A. 相关文件清单

data/
├── src/main/java/com/sdm/data/
│   ├── config/
│   │   ├── WebSocketConfig.java              # WebSocket 配置类
│   │   └── WebSocketHandshakeInterceptor.java # 握手拦截器
│   └── service/
│       └── WebSocketService.java             # WebSocket 服务
└── src/main/resources/
    └── application-local.yml                 # Data 服务配置

B. 依赖配置

<!-- pom.xml -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
</dependency>

C. 测试工具

wscat 安装

npm install -g wscat

测试命令

# 测试连接
wscat -c "ws://localhost:7104/ws/data/modelTraining?userId=1&modelId=123"

# 发送消息
> {"type": "ping"}

D. 参考资料


更新日志

日期 版本 说明 作者
2026-03-06 v1.0 初始版本 Gulongcheng