diff --git a/common/src/main/java/com/sdm/common/common/WsSceneEnum.java b/common/src/main/java/com/sdm/common/common/WsSceneEnum.java index 66933b08..2abd885f 100644 --- a/common/src/main/java/com/sdm/common/common/WsSceneEnum.java +++ b/common/src/main/java/com/sdm/common/common/WsSceneEnum.java @@ -8,7 +8,8 @@ import lombok.Getter; public enum WsSceneEnum { WS_HEART_BEAT("WS_HEART_BEAT","websocket心跳检测" ), - BIG_FILE_CHUNK("BIG_FILE_CHUNK","大文件分片上传结果通知" ); + BIG_FILE_CHUNK("BIG_FILE_CHUNK","大文件分片上传结果通知" ), + FLOW_NODE_STATUS_CHANGED("FLOW_NODE_STATUS_CHANGED","流程节点状态变化通知" ); /** * scene webscoket 推送的业务场景 diff --git a/flowable/src/main/java/com/sdm/flowable/listener/GlobalStatusEventListener.java b/flowable/src/main/java/com/sdm/flowable/listener/GlobalStatusEventListener.java index 6b3ffb20..7ae04706 100644 --- a/flowable/src/main/java/com/sdm/flowable/listener/GlobalStatusEventListener.java +++ b/flowable/src/main/java/com/sdm/flowable/listener/GlobalStatusEventListener.java @@ -49,11 +49,13 @@ public class GlobalStatusEventListener implements FlowableEventListener { @Override public Set getTypes() { return new HashSet<>(Arrays.asList( - FlowableEngineEventType.PROCESS_COMPLETED, // 流程完成 - FlowableEngineEventType.PROCESS_CANCELLED, // 流程取消(精确匹配,不需要判断deleteReason) - FlowableEngineEventType.ENTITY_SUSPENDED, // 实体挂起(需过滤流程实例) - FlowableEngineEventType.ENTITY_ACTIVATED, // 实体激活(需过滤流程实例) - FlowableEngineEventType.JOB_MOVED_TO_DEADLETTER // 作业进入死信(ERROR状态) + FlowableEngineEventType.PROCESS_COMPLETED, // 流程完成 + FlowableEngineEventType.PROCESS_CANCELLED, // 流程取消(精确匹配,不需要判断deleteReason) + FlowableEngineEventType.ENTITY_SUSPENDED, // 实体挂起(需过滤流程实例) + FlowableEngineEventType.ENTITY_ACTIVATED, // 实体激活(需过滤流程实例) + FlowableEngineEventType.JOB_MOVED_TO_DEADLETTER, // 作业进入死信(ERROR状态) + FlowableEngineEventType.ACTIVITY_STARTED, // 节点开始 + FlowableEngineEventType.ACTIVITY_COMPLETED // 节点完成 )); } @@ -79,6 +81,11 @@ public class GlobalStatusEventListener implements FlowableEventListener { else if (eventType == FlowableEngineEventType.JOB_MOVED_TO_DEADLETTER) { handleDeadLetter((FlowableEngineEntityEvent) event); } + // 5. 节点状态变化(开始/完成)触发前端刷新节点状态 + else if (eventType == FlowableEngineEventType.ACTIVITY_STARTED || + eventType == FlowableEngineEventType.ACTIVITY_COMPLETED) { + handleNodeActivityChanged((FlowableEngineEntityEvent) event, eventType); + } } catch (Exception e) { log.error("处理Flowable事件异常: eventType={}", event.getType(), e); } @@ -172,6 +179,38 @@ public class GlobalStatusEventListener implements FlowableEventListener { } } + /** + * 处理节点活动状态变化事件(节点开始/节点完成) + * 复用算例状态更新接口以触发 project 服务 websocket 通知逻辑。 + */ + private void handleNodeActivityChanged(FlowableEngineEntityEvent event, FlowableEngineEventType eventType) { + Object entity = event.getEntity(); + if (!(entity instanceof ExecutionEntity)) { + return; + } + + ExecutionEntity execution = (ExecutionEntity) entity; + // 只关注具体节点,避免流程实例根节点重复触发 + if (execution.isProcessInstanceType()) { + return; + } + + String processInstanceId = execution.getProcessInstanceId(); + if (processInstanceId == null) { + return; + } + + Long userId = (Long) execution.getVariable("userId"); + Long tenantId = (Long) execution.getVariable("tenantId"); + + // 节点状态变化时,算例状态保持运行中,主要目的是触发前端刷新节点状态 + Integer statusCode = ProcessInstanceStateEnum.RUNNING.getCode(); + log.info("流程节点状态变化: eventType={}, processInstanceId={}, activityId={}, userId={}, tenantId={}", + eventType, processInstanceId, execution.getCurrentActivityId(), userId, tenantId); + + doUpdate(processInstanceId, statusCode, userId, tenantId); + } + /** * 真正的更新数据库逻辑 * 通过Feign调用project服务更新算例状态 diff --git a/project/src/main/java/com/sdm/project/service/impl/SimulationRunServiceImpl.java b/project/src/main/java/com/sdm/project/service/impl/SimulationRunServiceImpl.java index af5f6d83..2dc82839 100644 --- a/project/src/main/java/com/sdm/project/service/impl/SimulationRunServiceImpl.java +++ b/project/src/main/java/com/sdm/project/service/impl/SimulationRunServiceImpl.java @@ -14,6 +14,8 @@ import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import com.sdm.common.common.SdmResponse; import com.sdm.common.common.ThreadLocalContext; +import com.sdm.common.common.WsMessage; +import com.sdm.common.common.WsSceneEnum; import com.sdm.common.config.FlowableConfig; import com.sdm.common.entity.constants.CommonConstants; import com.sdm.common.entity.constants.NumberConstants; @@ -47,6 +49,7 @@ import com.sdm.common.feign.impl.data.DataClientFeignClientImpl; import com.sdm.common.feign.impl.flowable.FlowableClientFeignClientImpl; import com.sdm.common.feign.impl.system.ApproveFeignClientImpl; import com.sdm.common.feign.impl.system.SysUserFeignClientImpl; +import com.sdm.common.feign.inter.system.IWsPushToolFeignClient; import com.sdm.common.service.DataFileService; import com.sdm.common.service.FileBizTypeService; import com.sdm.common.utils.CommonUtils; @@ -168,6 +171,9 @@ public class SimulationRunServiceImpl extends ServiceImpl payload = new HashMap<>(); + payload.put("runId", run.getUuid()); + payload.put("processInstanceId", run.getFlowInstanceId()); + payload.put("statusCode", statusCode); + payload.put("event", "FLOW_NODE_STATUS_CHANGED"); + payload.put("refreshApi", "/run/listFlowNodes"); + + WsMessage> message = new WsMessage<>(); + message.setScene(WsSceneEnum.FLOW_NODE_STATUS_CHANGED.getScene()); + message.setUserId(run.getCreator()); + message.setTimestamp(System.currentTimeMillis()); + message.setData(payload); + + SdmResponse wsResponse = wsPushToolFeignClient.wsPushAll(message); + log.info("流程节点状态变化通知发送完成: runId={}, userId={}, wsResp={}", + run.getUuid(), run.getCreator(), JSON.toJSONString(wsResponse)); + } catch (Exception e) { + // websocket通知失败不影响主流程 + log.error("发送流程节点状态变化通知异常: runId={}, statusCode={}", run.getUuid(), statusCode, e); + } + } + @Override public SdmResponse> batchAddFileInfoForTask(UploadFilesReq req) { // 3D模型、计算模型、图片文件、曲线文件、仿真报告 获取文件夹名