fix:SSE添加日志&获取用户token改成username接收
This commit is contained in:
@@ -78,6 +78,7 @@ public class MessageServiceImpl extends ServiceImpl<MessageMapper, Message> impl
|
|||||||
|
|
||||||
|
|
||||||
R<Object> sendSSEMessage(Message message) {
|
R<Object> sendSSEMessage(Message message) {
|
||||||
|
log.info("发送消息通知!");
|
||||||
Long userId = message.getUserId();
|
Long userId = message.getUserId();
|
||||||
// 这里去获取真实的消息参数信息
|
// 这里去获取真实的消息参数信息
|
||||||
String realMessage = messageTaskNotifyService.getMessageJson(message);
|
String realMessage = messageTaskNotifyService.getMessageJson(message);
|
||||||
|
|||||||
@@ -8,6 +8,8 @@ import org.springframework.stereotype.Service;
|
|||||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
@@ -72,15 +74,42 @@ public class SSEServiceImpl implements SSEService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendMessage(Long userId, String eventName, String message) {
|
public void sendMessage(Long userId, String eventName, String message) {
|
||||||
// 这里根据用户ID获取所有的SSE链接信息
|
log.info("[SSEServiceImpl] sendMessage: userId={}, event={}, message={}", userId, eventName, message);
|
||||||
Map<String, SseEmitter> emitters = userEmitters.entrySet().stream().filter(res -> res.getKey().startsWith(userId + ":")).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
|
||||||
for (String key : emitters.keySet()) {
|
// 安全获取该用户所有 SSE 连接
|
||||||
|
List<Map.Entry<String, SseEmitter>> userEmittersList = userEmitters.entrySet().stream()
|
||||||
|
.filter(res -> res.getKey().startsWith(userId + ":"))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
// 用迭代器遍历,避免并发修改异常
|
||||||
|
Iterator<Map.Entry<String, SseEmitter>> iterator = userEmittersList.iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
Map.Entry<String, SseEmitter> entry = iterator.next();
|
||||||
|
String key = entry.getKey();
|
||||||
|
SseEmitter emitter = entry.getValue();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
emitters.get(key).send(SseEmitter.event().name(eventName).data(message));
|
// 关键:先判断 emitter 是否可用
|
||||||
} catch (Exception e) {
|
if (emitter == null) {
|
||||||
log.info("###Error sending " + eventName + " message to user: " + userId);
|
userEmitters.remove(key);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 发送 SSE 事件
|
||||||
|
emitter.send(SseEmitter.event().name(eventName).data(message));
|
||||||
|
log.info("[SSE] 发送成功: key={}", key);
|
||||||
|
|
||||||
|
} catch (IOException e) {
|
||||||
|
// 客户端断开连接(最常见)
|
||||||
|
log.warn("[SSE] 客户端断开,自动移除: key={}, error={}", key, e.getMessage());
|
||||||
|
userEmitters.remove(key);
|
||||||
|
} catch (IllegalStateException e) {
|
||||||
|
// emitter 已完成/超时
|
||||||
|
log.warn("[SSE] Emitter 已关闭,自动移除: key={}", key);
|
||||||
|
userEmitters.remove(key);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[SSE] 发送消息异常: key=" + key, e); // 必须打印堆栈
|
||||||
userEmitters.remove(key);
|
userEmitters.remove(key);
|
||||||
log.info("###Removed userId=" + userId + " from userEmitters");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -290,7 +290,7 @@ public class SpdmUserController {
|
|||||||
Long tenantId = 1979091834410176514L;
|
Long tenantId = 1979091834410176514L;
|
||||||
|
|
||||||
// 利元亨传工号
|
// 利元亨传工号
|
||||||
R<List<SysUserVO>> sysUserVOR = remoteUserServiceFeign.getUserListByUserName(String.valueOf(userParamDto.getUserId()));
|
R<List<SysUserVO>> sysUserVOR = remoteUserServiceFeign.getUserListByUserName(userParamDto.getUserName());
|
||||||
if (CollectionUtils.isNotEmpty(sysUserVOR.getData())) {
|
if (CollectionUtils.isNotEmpty(sysUserVOR.getData())) {
|
||||||
userParamDto.setUserId(sysUserVOR.getData().get(0).getUserId());
|
userParamDto.setUserId(sysUserVOR.getData().get(0).getUserId());
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user