fix:流程基于webscoket通知前端更新流程和节点状态

This commit is contained in:
2026-04-13 13:44:50 +08:00
parent 80bf6b322a
commit 27e40b08bd
3 changed files with 86 additions and 6 deletions

View File

@@ -8,7 +8,8 @@ import lombok.Getter;
public enum WsSceneEnum {
WS_HEART_BEAT("WS_HEART_BEAT","websocket心跳检测" ),
BIG_FILE_CHUNK("BIG_FILE_CHUNK","大文件分片上传结果通知" );
BIG_FILE_CHUNK("BIG_FILE_CHUNK","大文件分片上传结果通知" ),
FLOW_NODE_STATUS_CHANGED("FLOW_NODE_STATUS_CHANGED","流程节点状态变化通知" );
/**
* scene webscoket 推送的业务场景

View File

@@ -49,11 +49,13 @@ public class GlobalStatusEventListener implements FlowableEventListener {
@Override
public Set<FlowableEngineEventType> getTypes() {
return new HashSet<>(Arrays.asList(
FlowableEngineEventType.PROCESS_COMPLETED, // 流程完成
FlowableEngineEventType.PROCESS_CANCELLED, // 流程取消精确匹配不需要判断deleteReason
FlowableEngineEventType.ENTITY_SUSPENDED, // 实体挂起(需过滤流程实例)
FlowableEngineEventType.ENTITY_ACTIVATED, // 实体激活(需过滤流程实例)
FlowableEngineEventType.JOB_MOVED_TO_DEADLETTER // 作业进入死信ERROR状态
FlowableEngineEventType.PROCESS_COMPLETED, // 流程完成
FlowableEngineEventType.PROCESS_CANCELLED, // 流程取消精确匹配不需要判断deleteReason
FlowableEngineEventType.ENTITY_SUSPENDED, // 实体挂起(需过滤流程实例)
FlowableEngineEventType.ENTITY_ACTIVATED, // 实体激活(需过滤流程实例)
FlowableEngineEventType.JOB_MOVED_TO_DEADLETTER, // 作业进入死信ERROR状态
FlowableEngineEventType.ACTIVITY_STARTED, // 节点开始
FlowableEngineEventType.ACTIVITY_COMPLETED // 节点完成
));
}
@@ -79,6 +81,11 @@ public class GlobalStatusEventListener implements FlowableEventListener {
else if (eventType == FlowableEngineEventType.JOB_MOVED_TO_DEADLETTER) {
handleDeadLetter((FlowableEngineEntityEvent) event);
}
// 5. 节点状态变化(开始/完成)触发前端刷新节点状态
else if (eventType == FlowableEngineEventType.ACTIVITY_STARTED ||
eventType == FlowableEngineEventType.ACTIVITY_COMPLETED) {
handleNodeActivityChanged((FlowableEngineEntityEvent) event, eventType);
}
} catch (Exception e) {
log.error("处理Flowable事件异常: eventType={}", event.getType(), e);
}
@@ -172,6 +179,38 @@ public class GlobalStatusEventListener implements FlowableEventListener {
}
}
/**
* 处理节点活动状态变化事件(节点开始/节点完成)
* 复用算例状态更新接口以触发 project 服务 websocket 通知逻辑。
*/
private void handleNodeActivityChanged(FlowableEngineEntityEvent event, FlowableEngineEventType eventType) {
Object entity = event.getEntity();
if (!(entity instanceof ExecutionEntity)) {
return;
}
ExecutionEntity execution = (ExecutionEntity) entity;
// 只关注具体节点,避免流程实例根节点重复触发
if (execution.isProcessInstanceType()) {
return;
}
String processInstanceId = execution.getProcessInstanceId();
if (processInstanceId == null) {
return;
}
Long userId = (Long) execution.getVariable("userId");
Long tenantId = (Long) execution.getVariable("tenantId");
// 节点状态变化时,算例状态保持运行中,主要目的是触发前端刷新节点状态
Integer statusCode = ProcessInstanceStateEnum.RUNNING.getCode();
log.info("流程节点状态变化: eventType={}, processInstanceId={}, activityId={}, userId={}, tenantId={}",
eventType, processInstanceId, execution.getCurrentActivityId(), userId, tenantId);
doUpdate(processInstanceId, statusCode, userId, tenantId);
}
/**
* 真正的更新数据库逻辑
* 通过Feign调用project服务更新算例状态

View File

@@ -14,6 +14,8 @@ import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.common.ThreadLocalContext;
import com.sdm.common.common.WsMessage;
import com.sdm.common.common.WsSceneEnum;
import com.sdm.common.config.FlowableConfig;
import com.sdm.common.entity.constants.CommonConstants;
import com.sdm.common.entity.constants.NumberConstants;
@@ -47,6 +49,7 @@ import com.sdm.common.feign.impl.data.DataClientFeignClientImpl;
import com.sdm.common.feign.impl.flowable.FlowableClientFeignClientImpl;
import com.sdm.common.feign.impl.system.ApproveFeignClientImpl;
import com.sdm.common.feign.impl.system.SysUserFeignClientImpl;
import com.sdm.common.feign.inter.system.IWsPushToolFeignClient;
import com.sdm.common.service.DataFileService;
import com.sdm.common.service.FileBizTypeService;
import com.sdm.common.utils.CommonUtils;
@@ -168,6 +171,9 @@ public class SimulationRunServiceImpl extends ServiceImpl<SimulationRunMapper, S
@Autowired
private SimulationNodeMapper nodeMapper;
@Autowired
private IWsPushToolFeignClient wsPushToolFeignClient;
@Resource
private SimulationProjectMapper projectMapper;
@@ -3066,6 +3072,8 @@ public class SimulationRunServiceImpl extends ServiceImpl<SimulationRunMapper, S
if (updated) {
log.info("算例状态更新成功: runId={}, flowInstanceId={}, statusCode={}",
run.getUuid(), processInstanceId, statusCode);
// 流程状态变化后,通过 websocket 通知前端触发 listFlowNodes 刷新
pushFlowNodeStatusChanged(run, statusCode);
return SdmResponse.success("状态更新成功");
} else {
log.error("算例状态更新失败: runId={}, flowInstanceId={}", run.getUuid(), processInstanceId);
@@ -3077,6 +3085,38 @@ public class SimulationRunServiceImpl extends ServiceImpl<SimulationRunMapper, S
}
}
/**
* 推送流程节点状态变化事件,通知前端调用 /run/listFlowNodes 刷新节点状态。
*/
private void pushFlowNodeStatusChanged(SimulationRun run, Integer statusCode) {
if (run == null || run.getCreator() == null) {
log.warn("推送流程节点状态变化通知跳过run或creator为空, run={}", run);
return;
}
try {
Map<String, Object> payload = new HashMap<>();
payload.put("runId", run.getUuid());
payload.put("processInstanceId", run.getFlowInstanceId());
payload.put("statusCode", statusCode);
payload.put("event", "FLOW_NODE_STATUS_CHANGED");
payload.put("refreshApi", "/run/listFlowNodes");
WsMessage<Map<String, Object>> message = new WsMessage<>();
message.setScene(WsSceneEnum.FLOW_NODE_STATUS_CHANGED.getScene());
message.setUserId(run.getCreator());
message.setTimestamp(System.currentTimeMillis());
message.setData(payload);
SdmResponse wsResponse = wsPushToolFeignClient.wsPushAll(message);
log.info("流程节点状态变化通知发送完成: runId={}, userId={}, wsResp={}",
run.getUuid(), run.getCreator(), JSON.toJSONString(wsResponse));
} catch (Exception e) {
// websocket通知失败不影响主流程
log.error("发送流程节点状态变化通知异常: runId={}, statusCode={}", run.getUuid(), statusCode, e);
}
}
@Override
public SdmResponse<List<BatchAddFileInfoResp>> batchAddFileInfoForTask(UploadFilesReq req) {
// 3D模型、计算模型、图片文件、曲线文件、仿真报告 获取文件夹名