From 42c8c81def94a0d2d49df329c3d15eadf3edd3d4 Mon Sep 17 00:00:00 2001 From: gulongcheng <474084054@qq.com> Date: Thu, 26 Mar 2026 16:06:22 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E6=94=AF=E6=8C=81=E7=9B=B4?= =?UTF-8?q?=E6=8E=A5=E7=A1=AE=E8=AE=A4=E8=8A=82=E7=82=B9=E4=B8=BA=E5=B7=B2?= =?UTF-8?q?=E5=AE=8C=E6=88=90=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- flowable/repomix-output.xml | 1459 ++++++++++++++--- .../sdm/flowable/enums/OperationTypeEnum.java | 4 +- .../sdm/flowable/process/ProcessService.java | 197 ++- .../sdm/flowable/util/Dto2BpmnConverter.java | 26 + 4 files changed, 1436 insertions(+), 250 deletions(-) diff --git a/flowable/repomix-output.xml b/flowable/repomix-output.xml index 0886f0b9..307c24a1 100644 --- a/flowable/repomix-output.xml +++ b/flowable/repomix-output.xml @@ -44,6 +44,7 @@ The content is organized as follows: pom.xml src/main/java/com/sdm/flowable/aop/StateGuard.java src/main/java/com/sdm/flowable/aop/StateGuardAspect.java +src/main/java/com/sdm/flowable/config/FlowableEngineConfig.java src/main/java/com/sdm/flowable/config/RequestConfig.java src/main/java/com/sdm/flowable/controller/ProcessController.java src/main/java/com/sdm/flowable/dao/AsyncTaskRecordMapper.java @@ -56,6 +57,7 @@ src/main/java/com/sdm/flowable/delegate/handler/ExportWordScriptHandler.java src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java src/main/java/com/sdm/flowable/delegate/handler/HttpHandler.java src/main/java/com/sdm/flowable/delegate/handler/LocalAppHandler.java +src/main/java/com/sdm/flowable/delegate/LocalAppRegisterDelegate.java src/main/java/com/sdm/flowable/delegate/UniversalDelegate.java src/main/java/com/sdm/flowable/dto/ProcessDefinitionInfo.java src/main/java/com/sdm/flowable/dto/req/CompleteTaskReq.java @@ -65,12 +67,14 @@ src/main/java/com/sdm/flowable/dto/resp/NodeInputFilePreviewResp.java src/main/java/com/sdm/flowable/entity/AsyncTaskRecord.java src/main/java/com/sdm/flowable/entity/ProcessNodeParam.java src/main/java/com/sdm/flowable/enums/AsyncTaskStatusEnum.java +src/main/java/com/sdm/flowable/enums/ExecuteTypeEnum.java src/main/java/com/sdm/flowable/enums/FlowElementTypeEnums.java src/main/java/com/sdm/flowable/enums/NodeStateEnum.java src/main/java/com/sdm/flowable/enums/OperationTypeEnum.java src/main/java/com/sdm/flowable/enums/ProcessInstanceStateEnum.java src/main/java/com/sdm/flowable/filter/AuthFilter.java src/main/java/com/sdm/flowable/FlowableApplication.java +src/main/java/com/sdm/flowable/listener/GlobalStatusEventListener.java src/main/java/com/sdm/flowable/listener/RetryRedirectListener.java src/main/java/com/sdm/flowable/listener/UserTaskDirectoryPreparationListener.java src/main/java/com/sdm/flowable/process/Iprocess.java @@ -83,11 +87,12 @@ src/main/java/com/sdm/flowable/service/impl/ProcessStateHelper.java src/main/java/com/sdm/flowable/service/IProcessNodeParamService.java src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java src/main/java/com/sdm/flowable/util/FlowNodeIdUtils.java +src/main/resources/application-dev-100.yml src/main/resources/application-dev-190.yml src/main/resources/application-dev-65.yml src/main/resources/application-local.yml src/main/resources/application-lyric.yml -src/main/resources/application-prod.yml +src/main/resources/application-yang.yml src/main/resources/application.yml @@ -310,6 +315,57 @@ public class StateGuardAspect { } + +package com.sdm.flowable.config; + +import com.sdm.flowable.listener.GlobalStatusEventListener; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.delegate.event.FlowableEventListener; +import org.flowable.spring.SpringProcessEngineConfiguration; +import org.flowable.spring.boot.EngineConfigurationConfigurer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.ArrayList; +import java.util.List; + +/** + * Flowable流程引擎配置类 + * 用于注册全局事件监听器等配置 + * + * @author SDM + * @date 2026-01-23 + */ +@Slf4j +@Configuration +public class FlowableEngineConfig { + + /** + * 注册全局事件监听器 + * 通过EngineConfigurationConfigurer可以在流程引擎初始化时添加监听器 + */ + @Bean + public EngineConfigurationConfigurer globalEventListenerConfigurer( + GlobalStatusEventListener globalStatusEventListener) { + return engineConfiguration -> { + // 获取现有的事件监听器列表 + List eventListeners = engineConfiguration.getEventListeners(); + if (eventListeners == null) { + eventListeners = new ArrayList<>(); + } + + // 添加全局状态同步监听器 + eventListeners.add(globalStatusEventListener); + engineConfiguration.setEventListeners(eventListeners); + + log.info("✅ 已注册Flowable全局事件监听器: GlobalStatusEventListener"); + log.info(" 监听事件类型: PROCESS_COMPLETED, PROCESS_CANCELLED, ENTITY_SUSPENDED, ENTITY_ACTIVATED, JOB_MOVED_TO_DEADLETTER"); + log.info(" 状态映射: COMPLETED, CANCELLED, SUSPENDED, RUNNING, ERROR"); + }; + } +} + + package com.sdm.flowable.config; @@ -342,19 +398,24 @@ package com.sdm.flowable.controller; import com.fasterxml.jackson.databind.ObjectMapper; import com.sdm.common.common.SdmResponse; +import com.sdm.common.common.ThreadLocalContext; import com.sdm.common.entity.flowable.dto.ProcessDefinitionDTO; import com.sdm.common.entity.flowable.executeConfig.HPCExecuteConfig; import com.sdm.common.entity.req.flowable.AsyncCallbackRequest; +import com.sdm.common.entity.req.flowable.RetryFailedNodeReq; +import com.sdm.common.entity.req.project.SimulationLocalJobReq; import com.sdm.common.entity.resp.flowable.DeployFlowableResp; import com.sdm.common.entity.resp.flowable.ProcessInstanceDetailResponse; import com.sdm.common.entity.resp.flowable.ProcessInstanceResp; import com.sdm.common.feign.inter.flowable.IFlowableFeignClient; +import com.sdm.common.feign.inter.project.ISimulationLocalJobFeignClient; import com.sdm.flowable.aop.StateGuard; import com.sdm.flowable.delegate.handler.HpcHandler; import com.sdm.flowable.dto.req.CompleteTaskReq; import com.sdm.flowable.dto.req.PreviewNodeInputFilesReq; import com.sdm.flowable.dto.req.RetryRequest; import com.sdm.flowable.dto.resp.NodeInputFilePreviewResp; +import com.sdm.flowable.enums.AsyncTaskStatusEnum; import com.sdm.flowable.enums.OperationTypeEnum; import com.sdm.flowable.process.ProcessService; import com.sdm.flowable.service.IProcessNodeParamService; @@ -381,6 +442,9 @@ public class ProcessController implements IFlowableFeignClient { @Autowired private IProcessNodeParamService processNodeParamService; + @Autowired + private ISimulationLocalJobFeignClient simulationLocalJobService; + @Autowired private HpcHandler hpcHandler; @@ -535,6 +599,10 @@ public class ProcessController implements IFlowableFeignClient { /** * 流程节点继续执行(完成人工节点/或者等待用户输入后继续手动执行的节点) * + * 完成 UserTask 并继续 + * ServiceTask 前置人工节点 serviceTask _waitUser 在 ServiceTask 前插入的等待用户确认的 UserTask(Manual 模式) + * 普通人工任务 userTask 原始 UserTask BPMN 中原本的 UserTask + * * @param req * @return */ @@ -546,11 +614,34 @@ public class ProcessController implements IFlowableFeignClient { /** * 异步任务回调接口,用于唤醒等待的流程实例 + * 唤醒 ReceiveTask 并继续 + * 触发的是 ReceiveTask(_wait 节点),通过 runtimeService.trigger() 继续流转 + * + * 适用场景 节点类型 说明 + * HPC 异步计算 ReceiveTask _wait HPC 提交任务后,流程停在 _wait,等待 HPC 回调 + * 本地应用 ReceiveTask _wait 本地应用提交后,流程停在 _wait,等待本地应用回调 + * 其他异步任务 ReceiveTask _wait 流程停在 _wait,由外部系统通过 asyncTaskId 回调 * * @param request 包含异步任务ID和执行结果的请求对象 */ @PostMapping("/asyncCallback") public SdmResponse asyncCallback(@RequestBody AsyncCallbackRequest request) { + try { + // 保存本地应用任务执行结果 + SimulationLocalJobReq simulationLocalJobReq = request.getSimulationLocalJobReq(); + if(Objects.nonNull(simulationLocalJobReq)){ + ThreadLocalContext.setUserId(simulationLocalJobReq.getCreatorId()); + ThreadLocalContext.setTenantId(simulationLocalJobReq.getTenantId()); + simulationLocalJobReq.setJobStatus(request.getStatus()); + if(AsyncTaskStatusEnum.FAIL.getCode().equalsIgnoreCase(request.getStatus())){ + simulationLocalJobReq.setErrMsg(request.getResultJson()); + } + simulationLocalJobService.add(simulationLocalJobReq); + } + } catch (Exception e) { + log.error("异步回调失败: {}", request, e); + return SdmResponse.failed("异步回调失败"); + } log.info("开始处理异步回调请求: {}", request); // 发送信号唤醒流程实例中等待的节点 processService.asyncCallback(request); @@ -561,11 +652,13 @@ public class ProcessController implements IFlowableFeignClient { * 重试任务,目前只能重试当前失败的节点 */ @PostMapping("/retryFailedNode") - @StateGuard(type = OperationTypeEnum.RETRY, idParam = "#processInstanceId") - public SdmResponse retryFailedNode(@RequestParam String processInstanceId, @RequestParam String failNodeId) { - log.info("开始重试任务: {}",failNodeId); + @StateGuard(type = OperationTypeEnum.RETRY, idParam = "#req.processInstanceId") + public SdmResponse retryFailedNode(@RequestBody RetryFailedNodeReq req) { + log.info("开始重试任务: {}",req.getFailNodeId()); try { - processService.retryFailedNode(processInstanceId, failNodeId); + // processService.retryFailedNode(req.getProcessInstanceId(), req.getFailNodeId()); + // 不再调用重试当前节点方法:retryFailedNode,改用重试到指定节点方法:retryToNode + processService.retryToNode(req.getProcessInstanceId(), req.getFailNodeId(), null); return SdmResponse.success("重试任务已提交"); } catch (Exception e) { return SdmResponse.failed("重试失败"); @@ -593,13 +686,15 @@ public class ProcessController implements IFlowableFeignClient { @PostMapping("/testHpc") public String testHpc(@RequestBody Map params) { String beforeNodeId = params.get("beforeNodeId").toString(); + String appuuid = params.get("appuuid").toString(); HPCExecuteConfig config=new HPCExecuteConfig(); + config.setUuid(appuuid); config.setBeforeNodeId(beforeNodeId); if(!Objects.isNull(params.get("masterFileRegularStr"))){ - config.setMasterFileRegularStr(params.get("masterFileRegularStr").toString()); + config.setInputFormat(params.get("masterFileRegularStr").toString()); } if(!Objects.isNull(params.get("inputFilesRegularStr"))){ - config.setInputFilesRegularStr(params.get("inputFilesRegularStr").toString()); + config.setSlaveFormat(params.get("inputFilesRegularStr").toString()); } String currentNodeId = params.get("currentNodeId").toString(); DelegateExecution execution = new DelegateExecution() { @@ -1067,16 +1162,20 @@ public interface ProcessNodeParamMapper extends BaseMapper { package com.sdm.flowable.delegate; +import com.sdm.flowable.enums.AsyncTaskStatusEnum; +import lombok.extern.slf4j.Slf4j; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.JavaDelegate; import org.springframework.stereotype.Component; -import static com.sdm.common.config.FlowableConfig.RECEIVETASK_CALLBACKE_MSG; -import static com.sdm.common.config.FlowableConfig.RECEIVETASK_CALLBACKE_STATUS; +import static com.sdm.common.config.FlowableVariables.RECEIVETASK_CALLBACKE_MSG; +import static com.sdm.common.config.FlowableVariables.RECEIVETASK_CALLBACKE_STATUS; /** * 哨兵节点 ServiceTask (_check) 绑定的异步结果校验逻辑 + * 用于校验外部系统回调回来的结果,如果失败则抛出异常让流程卡死 */ +@Slf4j @Component("asyncResultCheckDelegate") public class AsyncResultCheckDelegate implements JavaDelegate { @@ -1086,6 +1185,7 @@ public class AsyncResultCheckDelegate implements JavaDelegate { // Flowable 的 getVariable 会自动查找 Local -> Parent,所以 setVariableLocal 后这里能取到 String status = (String) execution.getVariable(RECEIVETASK_CALLBACKE_STATUS); String msg = (String) execution.getVariable(RECEIVETASK_CALLBACKE_MSG); + log.info("异步任务回调结果校验:状态={},信息={}", status, msg); // 防御性检查:如果变量丢失(极低概率,除非手动操作了数据库),默认认为是 SUCCESS 防止卡死,或者抛错 if (status == null) { @@ -1095,11 +1195,11 @@ public class AsyncResultCheckDelegate implements JavaDelegate { } // 2. 核心校验逻辑 - if ("FAIL".equals(status)) { + if (AsyncTaskStatusEnum.FAIL.getCode().equalsIgnoreCase(status)) { // 【自爆】抛出 RuntimeException // Flowable 会捕获此异常 -> 减少重试次数 -> 最终移动到 DeadLetterJob 表 -> 节点变红 String errorInfo = (msg != null && !msg.isEmpty()) ? msg : "外部系统返回失败状态,未提供详细信息"; - throw new RuntimeException("HPC任务执行失败: " + errorInfo); + throw new RuntimeException("异步任务执行失败: " + errorInfo); } // 3. SUCCESS 的情况 @@ -1194,9 +1294,16 @@ import org.springframework.mock.web.MockMultipartFile; import java.io.*; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import com.sdm.common.entity.req.data.QueryDirReq; +import com.sdm.common.entity.resp.PageDataResp; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; /** * @Description: 生成自动化报告脚本处理器 @@ -1206,6 +1313,17 @@ import java.util.regex.Pattern; @Slf4j @Component("exportWordScript") public class ExportWordScriptHandler implements ExecutionHandler,ExportWordScriptExecuteConfig> { + + /** + * 图片文件后缀正则(忽略大小写) + */ + private static final Pattern IMAGE_FILE_PATTERN = Pattern.compile("(?i).*\\.(png|jpg|jpeg|gif)$"); + + /** + * 查询目录文件的默认分页大小 + */ + private static final int QUERY_DIR_PAGE_SIZE = 1000; + @Autowired private IDataFeignClient dataFeignClient; @@ -1220,92 +1338,242 @@ public class ExportWordScriptHandler implements ExecutionHandler params, ExportWordScriptExecuteConfig config) { + String runId = (String) execution.getVariable("runId"); + String currentNodeId = execution.getCurrentActivityId(); + String processDefinitionId = execution.getProcessDefinitionId(); + try { - // 获取前置节点参数 - String beforeNodeId = config.getBeforeNodeId(); - String currentNodeId =execution.getCurrentActivityId(); - String fileRegularStr = config.getFileRegularStr(); - - // 获取当前流程实例参数 - String runId = (String) execution.getVariable("runId"); - Long userId = (Long) execution.getVariable("userId"); - String userName = (String) execution.getVariable("userName"); - Long tenantId = (Long) execution.getVariable("tenantId"); - ThreadLocalContext.setUserId(userId); - ThreadLocalContext.setUserName(userName); - ThreadLocalContext.setTenantId(tenantId); - String processDefinitionId = execution.getProcessDefinitionId(); - log.info("ExportWordScriptHandler 开始执行 runId:{},userId:{},userName:{},tenantId:{},processDefinitionId:{}, beforeNodeId:{}, currentNodeId:{},fileRegularStr:{}", runId,userId,userName,tenantId,processDefinitionId, beforeNodeId, currentNodeId,fileRegularStr); + // 1. 初始化线程上下文 + initThreadLocalContext(execution); + logExecutionStart(runId, config, currentNodeId, processDefinitionId); + // 2. 构建项目信息请求 ProjecInfoReq projecInfoReq = buildprojectInfoReq(params); log.info("ExportWordScriptHandler的请求参数 projectInfoReq:{}", projecInfoReq); + // 3. 获取性能指标 + List performanceList = fetchPerformanceList(runId); + + // 4. 获取关键结果图片文件ID(优先从前置节点获取,否则从算列结果获取) + String beforeNodeId = config.getBeforeNodeId(); + List keyResultImageFileIds = StringUtils.hasText(beforeNodeId) + ? fetchBeforeNodeImageFileIds(runId, beforeNodeId, processDefinitionId) + : fetchKeyResultImageFileIds(runId); - SdmResponse> runPerformance = simuluationPerformanceFeignClient.getRunPerformance(runId); - if(!runPerformance.isSuccess()){ - log.error("获取算列性能指标失败"); - throw new RuntimeException("获取算列性能指标失败"); - } - List performanceList = new ArrayList<>(); - for (PerformanceResp datum : runPerformance.getData()) { - SimulationPerformance performance = new SimulationPerformance(); - BeanUtils.copyProperties(datum, performance); - performanceList.add(performance); - } - log.info("ExportWordScriptHandler的返回参数 runPerformance:{}", runPerformance); - - SdmResponse> simulationKeyResultFileIds = simulationRunFeignClient.getSimulationKeyResultFileIds(runId); - if(!simulationKeyResultFileIds.isSuccess()){ - log.error("获取算列关键结果文件失败"); - throw new RuntimeException("获取算列关键结果文件失败"); - } - log.info("ExportWordScriptHandler的返回参数 simulationKeyResultFileIds:{}", simulationKeyResultFileIds); - - - ProcessNodeParam currentProcessNodeParam = processNodeParamService.lambdaQuery() - .eq(ProcessNodeParam::getRunId, runId) - .eq(ProcessNodeParam::getNodeId, currentNodeId) - .eq(ProcessNodeParam::getProcessDefinitionId, processDefinitionId) - .one(); - - - // 获取当前节点输出文件夹信息 - String currentNodeParamJson = currentProcessNodeParam.getParamJson(); - JSONObject currentParamJsonObject = JSONObject.parseObject(currentNodeParamJson); - Long currentNodeOutputDirId = currentParamJsonObject.getLong("outputDirId"); - FileMetadataInfoResp currentNodeFileMetadataInfoResp = getFileBaseInfo(currentNodeOutputDirId); - String currentNodeObjectKey = currentNodeFileMetadataInfoResp.getObjectKey(); - log.info("当前节点配置参数:{}", currentNodeParamJson); - String currentNodeOutputDirPath = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR + currentNodeObjectKey; - log.info("当前节点输出文件夹:{}", currentNodeOutputDirPath); - - // todo 生成脚本的接口 - SpdmReportReq req = new SpdmReportReq(); - req.setProjecInfoReq(projecInfoReq); - req.setOutPutDirPath(currentNodeOutputDirPath); - req.setImageFileIdList(simulationKeyResultFileIds.getData()); - req.setPerformanceList(performanceList); - SdmResponse voidSdmResponse = simulationRunFeignClient.generateReportInternal(req); - if(!voidSdmResponse.isSuccess()){ - log.error("生成自动化报告失败"); - throw new RuntimeException("生成自动化报告失败"); - } - try { - String reportPath = currentNodeOutputDirPath + "report.docx"; - log.info("报告路径:{}", reportPath); - // 获取临时路径中脚本生成的报告 - uploadResultFileToMinio(currentNodeOutputDirPath + "report.docx",currentNodeOutputDirId); - } catch (Exception ex) { - log.error("生成自动化报告失败:{}", ex.getMessage(), ex); - throw new RuntimeException("生成自动化报告失败"); - } + // 5. 获取输出目录信息 + OutputDirInfo outputDirInfo = resolveOutputDirInfo(runId, currentNodeId, processDefinitionId); + // 6. 生成并上传报告 + generateAndUploadReport(projecInfoReq, performanceList, keyResultImageFileIds, outputDirInfo); + } catch (Exception e) { log.error("执行ExportWordScript失败", e); throw new RuntimeException("执行ExportWordScript失败: " + e.getMessage(), e); } } + /** + * 初始化线程上下文 + */ + private void initThreadLocalContext(DelegateExecution execution) { + Long userId = (Long) execution.getVariable("userId"); + String userName = (String) execution.getVariable("userName"); + Long tenantId = (Long) execution.getVariable("tenantId"); + + ThreadLocalContext.setUserId(userId); + ThreadLocalContext.setUserName(userName); + ThreadLocalContext.setTenantId(tenantId); + } + + /** + * 记录执行开始日志 + */ + private void logExecutionStart(String runId, ExportWordScriptExecuteConfig config, + String currentNodeId, String processDefinitionId) { + Long userId = ThreadLocalContext.getUserId(); + String userName = ThreadLocalContext.getUserName(); + Long tenantId = ThreadLocalContext.getTenantId(); + + log.info("ExportWordScriptHandler 开始执行 runId:{},userId:{},userName:{},tenantId:{}," + + "processDefinitionId:{}, beforeNodeId:{}, currentNodeId:{},fileRegularStr:{}", + runId, userId, userName, tenantId, processDefinitionId, + config.getBeforeNodeId(), currentNodeId, config.getFileRegularStr()); + } + + /** + * 获取性能指标列表 + */ + private List fetchPerformanceList(String runId) { + SdmResponse> response = simuluationPerformanceFeignClient.getRunPerformance(runId); + checkResponse(response, "获取算列性能指标失败"); + + List performanceList = new ArrayList<>(); + for (PerformanceResp datum : response.getData()) { + SimulationPerformance performance = new SimulationPerformance(); + BeanUtils.copyProperties(datum, performance); + performanceList.add(performance); + } + + log.info("ExportWordScriptHandler的返回参数 runPerformance:{}", response); + return performanceList; + } + + /** + * 获取关键结果-图片文件ID列表(从算列结果获取) + */ + private List fetchKeyResultImageFileIds(String runId) { + SdmResponse> response = simulationRunFeignClient.getSimulationKeyResultFileIds(runId); + checkResponse(response, "获取算列关键结果文件失败"); + + log.info("ExportWordScriptHandler的返回参数 simulationKeyResultFileIds:{}", response); + return response.getData(); + } + + /** + * 从前置节点输出目录获取图片文件ID列表 + */ + private List fetchBeforeNodeImageFileIds(String runId, String beforeNodeId, String processDefinitionId) { + // 1. 获取前置节点输出目录ID + Long beforeNodeOutputDirId = resolveNodeOutputDirId(runId, beforeNodeId, processDefinitionId); + if (beforeNodeOutputDirId == null) { + log.warn("前置节点输出目录不存在,beforeNodeId:{}", beforeNodeId); + return Collections.emptyList(); + } + + // 2. 查询目录下所有文件 + List fileList = queryDirFiles(beforeNodeOutputDirId); + if (CollectionUtils.isEmpty(fileList)) { + log.info("前置节点输出目录下无文件,beforeNodeId:{}, dirId:{}", beforeNodeId, beforeNodeOutputDirId); + return Collections.emptyList(); + } + + // 3. 正则过滤图片文件并收集ID + List imageFileIds = fileList.stream() + .filter(file -> file.getOriginalName() != null + && IMAGE_FILE_PATTERN.matcher(file.getOriginalName()).matches()) + .map(FileMetadataInfoResp::getId) + .collect(Collectors.toList()); + + log.info("从前置节点获取到图片文件数量:{}, beforeNodeId:{}", imageFileIds.size(), beforeNodeId); + return imageFileIds; + } + + /** + * 解析节点的输出目录ID + */ + private Long resolveNodeOutputDirId(String runId, String nodeId, String processDefinitionId) { + ProcessNodeParam nodeParam = processNodeParamService.lambdaQuery() + .eq(ProcessNodeParam::getRunId, runId) + .eq(ProcessNodeParam::getNodeId, nodeId) + .eq(ProcessNodeParam::getProcessDefinitionId, processDefinitionId) + .one(); + + if (nodeParam == null || nodeParam.getParamJson() == null) { + return null; + } + + JSONObject paramJsonObject = JSONObject.parseObject(nodeParam.getParamJson()); + return paramJsonObject.getLong("outputDirId"); + } + + /** + * 查询目录下的文件列表(只查文件,不含子目录) + */ + private List queryDirFiles(Long dirId) { + QueryDirReq req = new QueryDirReq(); + req.setFileId(dirId); + req.setQueryTarget(2); // 只查文件 + req.setCurrent(1); + req.setSize(QUERY_DIR_PAGE_SIZE); + + SdmResponse>> response = dataFeignClient.queryDir(req); + checkResponse(response, "查询目录文件失败,dirId:" + dirId); + + // 处理空数据情况 + if (response.getData() == null || response.getData().getData() == null) { + return Collections.emptyList(); + } + return response.getData().getData(); + } + + /** + * 解析输出目录信息 + */ + private OutputDirInfo resolveOutputDirInfo(String runId, String currentNodeId, String processDefinitionId) { + ProcessNodeParam nodeParam = processNodeParamService.lambdaQuery() + .eq(ProcessNodeParam::getRunId, runId) + .eq(ProcessNodeParam::getNodeId, currentNodeId) + .eq(ProcessNodeParam::getProcessDefinitionId, processDefinitionId) + .one(); + + String paramJson = nodeParam.getParamJson(); + log.info("当前节点配置参数:{}", paramJson); + + JSONObject paramJsonObject = JSONObject.parseObject(paramJson); + Long outputDirId = paramJsonObject.getLong("outputDirId"); + + FileMetadataInfoResp fileMetadata = getFileBaseInfo(outputDirId); + String outputDirPath = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR + fileMetadata.getObjectKey(); + log.info("当前节点输出文件夹:{}", outputDirPath); + + return new OutputDirInfo(outputDirId, outputDirPath); + } + + /** + * 生成并上传报告 + */ + private void generateAndUploadReport(ProjecInfoReq projecInfoReq, + List performanceList, + List keyResultImageFileIds, + OutputDirInfo outputDirInfo) { + // 构建报告请求 + SpdmReportReq req = new SpdmReportReq(); + req.setProjecInfoReq(projecInfoReq); + req.setOutPutDirPath(outputDirInfo.getPath()); + req.setImageFileIdList(keyResultImageFileIds); + req.setPerformanceList(performanceList); + + // 调用生成报告接口 + SdmResponse response = simulationRunFeignClient.generateReportInternal(req); + checkResponse(response, "生成自动化报告失败"); + + // 上传报告文件 + String reportPath = outputDirInfo.getPath() + "report.docx"; + log.info("报告路径:{}", reportPath); + uploadResultFileToMinio(reportPath, outputDirInfo.getDirId()); + } + + /** + * 统一校验远程调用响应 + */ + private void checkResponse(SdmResponse response, String errorMessage) { + if (!response.isSuccess()) { + log.error(errorMessage); + throw new RuntimeException(errorMessage+":"+response.getMessage()); + } + } + + /** + * 输出目录信息内部类 + */ + private static class OutputDirInfo { + private final Long dirId; + private final String path; + + public OutputDirInfo(Long dirId, String path) { + this.dirId = dirId; + this.path = path; + } + + public Long getDirId() { + return dirId; + } + + public String getPath() { + return path; + } + } + private static ProjecInfoReq buildprojectInfoReq(Map params) { ProjecInfoReq projectInfoReq = new ProjecInfoReq(); projectInfoReq.setDepartment((String)params.get("department")); @@ -1460,6 +1728,7 @@ import com.alibaba.fastjson2.JSONObject; import com.sdm.common.common.SdmResponse; import com.sdm.common.common.ThreadLocalContext; import com.sdm.common.config.FlowableConfig; +import com.sdm.common.config.FlowableVariables; import com.sdm.common.entity.enums.MessageTemplateEnum; import com.sdm.common.entity.flowable.executeConfig.HPCExecuteConfig; import com.sdm.common.entity.req.data.GetFileBaseInfoReq; @@ -1470,6 +1739,7 @@ import com.sdm.common.feign.impl.system.MessageFeignClientImpl; import com.sdm.common.feign.inter.data.IDataFeignClient; import com.sdm.common.feign.inter.pbs.ITaskFeignClient; import com.sdm.common.log.CoreLogger; +import com.sdm.common.utils.MdcUtil; import com.sdm.flowable.entity.ProcessNodeParam; import com.sdm.flowable.enums.AsyncTaskStatusEnum; import com.sdm.flowable.service.IAsyncTaskRecordService; @@ -1477,6 +1747,7 @@ import com.sdm.flowable.service.IProcessNodeParamService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.flowable.engine.delegate.DelegateExecution; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -1511,33 +1782,54 @@ public class HpcHandler implements ExecutionHandler,HPCExecu * */ @Override public void execute(DelegateExecution execution, Map params, HPCExecuteConfig config) { - CoreLogger.info("hpc process excute,params:{},config:{}",JSONObject.toJSONString(params),JSONObject.toJSONString(config)); + MdcUtil.generateAndPutTraceId(); + log.info("hpc process excute,params:{},config:{}",JSONObject.toJSONString(params),JSONObject.toJSONString(config)); SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq = convertParamsToReq(params); + submitHpcTaskRemoteReq.setParams(params); + // 设置软件id + submitHpcTaskRemoteReq.setSoftwareId(config.getUuid()); String beforeNodeId = config.getBeforeNodeId(); String currentNodeId =execution.getCurrentActivityId(); - String masterFileRegularStr = config.getMasterFileRegularStr(); - String inputFilesRegularStr = config.getInputFilesRegularStr(); - CoreLogger.info("beforeNodeId:{},currentNodeId:{},masterFileRegularStr:{},inputFilesRegularStr:{}",beforeNodeId,currentNodeId,masterFileRegularStr,inputFilesRegularStr); + String masterFileRegularStr = Objects.isNull(params.get("inputFormat"))?"":params.get("inputFormat").toString(); + String inputFilesRegularStr = Objects.isNull(params.get("slaveFormat"))?"":params.get("slaveFormat").toString(); + String outputFormat = Objects.isNull(params.get("outputFormat"))?"":params.get("outputFormat").toString(); + submitHpcTaskRemoteReq.setInputFormat(masterFileRegularStr); + submitHpcTaskRemoteReq.setSlaveFormat(inputFilesRegularStr); + submitHpcTaskRemoteReq.setOutputFormat(outputFormat); + log.info("hpc executeMode:{}",params.get("executeMode")); + String executeMode = params.get("executeMode").toString(); + if(StringUtils.isBlank(executeMode)|| + (!Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_AUTO)&& + !Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_MANUAL))){ + MdcUtil.removeTraceId(); + throw new RuntimeException("hpc executeMode illegal"); + } + // 自动才判断正则 + if(Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_AUTO)&& + StringUtils.isBlank(masterFileRegularStr)){ + log.info("hpc process excute,主求解文件规则是空,masterFileRegularStr:{},inputFilesRegularStr:{}", + params.get("inputFormat"),params.get("slaveFormat")); + MdcUtil.removeTraceId(); + throw new RuntimeException("Hpc任务执行失败,主求解文件规则不能是空"); + } + log.info("beforeNodeId:{},currentNodeId:{},masterFileRegularStr:{},inputFilesRegularStr:{}",beforeNodeId,currentNodeId,masterFileRegularStr,inputFilesRegularStr); + log.info("Hpc执行 processDefinitionId:{},processInstanceId:{}",execution.getProcessDefinitionId(),execution.getProcessInstanceId()); // 初始化用户/租户信息 - initUserInfo(execution); + initUserInfo(execution,params); // params 取只是测试使用 String processDefinitionId = (execution==null||StringUtils.isBlank(execution.getProcessDefinitionId()))? params.get("processDefinitionId").toString():execution.getProcessDefinitionId(); // params 取只是测试使用 String processInstanceId = (execution==null||StringUtils.isBlank(execution.getProcessInstanceId()))? params.get("processInstanceId").toString():execution.getProcessInstanceId(); - + submitHpcTaskRemoteReq.setProcessDefinitionId(processDefinitionId); + submitHpcTaskRemoteReq.setProcessInstanceId(processInstanceId); submitHpcTaskRemoteReq.setMasterFileRegularStr(masterFileRegularStr); submitHpcTaskRemoteReq.setInputFilesRegularStr(inputFilesRegularStr); - CoreLogger.info("hpc executeMode:{}",params.get("executeMode")); - String executeMode = params.get("executeMode").toString(); - if(StringUtils.isBlank(executeMode)|| - (!Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_AUTO)&& - !Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_MANUAL))){ - throw new RuntimeException("hpc executeMode illegal"); - } + // 处理hpc求解文件路径 dealHpcFile(submitHpcTaskRemoteReq,beforeNodeId,currentNodeId, processDefinitionId,processInstanceId,executeMode,params); + // // 实现HPC处理逻辑... // INIT(初始化)/RUNNING(执行中)/SUCCESS(执行成功)/FAIL(执行失败) String status = AsyncTaskStatusEnum.INIT.getCode(); @@ -1546,7 +1838,8 @@ public class HpcHandler implements ExecutionHandler,HPCExecu if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){ // 推送失败消息 sendMsg(ThreadLocalContext.getTenantId(),ThreadLocalContext.getUserId(),submitHpcTaskRemoteReq.getJobName(),"失败"); - log.error("HpcHandler submit failed,jobName:{}",params); + log.error("HpcHandler submit failed:{}",JSONObject.toJSONString(params)); + MdcUtil.removeTraceId(); throw new RuntimeException("HpcHandler submit failed,"+submitResp.getMessage()); } @@ -1563,7 +1856,7 @@ public class HpcHandler implements ExecutionHandler,HPCExecu status, hpcTaskId ); - + MdcUtil.removeTraceId(); log.info("HPC 任务 {} 已提交", "hpcTaskId"); } @@ -1577,15 +1870,22 @@ public class HpcHandler implements ExecutionHandler,HPCExecu messageFeignClient.sendMessage(req); } - private void initUserInfo(DelegateExecution execution) { + private void initUserInfo(DelegateExecution execution,Map params) { // 获取当前流程实例参数 Long userId = (Long) execution.getVariable("userId"); String userName = (String) execution.getVariable("userName"); Long tenantId = (Long) execution.getVariable("tenantId"); + log.info("Hpc流程发起,用户id:{},租户id:{}",userId,tenantId); + if(Objects.isNull(tenantId)){ + tenantId=Objects.isNull(params.get("tenantId")) ? null : (Long) params.get("tenantId"); + } + if(Objects.isNull(userId)){ + userId=Objects.isNull(params.get("userId")) ? null : (Long) params.get("userId"); + } ThreadLocalContext.setUserId(userId); ThreadLocalContext.setUserName(userName); ThreadLocalContext.setTenantId(tenantId); - CoreLogger.info("hpcHander initUserInfo userId:{},tenantId:{},userName:{}",userId,tenantId,userName); + log.info("hpcHander initUserInfo userId:{},tenantId:{},userName:{}",userId,tenantId,userName); } private void dealHpcFile(SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq, String beforeNodeId, String currentNodeId, @@ -1604,35 +1904,62 @@ public class HpcHandler implements ExecutionHandler,HPCExecu .collect(Collectors.groupingBy(ProcessNodeParam::getNodeId)); List beforeNodeParams = nodeParamMap.get(beforeNodeId); List currentNodeParams = nodeParamMap.get(currentNodeId); - if(CollectionUtils.isEmpty(beforeNodeParams) || CollectionUtils.isEmpty(currentNodeParams)){ - throw new RuntimeException("未获取到当前节点或者求解文件节点信息"); + if(Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_AUTO)){ + if(CollectionUtils.isEmpty(beforeNodeParams) || CollectionUtils.isEmpty(currentNodeParams)){ + MdcUtil.removeTraceId(); + log.warn("未获取到当前节点或者求解文件节点信息"); + throw new RuntimeException("未获取到当前节点或者求解文件节点信息"); + } } + if(Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_MANUAL)){ + if(CollectionUtils.isEmpty(currentNodeParams)){ + MdcUtil.removeTraceId(); + log.warn("未获取到当前节点信息"); + throw new RuntimeException("未获取到当前节点信息"); + } + } + // 自动,前一个节点 submitHpcTaskRemoteReq.setExecuteMode(executeMode); if(Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_AUTO)){ ProcessNodeParam beforeNode = beforeNodeParams.get(0); - String beforeNodeJectKey = getNodeObjectKey(beforeNode); + Pair beforePair = getNodeObjectKey(beforeNode); + String beforeNodeJectKey = beforePair.getLeft(); + if(Objects.isNull(beforePair.getRight())){ + log.error("前一节点文件dirId不能是null"); + MdcUtil.removeTraceId(); + throw new RuntimeException("前一节点文件dirId不能是null"); + } + submitHpcTaskRemoteReq.setInputFileId(beforePair.getRight()); // 本地求解文件路径 taskLocalBaseDir - submitHpcTaskRemoteReq.setSimulationFileLocalPath(simulationBaseDir + beforeNodeJectKey); - CoreLogger.info("simulationFileLocalPath :{} ",simulationBaseDir + beforeNodeJectKey); + String simulationFilePath = simulationBaseDir + beforeNodeJectKey; + submitHpcTaskRemoteReq.setSimulationFileLocalPath(simulationFilePath); + log.info("simulationFileLocalPath :{} ",simulationBaseDir + beforeNodeJectKey); } // 手动上传的 if (Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_MANUAL)) { List masterFilePaths = getFileListFromMap(params, "masterFileRegularStr"); List inPutFilePaths = getFileListFromMap(params, "inputFilesRegularStr"); - if(CollectionUtils.isEmpty(masterFilePaths)||CollectionUtils.isEmpty(inPutFilePaths)){ - CoreLogger.warn("hpc executeMode manual,filepath illegal"); - throw new RuntimeException("hpc executeMode manual,filepath illegal"); + if(CollectionUtils.isEmpty(masterFilePaths)){ + log.warn("hpc executeMode manual,filepath illegal"); + MdcUtil.removeTraceId(); + throw new RuntimeException("手动模式求解文件不能为空"); } submitHpcTaskRemoteReq.setManualMasterFilepaths(masterFilePaths); submitHpcTaskRemoteReq.setManualInputFilePaths(inPutFilePaths); } // hpc 节点回传路径 ProcessNodeParam currentNode = currentNodeParams.get(0); - String currentNodeJectKey = getNodeObjectKey(currentNode); - // hpc 回传文件路径 + Pair currentNodePair = getNodeObjectKey(currentNode); + String currentNodeJectKey = currentNodePair.getLeft(); + Long curDirId = currentNodePair.getRight(); + // 当前节点的minio文件夹id,用于hpc文件回传,走data服务的接口 + submitHpcTaskRemoteReq.setDirId(curDirId); + // hpc 回传minio文件路径,桶和租户绑定 + submitHpcTaskRemoteReq.setStdoutSpdmMinoFilePath(currentNodeJectKey); + // hpc 回传本地文件路径 submitHpcTaskRemoteReq.setStdoutSpdmNasFilePath(simulationBaseDir + currentNodeJectKey); - CoreLogger.info("stdoutSpdmNasFilePath :{} ",simulationBaseDir + currentNodeJectKey); + log.info("stdoutSpdmNasFilePath :{} ",simulationBaseDir + currentNodeJectKey); } /** @@ -1645,15 +1972,15 @@ public class HpcHandler implements ExecutionHandler,HPCExecu private static List getFileListFromMap(Map dataMap, String key) { return Optional.ofNullable(dataMap) // 提取 explicitInputFiles 子 Map - .map(map -> (Map) map.get("explicitInputFiles")) + .map(map -> (Map) map.get(FlowableVariables.EXPLICIT_INPUT_FILES_KEY)) // 提取指定 key 的列表 .map(explicitMap -> (List) explicitMap.get(key)) // 为空则返回空列表,避免后续遍历 NPE .orElse(List.of()); } - - private String getNodeObjectKey(ProcessNodeParam processNodeParam){ + // String 当前的objectKey Long 节点对应的dirId + private Pair getNodeObjectKey(ProcessNodeParam processNodeParam){ String paramJson = processNodeParam.getParamJson(); JSONObject paramJsonObject = JSONObject.parseObject(paramJson); // outputDirId @@ -1668,13 +1995,14 @@ public class HpcHandler implements ExecutionHandler,HPCExecu } FileMetadataInfoResp fileMetadataInfoResp = fileBaseInfoResp.getData(); String objectKey = fileMetadataInfoResp.getObjectKey(); - return objectKey; + return Pair.of(objectKey,outputDirId); } /** - * 将参数Map转换为SubmitHpcTaskRemoteReq对象的工具方法 + * 将参数Map转换为SubmitHpcTaskRemoteReq对象的工具方法 String command */ private SubmitHpcTaskRemoteReq convertParamsToReq(Map params) { + log.info("convertParamsToReq start "); SubmitHpcTaskRemoteReq req = new SubmitHpcTaskRemoteReq(); if (params == null) { return req; @@ -1684,47 +2012,23 @@ public class HpcHandler implements ExecutionHandler,HPCExecu // 处理int类型字段,包含空值和非数字的异常处理 try { req.setCoreNum(params.get("coreNum") != null ? Integer.parseInt(params.get("coreNum").toString()) : 0); - } catch (NumberFormatException e) { - CoreLogger.error("coreNum parse error:{},coreNum:{}",e.getMessage(),params.get("coreNum")); + } catch (Exception e) { + log.error("coreNum parse error:{},coreNum:{}",e.getMessage(),params.get("coreNum")); req.setCoreNum(0); } req.setSoftware(params.get("software").toString()); req.setJobType(params.get("jobType").toString()); try { req.setIndependence(params.get("independence") != null ? Integer.parseInt(params.get("independence").toString()) : 0); - } catch (NumberFormatException e) { + } catch (Exception e) { + log.warn("get independence error:{}",e.getMessage()); req.setIndependence(0); } req.setTaskId(params.get("taskId").toString()); req.setTaskName(params.get("taskName").toString()); req.setRunId(params.get("runId").toString()); req.setRunName(params.get("runName").toString()); - // mock 时暂时自己传递,后面根据软件名称查询命令 todo 后面从表配置查询 - String command =(params.get("command")==null||StringUtils.isBlank(params.get("command").toString()))? - "\\\\CARSAFE\\share\\solver\\RLithium\\reta.exe -i %s" : params.get("command").toString(); - req.setCommand(command); req.setProjectname(params.get("projectname").toString()); -// req.setFeatchFileType(params.get("featchFileType").toString()); - // req.setBeforeNodeId(params.get("beforeNodeId").toString()); - // 处理commandExpand字段(JSON字符串转Map) - - // 动态命令 -// String commandExpandJson = params.get("commandExpand").toString(); -// if (StringUtils.isNotBlank(commandExpandJson)) { -// try { -// // 将JSON字符串转换为Map -// Map commandExpand = objectMapper.readValue( -// commandExpandJson, -// new TypeReference>() {} -// ); -//// req.setCommandExpand(commandExpand); -// } catch (Exception e) { -// CoreLogger.error("convertParamsToReq error:{},params:{}",e.getMessage(), JSONObject.toJSONString(params)); -// // 如设为null或空Map -//// req.setCommandExpand(new HashMap<>()); -// } -// } - return req; } @@ -1843,18 +2147,77 @@ public class LocalAppHandler implements ExecutionHandler,Loc } + +package com.sdm.flowable.delegate; + +import com.sdm.flowable.enums.AsyncTaskStatusEnum; +import com.sdm.flowable.service.IAsyncTaskRecordService; +import com.sdm.flowable.util.FlowNodeIdUtils; +import lombok.extern.slf4j.Slf4j; +import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.delegate.JavaDelegate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.UUID; + + +/** + * 本地应用对应的注册节点(RegisterNode) 绑定的 生成异步任务ID 处理器 + * 用于拉起本地应用前,生成异步任务ID = asyncTaskId,并将异步任务ID放入流程变量中 + * 将异步任务ID=asyncTaskId 和 回调节点ID=waitNodeId 保存到 async_task_record 异步任务执行记录表 + * 本地应用执行完成,异步回到时,基于 asyncTaskId 查询异步任务,获取下一个回调节点 waitNodeId,能够恢复流程继续执行 + * + * @author: shimingdm + * @date: 2023/1/31 10:05 + */ +@Slf4j +@Component("localAppRegisterDelegate") +public class LocalAppRegisterDelegate implements JavaDelegate { + + @Autowired + private IAsyncTaskRecordService asyncTaskRecordService; + + @Override + public void execute(DelegateExecution execution) { + // 1. 获取原始 UserTask 的 ID (从 _register 节点ID倒推) + // 当前 activityId 是 "UserTask_A_register" -> 解析出 "UserTask_A" + String currentId = execution.getCurrentActivityId(); + String originalId = FlowNodeIdUtils.parseOriginalNodeId(currentId); + + // 2. 计算接下来的 Wait 节点 ID (用于回调定位) + String waitNodeId = FlowNodeIdUtils.generateAsyncTaskId(originalId); + + // 3. 生成全局唯一的异步任务ID + String asyncTaskId = UUID.randomUUID().toString(); + + log.info("本地应用绑定的执行策略处理:当前节点ID: {}, 原始节点ID: {}, 下一个等待节点ID: {}, 生成异步任务ID: {}", currentId, originalId, waitNodeId, asyncTaskId); + + // 4. 注册到业务表 (状态: RUNNING/INIT) + // handlerType 标记为 LOCAL_APP + asyncTaskRecordService.registerAsyncTask( + execution, + waitNodeId, + "LOCAL_APP", + execution.getVariables(), + AsyncTaskStatusEnum.RUNNING.getCode(), + asyncTaskId + ); + } +} + + package com.sdm.flowable.delegate; import com.alibaba.fastjson2.JSONObject; -import com.fasterxml.jackson.databind.ObjectMapper; import com.sdm.common.common.SdmResponse; +import com.sdm.common.config.FlowableConfig; import com.sdm.common.entity.flowable.executeConfig.BaseExecuteConfig; import com.sdm.common.entity.req.data.GetFileBaseInfoReq; import com.sdm.common.entity.req.flowable.AsyncCallbackRequest; import com.sdm.common.entity.resp.data.FileMetadataInfoResp; import com.sdm.common.feign.inter.data.IDataFeignClient; -import com.sdm.common.config.FlowableConfig; import com.sdm.flowable.delegate.handler.ExecutionHandler; import com.sdm.flowable.service.IAsyncTaskRecordService; import com.sdm.flowable.service.IProcessNodeParamService; @@ -1876,8 +2239,9 @@ import java.util.Map; @Component("universalDelegate") @Slf4j public class UniversalDelegate implements JavaDelegate { - @Autowired - private ObjectMapper objectMapper; + +// @Autowired +// private ObjectMapper objectMapper; @Autowired private IProcessNodeParamService processNodeParamService; @@ -1934,7 +2298,8 @@ public class UniversalDelegate implements JavaDelegate { log.info("节点扩展配置, 流程 runId:{},processDefinitionId:{},实例ID: {}, 节点ID: {}, 扩展配置: {}",runId,processDefinitionId, procInstId, nodeId, extensionElement); BaseExecuteConfig config = - objectMapper.readValue(extensionElement, BaseExecuteConfig.class); +// objectMapper.readValue(extensionElement, BaseExecuteConfig.class); + JSONObject.parseObject(extensionElement, BaseExecuteConfig.class); String executeType = config.getExecuteType(); ExecutionHandler handler = handlerMap.get(executeType); @@ -2322,6 +2687,27 @@ public enum AsyncTaskStatusEnum { } + +package com.sdm.flowable.enums; + +/** + * 执行类型枚举 + */ +public enum ExecuteTypeEnum { + CLOUD_APP("cloudApp"), + LOCAL_APP("localApp"), + HPC("HPC"); + + private String code; + ExecuteTypeEnum(String code) { + this.code = code; + } + public String getCode() { + return code; + } +} + + package com.sdm.flowable.enums; @@ -2475,35 +2861,43 @@ public enum ProcessInstanceStateEnum { /** * 运行中 */ - RUNNING("running"), + RUNNING(1,"running"), + + /** + * 已完成 + */ + COMPLETED(2,"completed"), + + /** + * 错误 + */ + ERROR(3,"error"), /** * 挂起 */ - SUSPENDED("suspended"), - - /** - * 错误 - */ - ERROR("error"), - - /** - * 已完成 - */ - COMPLETED("completed"), + SUSPENDED(4,"suspended"), + /** * 已取消 */ - CANCELLED("cancelled"); + CANCELLED(5,"cancelled"); - private final String code; + private final Integer code; - ProcessInstanceStateEnum(String code) { + private final String value; + + ProcessInstanceStateEnum(Integer code, String value) { this.code = code; + this.value = value; } - public String getCode() { + public String getValue() { + return value; + } + + public Integer getCode() { return code; } } @@ -2596,6 +2990,233 @@ public class FlowableApplication { } + +package com.sdm.flowable.listener; + +import com.sdm.common.common.ThreadLocalContext; +import com.sdm.common.feign.inter.project.ISimulationRunFeignClient; +import com.sdm.flowable.enums.ProcessInstanceStateEnum; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent; +import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType; +import org.flowable.common.engine.api.delegate.event.FlowableEvent; +import org.flowable.common.engine.api.delegate.event.FlowableEventListener; +import org.flowable.engine.RuntimeService; +import org.flowable.engine.impl.persistence.entity.ExecutionEntity; +import org.flowable.job.api.Job; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * 全局算例状态同步监听器 + * 作用:替代定时任务,实时推送状态 + *

+ * 监听Flowable流程引擎的全生命周期事件,实现算例状态的实时同步: + * - PROCESS_COMPLETED: 流程正常完成 + * - PROCESS_CANCELLED: 流程被取消/终止 + * - ENTITY_SUSPENDED: 实体挂起(需过滤流程实例) + * - ENTITY_ACTIVATED: 实体激活(需过滤流程实例) + * - JOB_MOVED_TO_DEADLETTER: 作业移入死信队列(ERROR状态) + *

+ * + * @author SDM + * @date 2026-01-23 + */ +@Slf4j +@Component +public class GlobalStatusEventListener implements FlowableEventListener { + + @Autowired + private ISimulationRunFeignClient simulationRunFeignClient; + + @Autowired + @Lazy + private RuntimeService runtimeService; + + @Override + public Set getTypes() { + return new HashSet<>(Arrays.asList( + FlowableEngineEventType.PROCESS_COMPLETED, // 流程完成 + FlowableEngineEventType.PROCESS_CANCELLED, // 流程取消(精确匹配,不需要判断deleteReason) + FlowableEngineEventType.ENTITY_SUSPENDED, // 实体挂起(需过滤流程实例) + FlowableEngineEventType.ENTITY_ACTIVATED, // 实体激活(需过滤流程实例) + FlowableEngineEventType.JOB_MOVED_TO_DEADLETTER // 作业进入死信(ERROR状态) + )); + } + + @Override + public void onEvent(FlowableEvent event) { + try { + FlowableEngineEventType eventType = (FlowableEngineEventType) event.getType(); + + // 1. 流程正常完成 + if (eventType == FlowableEngineEventType.PROCESS_COMPLETED) { + handleProcessCompleted((FlowableEngineEntityEvent) event); + } + // 2. 流程被取消(Flowable 7.x有独立的CANCELLED事件) + else if (eventType == FlowableEngineEventType.PROCESS_CANCELLED) { + handleProcessCancelled((FlowableEngineEntityEvent) event); + } + // 3. 实体挂起/激活(需要过滤,只处理流程实例级别) + else if (eventType == FlowableEngineEventType.ENTITY_SUSPENDED || + eventType == FlowableEngineEventType.ENTITY_ACTIVATED) { + handleSuspendOrActivate((FlowableEngineEntityEvent) event, eventType); + } + // 4. 作业进入死信队列(ERROR状态的金标准) + else if (eventType == FlowableEngineEventType.JOB_MOVED_TO_DEADLETTER) { + handleDeadLetter((FlowableEngineEntityEvent) event); + } + } catch (Exception e) { + log.error("处理Flowable事件异常: eventType={}", event.getType(), e); + } + } + + // --- 内部逻辑方法 --- + + /** + * 处理流程正常完成事件 + * PROCESS_COMPLETED只表示流程走到EndEvent,不包括取消场景 + */ + private void handleProcessCompleted(FlowableEngineEntityEvent event) { + String processInstanceId = event.getProcessInstanceId(); + ExecutionEntity execution = (ExecutionEntity) event.getEntity(); + Long userId = (Long) execution.getVariable("userId"); + Long tenantId = (Long) execution.getVariable("tenantId"); + log.info("流程正常完成: processInstanceId={}, userId={}, tenantId={}", processInstanceId, userId, tenantId); + doUpdate(processInstanceId, ProcessInstanceStateEnum.COMPLETED.getCode(), userId, tenantId); + } + + /** + * 处理流程取消事件 + * Flowable 7.x提供了独立的PROCESS_CANCELLED事件,更精确 + */ + private void handleProcessCancelled(FlowableEngineEntityEvent event) { + String processInstanceId = event.getProcessInstanceId(); + ExecutionEntity execution = (ExecutionEntity) event.getEntity(); + Long userId = (Long) execution.getVariable("userId"); + Long tenantId = (Long) execution.getVariable("tenantId"); + log.info("流程被取消: processInstanceId={}, userId={}, tenantId={}, deleteReason={}", + processInstanceId, userId, tenantId, execution.getDeleteReason()); + doUpdate(processInstanceId, ProcessInstanceStateEnum.CANCELLED.getCode(), userId, tenantId); + } + + /** + * 处理实体挂起/激活事件 + * 关键:必须过滤,只处理流程实例级别的挂起/激活,忽略子流程或其他实体 + */ + private void handleSuspendOrActivate(FlowableEngineEntityEvent event, FlowableEngineEventType eventType) { + Object entity = event.getEntity(); + + // 只处理ExecutionEntity(流程执行实体) + if (entity instanceof ExecutionEntity) { + ExecutionEntity execution = (ExecutionEntity) entity; + + // 关键判断:isProcessInstanceType()确保是流程实例本身,而非子分支 + if (execution.isProcessInstanceType()) { + String processInstanceId = execution.getProcessInstanceId(); + Integer status = (eventType == FlowableEngineEventType.ENTITY_SUSPENDED) ? ProcessInstanceStateEnum.SUSPENDED.getCode() : ProcessInstanceStateEnum.RUNNING.getCode(); + Long userId = (Long) execution.getVariable("userId"); + Long tenantId = (Long) execution.getVariable("tenantId"); + + log.info("流程实例{}状态变更: processInstanceId={}, userId={}, tenantId={}", + (eventType == FlowableEngineEventType.ENTITY_SUSPENDED) ? "挂起" : "激活", processInstanceId, userId, tenantId); + doUpdate(processInstanceId, status, userId, tenantId); + } else { + log.debug("忽略非流程实例级别的挂起/激活事件: executionId={}", execution.getId()); + } + } + } + + /** + * 处理死信事件(ERROR状态的标准方式) + * JOB_MOVED_TO_DEADLETTER是最精确的ERROR信号: + * - 比JOB_EXECUTION_FAILURE准确(FAILURE只是重试中的失败) + * - 表示引擎已放弃重试,必须人工干预 + *

+ * 前提条件:ServiceTask必须配置async="true"和R0/PT0S重试策略 + */ + private void handleDeadLetter(FlowableEngineEntityEvent event) { + Object entity = event.getEntity(); + + if (entity instanceof Job) { + Job job = (Job) entity; + String processInstanceId = job.getProcessInstanceId(); + String exceptionMessage = job.getExceptionMessage(); + + // 对于Job事件,需要通过RuntimeService查询流程变量 + Long userId = null; + Long tenantId = null; + if (processInstanceId != null) { + Map variables = runtimeService.getVariables(processInstanceId); + userId = (Long) variables.get("userId"); + tenantId = (Long) variables.get("tenantId"); + } + + log.error("❌ 作业进入死信队列,流程ERROR: processInstanceId={}, userId={}, tenantId={}, jobId={}, exception={}", + processInstanceId, userId, tenantId, job.getId(), exceptionMessage); + + doUpdate(processInstanceId, ProcessInstanceStateEnum.ERROR.getCode(), userId, tenantId); + } + } + + /** + * 真正的更新数据库逻辑 + * 通过Feign调用project服务更新算例状态 + */ + private void doUpdate(String processInstanceId, Integer statusCode, Long userId, Long tenantId) { + if (processInstanceId == null) { + log.warn("流程实例ID为空,跳过状态更新"); + return; + } + + log.info(">>> 更新算例状态 [{}] -> {}", processInstanceId, statusCode); + + // 设置线程上下文,确保Feign调用能正确传递用户和租户信息 + try { + if (userId != null) { + ThreadLocalContext.setUserId(userId); + } + if (tenantId != null) { + ThreadLocalContext.setTenantId(tenantId); + } + + simulationRunFeignClient.updateStatusByProcessInstanceId(processInstanceId, statusCode); + } catch (Exception e) { + log.error("更新算例状态失败: processInstanceId={}, userId={}, tenantId={}, status={}", + processInstanceId, userId, tenantId, statusCode, e); + } finally { + // 清理线程上下文,防止线程池污染 + ThreadLocalContext.clear(); + } + } + + @Override + public boolean isFailOnException() { + // 返回false:即使监听器抛异常,也不影响流程继续执行 + // 这是容错设计,保证业务流程的稳定性 + return false; + } + + @Override + public boolean isFireOnTransactionLifecycleEvent() { + // 返回false:在事务提交前触发,确保状态及时更新 + // 如果返回true,监听器会在事务提交后触发,可能导致状态更新滞后 + return false; + } + + @Override + public String getOnTransaction() { + return null; + } +} + + package com.sdm.flowable.listener; @@ -2779,9 +3400,17 @@ public class UserTaskDirectoryPreparationListener implements ExecutionListener { String nodeId = execution.getCurrentActivityId(); String processDefinitionId = execution.getProcessDefinitionId(); + // 【新增逻辑】处理本地应用注册节点的 ID 映射 + // 如果监听器运行在 Register 节点(例如 "Task_1_register"),我们需要还原为原始 ID ("Task_1") + String originalNodeId = FlowNodeIdUtils.parseOriginalNodeId(nodeId); + if (!nodeId.equals(originalNodeId)) { + log.info("检测到辅助节点,ID 还原: {} -> {}", nodeId, originalNodeId); + nodeId = originalNodeId; + } + //创建本地文件夹,用于后续节点计算直接从本地读取,不需要再从minio中获取数据 - JSONObject params =processNodeParamService.getParam(processDefinitionId,nodeId,runId); - log.info("userTaskDirectoryPreparationListener, 启动流程 runId:{},nodeId:{},实例id: {},参数 params:{}", runId,nodeId,execution.getProcessInstanceId(),params); + JSONObject params = processNodeParamService.getParam(processDefinitionId, nodeId, runId); + log.info("userTaskDirectoryPreparationListener, 启动流程 runId:{},nodeId:{},实例id: {},参数 params:{}", runId, nodeId, execution.getProcessInstanceId(), params); Long currentNodeOutputDirId = params.getLong("outputDirId"); if(ObjectUtils.isEmpty(currentNodeOutputDirId)){ throw new RuntimeException("当前节点未配置输出文件夹"); @@ -2855,10 +3484,12 @@ public interface Iprocess { package com.sdm.flowable.process; import com.alibaba.fastjson2.JSONObject; +import com.fasterxml.jackson.databind.ObjectMapper; import com.sdm.common.common.SdmResponse; import com.sdm.common.entity.flowable.dto.NodeDetailInfo; import com.sdm.common.entity.flowable.dto.ProcessDefinitionDTO; import com.sdm.common.entity.flowable.dto.ProcessInstanceInfo; +import com.sdm.common.entity.flowable.executeConfig.BaseExecuteConfig; import com.sdm.common.entity.req.data.GetFileBaseInfoReq; import com.sdm.common.entity.req.flowable.AsyncCallbackRequest; import com.sdm.common.entity.resp.data.FileMetadataInfoResp; @@ -2870,6 +3501,8 @@ import com.sdm.flowable.delegate.UniversalDelegate; import com.sdm.flowable.dto.req.CompleteTaskReq; import com.sdm.flowable.dto.req.PreviewNodeInputFilesReq; import com.sdm.flowable.dto.resp.NodeInputFilePreviewResp; +import com.sdm.flowable.entity.AsyncTaskRecord; +import com.sdm.flowable.enums.ExecuteTypeEnum; import com.sdm.flowable.enums.FlowElementTypeEnums; import com.sdm.flowable.enums.NodeStateEnum; import com.sdm.flowable.enums.ProcessInstanceStateEnum; @@ -2900,6 +3533,7 @@ import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.RequestBody; import java.io.File; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -2938,6 +3572,10 @@ public class ProcessService implements Iprocess{ @Autowired private IDataFeignClient dataFeignClient; + @Autowired + private ObjectMapper objectMapper; + + // 部署流程(前端传入Flowable标准JSON) public SdmResponse deploy(ProcessDefinitionDTO processDTO) throws Exception { log.info("开始部署流程定义: {}",processDTO); @@ -3158,6 +3796,8 @@ public class ProcessService implements Iprocess{ boolean isSuspended; boolean hasDeadLetterJobs; + String processInstanceId; + // 记录最早发生错误的时间(死信作业的创建时间) Date earliestErrorTime; @@ -3187,6 +3827,7 @@ public class ProcessService implements Iprocess{ */ private ProcessStateContext prepareStateContext(String processInstanceId) { ProcessStateContext ctx = new ProcessStateContext(); + ctx.setProcessInstanceId(processInstanceId); // 1. 获取运行时流程实例对象(判断是否运行中、是否挂起) // ACT_RU_EXECUTION (运行时执行实例表) @@ -3315,11 +3956,11 @@ public class ProcessService implements Iprocess{ if (isRunning) { // --- 运行中 --- if (hasError) { - status = ProcessInstanceStateEnum.ERROR.getCode(); // 有死信作业,视为异常 + status = ProcessInstanceStateEnum.ERROR.getValue(); // 有死信作业,视为异常 } else if (isSuspended) { - status = ProcessInstanceStateEnum.SUSPENDED.getCode(); // 被挂起 + status = ProcessInstanceStateEnum.SUSPENDED.getValue(); // 被挂起 } else { - status = ProcessInstanceStateEnum.RUNNING.getCode(); // 正常运行 + status = ProcessInstanceStateEnum.RUNNING.getValue(); // 正常运行 } } else { // --- 已结束 (运行时查不到,历史表里有) --- @@ -3327,11 +3968,11 @@ public class ProcessService implements Iprocess{ if (deleteReason == null) { // 1. 正常走完结束节点,deleteReason 为空 - status = ProcessInstanceStateEnum.COMPLETED.getCode(); + status = ProcessInstanceStateEnum.COMPLETED.getValue(); } else { // 2. 有删除原因,说明是被取消或强制终止的 // 你可以根据 reason 的内容做更细的区分,或者统称为 cancelled - status = ProcessInstanceStateEnum.CANCELLED.getCode(); + status = ProcessInstanceStateEnum.CANCELLED.getValue(); } } info.setStatus(status); @@ -3352,7 +3993,8 @@ public class ProcessService implements Iprocess{ if (isOriginal) { // === 原始节点:聚合逻辑 === - // 它需要根据 "waitUser -> self -> wait -> check" 整个链条来决定状态 + // serviceTask 它需要根据 "waitUser -> self -> wait -> check" 整个链条来决定状态 + // userTask 它需要根据 "Register Node(serviceTask) ->Wait Node(ReceiveTask) -> Check Node(ServiceTask)-> Original Node(UserTask)" 链条来决定状态 determineUnifiedState(node, ctx); } else { // === 辅助节点:物理逻辑 === @@ -3366,6 +4008,81 @@ public class ProcessService implements Iprocess{ * 逻辑 A:原始节点状态计算 (聚合所有关联节点) */ private void determineUnifiedState(NodeDetailInfo node, ProcessStateContext ctx) { + if(isLocalApp(node)){ + // 针对本地应用节点的特殊聚合逻辑 + determineUserTaskUnifiedState(node, ctx); + }else { + // 针对serviceTask的任务节点的聚合状态展示处理 + determineServiceTaskUnifiedState(node, ctx); + } + + } + + /** + * 根据 "Register Node(serviceTask) ->Wait Node(ReceiveTask) -> Check Node(ServiceTask)-> Original Node(UserTask)" 链条来决定状态 + * @param node + * @param ctx + */ + private void determineUserTaskUnifiedState(NodeDetailInfo node, ProcessStateContext ctx) { + String origId = node.getId(); + String registerId = FlowNodeIdUtils.generateRegisterTaskId(origId); + String waitId = FlowNodeIdUtils.generateAsyncTaskId(origId); + String checkId = FlowNodeIdUtils.generateCheckTaskId(origId); + String procInstId = ctx.getProcessInstanceId(); // 或者作为参数传入 + + AsyncTaskRecord record = asyncTaskRecordService.lambdaQuery() + .eq(AsyncTaskRecord::getProcessInstanceId, procInstId) + .eq(AsyncTaskRecord::getReceiveTaskId, waitId) + .last("LIMIT 1") + .one(); + + if (record != null) { + node.setAsyncTaskId(record.getAsyncTaskId()); + } + + // 1. 判断 ERROR (Check 节点死信) + if (ctx.errorMap.containsKey(checkId)) { + node.setStatus(NodeStateEnum.ERROR.getCode()); + node.setErrorMessage(ctx.errorMap.get(checkId)); + // 计算时间:从 register 开始,到当前时刻 + calculateLocalAppTime(node, ctx, registerId, origId); + return; + } + + // 2. 判断 WAITING_FOR_USER (停在 Original UserTask) + // 这代表前置的自动化流程跑完了,且成功了,正在等用户点下一步 + if (ctx.activeActivityIds.contains(origId)) { + node.setStatus(NodeStateEnum.WAITING_FOR_USER.getCode()); + calculateLocalAppTime(node, ctx, registerId, origId); + return; + } + + // 3. 判断 ACTIVE (停在 Register / Wait / Check) + // 代表自动化流程正在跑 (注册中、应用运行中、校验中) + boolean isActive = ctx.activeActivityIds.contains(registerId) || + ctx.activeActivityIds.contains(waitId) || + ctx.activeActivityIds.contains(checkId); + + if (isActive) { + // 如果流程挂起,优先显示挂起 + node.setStatus(ctx.isSuspended() ? NodeStateEnum.SUSPENDED.getCode() : NodeStateEnum.ACTIVE.getCode()); + calculateLocalAppTime(node, ctx, registerId, origId); + return; + } + + // 4. 判断 FINISHED (Original 节点已结束) + HistoricActivityInstance lastHist = ctx.historyMap.get(origId); + if (lastHist != null && lastHist.getEndTime() != null) { + node.setStatus(NodeStateEnum.FINISHED.getCode()); + calculateLocalAppTime(node, ctx, registerId, origId); + return; + } + + // 5. PENDING + node.setStatus(NodeStateEnum.PENDING.getCode()); + } + + private void determineServiceTaskUnifiedState(NodeDetailInfo node, ProcessStateContext ctx) { String origId = node.getId(); String waitUserId = FlowNodeIdUtils.generateWaitUserTaskId(origId); String waitId = FlowNodeIdUtils.generateAsyncTaskId(origId); @@ -3428,6 +4145,24 @@ public class ProcessService implements Iprocess{ node.setStatus(NodeStateEnum.PENDING.getCode()); } + + // 判断是否为本地应用节点 + private boolean isLocalApp(NodeDetailInfo node) { + String executeConfigJson = node.getExecuteConfig(); + if (executeConfigJson == null || executeConfigJson.isBlank()) { + return false; + } + + try { + BaseExecuteConfig config = + objectMapper.readValue(executeConfigJson, BaseExecuteConfig.class); + return ExecuteTypeEnum.LOCAL_APP.getCode().equalsIgnoreCase(config.getExecuteType()); + } catch (Exception e) { + log.warn("解析 executeConfig 失败,nodeId={}, json={}", + node.getId(), executeConfigJson, e); + return false; + } + } /** * 逻辑 B:辅助节点状态计算 (物理状态) */ @@ -3487,6 +4222,45 @@ public class ProcessService implements Iprocess{ calculateDuration(node, startTime, endTime); } + /** + * 计算本地应用聚合节点的时间 + * 逻辑: + * StartTime = _register 节点的开始时间 + * EndTime = 只有当状态是 FINISHED 时,取 Original 节点的结束时间,否则为 null + * Duration = EndTime (或 Now) - StartTime + * + * @param node 节点信息对象 + * @param ctx 状态上下文 + * @param registerId 链条起点的ID + * @param origId 链条终点的ID + */ + private void calculateLocalAppTime(NodeDetailInfo node, ProcessStateContext ctx, String registerId, String origId) { + // 1. 获取开始时间 (取 _register 的历史记录) + HistoricActivityInstance startInst = ctx.historyMap.get(registerId); + + // 防御性编程:如果 _register 没查到(极少见),尝试查 origId 补救 + if (startInst == null) { + startInst = ctx.historyMap.get(origId); + } + + Date startTime = (startInst != null) ? startInst.getStartTime() : null; + node.setStartTime(startTime); + + // 2. 获取结束时间 (仅当整个节点 FINISHED 时才有值) + Date endTime = null; + if (NodeStateEnum.FINISHED.getCode().equals(node.getStatus())) { + // 取终点 Original (UserTask) 的历史记录 + HistoricActivityInstance endInst = ctx.historyMap.get(origId); + if (endInst != null) { + endTime = endInst.getEndTime(); + } + } + node.setEndTime(endTime); + + // 3. 计算耗时 + calculateDuration(node, startTime, endTime); + } + /** * 工具:设置单个节点的物理时间 */ @@ -3611,9 +4385,13 @@ public class ProcessService implements Iprocess{ if (req.getRegexConfig() != null) { req.getRegexConfig().forEach((k, v) -> { try { - patternMap.put(k, Pattern.compile(v)); + String decodedRegex = new String( + Base64.getDecoder().decode(v), + StandardCharsets.UTF_8 + ); + patternMap.put(k, Pattern.compile(decodedRegex)); } catch (Exception e) { - log.error("正则编译失败: key={}, regex={}", k, v); + log.error("正则解码或编译失败: key={}, regexBase64={}", k, v, e); } }); } @@ -3855,9 +4633,7 @@ public class ProcessService implements Iprocess{ public SdmResponse retryFailedNode( String processInstanceId,String failNodeId) { try { - // 2. 查找 Job ID (参考上面的代码) String jobId = findDeadJobId(processInstanceId, failNodeId); - // 3. 执行重试 managementService.moveDeadLetterJobToExecutableJob(jobId, 1); log.info("作业已恢复,等待异步执行器拾取执行..."); return SdmResponse.success("重试任务已提交"); @@ -3870,33 +4646,70 @@ public class ProcessService implements Iprocess{ /** * 任意节点跳转重试 (Rewind/Jump) * 场景:节点失败(进死信)后,用户修改参数,跳转回任意前置节点重新跑。 + * 支持智能修正目标节点:如果是本地应用(Local App)链路,自动跳转回注册节点。 * * @param procInstId 流程实例ID * @param targetNodeId 目标逻辑节点ID (用户想跳去哪里,如 "TaskA") * @param newVariables 新的参数 */ public SdmResponse retryToNode(String procInstId, String targetNodeId, Map newVariables) { - log.info("开始执行回退重试 (Rewind), 流程: {}, 目标: {}", procInstId, targetNodeId); + log.info("开始执行回退重试 (Rewind), 流程: {}, 原始目标: {}", procInstId, targetNodeId); + + // 0. 智能修正目标节点 (针对 Local App 链路) + // 获取流程定义模型 + ProcessInstance processInstance = runtimeService.createProcessInstanceQuery() + .processInstanceId(procInstId) + .singleResult(); + if (processInstance == null) { + return SdmResponse.failed("流程实例不存在或已结束"); + } + + BpmnModel bpmnModel = repositoryService.getBpmnModel(processInstance.getProcessDefinitionId()); + if (bpmnModel != null) { + // 尝试解析原始节点ID (去除 _check, _wait 等后缀) + String originalNodeId = FlowNodeIdUtils.parseOriginalNodeId(targetNodeId); + // 推测是否存在对应的注册节点 (Local App 特征) + String registerNodeId = FlowNodeIdUtils.generateRegisterTaskId(originalNodeId); + + // 如果注册节点存在,说明这是 Local App 链路,且必须从注册节点重新开始 + // 避免直接跳到中间状态 (如 check 或 wait),导致本地应用没拉起来 + if (bpmnModel.getMainProcess().getFlowElement(registerNodeId) != null) { + if (!registerNodeId.equals(targetNodeId)) { + log.info("检测到 Local App 链路,自动修正重试目标: {} -> {}", targetNodeId, registerNodeId); + targetNodeId = registerNodeId; + } + } + } // 1. 获取当前流程实例中 **所有** 活跃/停滞的节点 // 包含:正在运行的、报错进死信的、UserTask等待中的 // 为什么要全拿?防止并行分支回退时产生幽灵分支。 List allActiveActivityIds = getFailedActivityIds(procInstId); + // 如果没有找到死信节点,尝试获取当前活跃节点 (可能是 UserTask 等待中,或者正常的运行中节点) + if (allActiveActivityIds.isEmpty()) { + allActiveActivityIds = runtimeService.createExecutionQuery() + .processInstanceId(procInstId) + .list() + .stream() + .map(Execution::getActivityId) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toList()); + } + if (allActiveActivityIds.isEmpty()) { return SdmResponse.failed("当前流程已结束或状态异常,无法执行跳转"); } - log.info("当前活跃节点集合: {},将全部收束至目标: {}", allActiveActivityIds, targetNodeId); + log.info("当前活跃/失败节点集合: {},将全部收束至目标: {}", allActiveActivityIds, targetNodeId); // 2. 更新流程变量 if (newVariables != null && !newVariables.isEmpty()) { runtimeService.setVariables(procInstId, newVariables); } - // 3. 净室清理 (已移除,由 Delegate/Listener 自动处理) - - // 4. 执行全量跳转 + // 3. 执行全量跳转 try { runtimeService.createChangeActivityStateBuilder() .processInstanceId(procInstId) @@ -4009,8 +4822,8 @@ import org.springframework.transaction.annotation.Transactional; import java.util.List; import java.util.Map; -import static com.sdm.common.config.FlowableConfig.RECEIVETASK_CALLBACKE_MSG; -import static com.sdm.common.config.FlowableConfig.RECEIVETASK_CALLBACKE_STATUS; +import static com.sdm.common.config.FlowableVariables.RECEIVETASK_CALLBACKE_MSG; +import static com.sdm.common.config.FlowableVariables.RECEIVETASK_CALLBACKE_STATUS; /** *

@@ -4441,6 +5254,8 @@ import org.springframework.stereotype.Component; import java.util.*; import java.util.stream.Collectors; +import static com.sdm.flowable.enums.ExecuteTypeEnum.LOCAL_APP; + /** * DTO → Flowable BpmnModel 映射工具类(核心) */ @@ -4477,6 +5292,9 @@ public class Dto2BpmnConverter { // 3.1、存储等待用户输入任务映射关系(原节点ID → waitUserTask节点ID) // 这里 Value 存的是 _waitUser 节点 ID,用于标识这个节点开启了等待用户操作 Map waitUserTaskMap = new HashMap<>(); // 原节点ID → waitUserTask节点ID + // 存储本地应用 UserTask 的映射关系 (OriginalId -> RegisterId) + // 用于将指向 UserTask 的连线重定向到 Register 节点 + Map localAppMap = new HashMap<>(); // 4. 存储并行网关映射关系(原节点ID → 网关ID) @@ -4490,7 +5308,7 @@ public class Dto2BpmnConverter { // 处理等待用户提交任务 handleWaitUserTask(process, nodeDto, waitUserTaskMap); // 创建实际节点 - createActualNode(process, nodeDto, asyncTaskMap); + createActualNode(process, nodeDto, asyncTaskMap,localAppMap); // 处理并行网关,创建拆分和汇聚节点 addRequiredGateways(process, nodeDto, flowDtos, joinGatewayMap, splitGatewayMap); } @@ -4499,7 +5317,7 @@ public class Dto2BpmnConverter { addRetryTask(process); // 6. 创建连线 - createConnections(process, flowDtos, asyncTaskMap,waitUserTaskMap, joinGatewayMap, splitGatewayMap); + createConnections(process, flowDtos, asyncTaskMap,waitUserTaskMap,localAppMap, joinGatewayMap, splitGatewayMap); validProcess(process); @@ -4586,8 +5404,7 @@ public class Dto2BpmnConverter { } private boolean isAsyncCallbackEnabled(FlowElementDTO nodeDto) { - return (FlowElementTypeEnums.SERVICETASK.getType().equals(nodeDto.getType()) || - FlowElementTypeEnums.USERTASK.getType().equals(nodeDto.getType())) && + return FlowElementTypeEnums.SERVICETASK.getType().equals(nodeDto.getType()) && nodeDto.getExtensionElements() != null && nodeDto.getExtensionElements().getExecuteConfig() != null && nodeDto.getExtensionElements().getExecuteConfig().isAsyncCallback(); @@ -4601,7 +5418,7 @@ public class Dto2BpmnConverter { } private void handleWaitUserTask(Process process, FlowElementDTO nodeDto, Map waitUserTaskMap) { - // 只有当前节点是ServiceTask才需要判断是否等待用户输入,需要才创建前置UserTask + // 用来设置节点是否人工执行: 只有当前节点是ServiceTask才需要判断是否等待用户输入,需要才创建前置UserTask if (FlowElementTypeEnums.SERVICETASK.getType().equals(nodeDto.getType())) { String originalNodeId = nodeDto.getId(); String waitUserId = FlowNodeIdUtils.generateWaitUserTaskId(originalNodeId); @@ -4667,6 +5484,7 @@ public class Dto2BpmnConverter { List flowDtos, Map asyncTaskMap, Map waitUserTaskMap, + Map localAppMap, Map joinGatewayMap, Map splitGatewayMap) { @@ -4691,6 +5509,11 @@ public class Dto2BpmnConverter { // (Prev -> WaitUser -> Original) // ==================================================================================== handleWaitUserTaskConnections(process, waitUserTaskMap); + + // ==================================================================================== + // 处理 LocalApp UserTask 的连线重定向 + // 逻辑:Prev -> Register Node(serviceTask) ->Wait Node(ReceiveTask) -> Check Node(ServiceTask)-> Original Node(UserTask) + handleLocalAppConnections(process, localAppMap); } /** @@ -4778,7 +5601,7 @@ public class Dto2BpmnConverter { /** * 第三阶段:处理异步任务连接 - * 针对已标记为异步回调的任务节点,将其连接重构为' Original -> Wait -> Check -> Targets(NextNodes)'的模式 + * 针对已标记为异步回调的任务节点,将其连接 Original -> Targets(NextNodes) 重构为' Original -> Wait -> Check -> Targets(NextNodes)'的模式 */ private void handleAsyncTaskConnections(Process process, Map asyncTaskMap) { for (String originalNodeId : asyncTaskMap.keySet()) { @@ -4815,6 +5638,12 @@ public class Dto2BpmnConverter { } } + /** + * 第四阶段:处理等待用户任务连接 + * 重构等待用户任务的连线,将 Prev -> Original 处理成 Prev -> WaitUser -> Original + * @param process + * @param waitUserTaskMap + */ private void handleWaitUserTaskConnections(Process process, Map waitUserTaskMap) { for (String originalNodeId : waitUserTaskMap.keySet()) { String waitUserId = waitUserTaskMap.get(originalNodeId); @@ -4833,21 +5662,65 @@ public class Dto2BpmnConverter { } removeLines.forEach(f -> process.removeFlowElement(f.getId())); - // Step 2: 添加原来的入线 → waitUserTask + // Step 2: 添加新的入线 Prev → waitUserTask for (String src : originalSources) { process.addFlowElement(createSequenceFlow(src, waitUserId, null)); } - // Step 3: waitUserTask → 原节点 + // Step 3: 添加新的出线 waitUserTask → Original 原节点 process.addFlowElement(createSequenceFlow(waitUserId, originalNodeId, null)); } } + /** + * 第五阶段:处理本地应用任务连接 + * 前置生成节点处理中已经生成了 Register Node(serviceTask) ->Wait Node(ReceiveTask) -> Check Node(ServiceTask)的连线 + * 针对已标记为本地应用的任务节点,将其连接 prevNode--->Original Node 重构为'prevNode ---> Register Node(serviceTask) ->Wait Node(ReceiveTask) -> Check Node(ServiceTask)-> Original Node(UserTask)'的模式 + * @param process + * @param localAppMap + */ + private void handleLocalAppConnections(Process process, Map localAppMap) { + for (String originalId : localAppMap.keySet()) { + String registerId = localAppMap.get(originalId); + // 通过工具类获取 Check 节点 ID + String checkId = FlowNodeIdUtils.generateCheckTaskId(originalId); + + // ============================================================== + // 步骤 A: 处理入线 (prevNode -> Register) + // ============================================================== + + // 1. 找出所有指向 Original 的线 (此时全是来自上游的线) + List incomingFlows = new ArrayList<>(); + for (FlowElement ele : process.getFlowElements()) { + if (ele instanceof SequenceFlow) { + SequenceFlow sf = (SequenceFlow) ele; + if (sf.getTargetRef().equals(originalId)) { + // 【优化】这里不再需要判断 sourceRef 是否为 checkId 了 + // 因为我们在 createLocalAppUserTaskChain 里没画那条线 + incomingFlows.add(sf); + } + } + } + + // 2. 将这些线的终点,从 Original 改为 Register + for (SequenceFlow sf : incomingFlows) { + sf.setTargetRef(registerId); + } + + // ============================================================== + // 步骤 B: 缝合内部与原始节点 (Check -> Original) + // ============================================================== + // 3. 手动补充最后这一跳 + SequenceFlow checkToOriginal = createSequenceFlow(checkId, originalId, null); + process.addFlowElement(checkToOriginal); + } + } + /** * 创建实际的流程节点 */ - private void createActualNode(Process process, FlowElementDTO nodeDto, Map asyncTaskMap) throws JsonProcessingException { + private void createActualNode(Process process, FlowElementDTO nodeDto, Map asyncTaskMap,Map localAppMap) throws JsonProcessingException { FlowElementTypeEnums elementType = FlowElementTypeEnums.fromString(nodeDto.getType()); switch (elementType) { @@ -4881,7 +5754,7 @@ public class Dto2BpmnConverter { break; case USERTASK: - createUserTask(process, nodeDto, asyncTaskMap); + createUserTask(process, nodeDto,localAppMap); break; case SERVICETASK: @@ -4894,21 +5767,90 @@ public class Dto2BpmnConverter { } } - private void createUserTask(Process process, FlowElementDTO nodeDto, Map asyncTaskMap) throws JsonProcessingException { - // 用户任务:映射为 Flowable UserTask + private void createUserTask(Process process, FlowElementDTO nodeDto,Map localAppMap) throws JsonProcessingException { + //判断是否为本地应用类型 + if (FlowNodeIdUtils.isLocalAppNode(nodeDto)) { + createLocalAppUserTaskChain(process, nodeDto,localAppMap); + } else { + // 原有的普通 UserTask 创建逻辑 + createNormalUserTask(process, nodeDto); + } + } + + private void createLocalAppUserTaskChain(Process process, FlowElementDTO nodeDto,Map localAppMap) throws JsonProcessingException { + String originalId = nodeDto.getId(); + String registerId = FlowNodeIdUtils.generateRegisterTaskId(originalId); // _register + String waitId = FlowNodeIdUtils.generateAsyncTaskId(originalId); // _wait + String checkId = FlowNodeIdUtils.generateCheckTaskId(originalId); // _check + + // 1. Register Node (ServiceTask) + ServiceTask registerTask = new ServiceTask(); + registerTask.setId(registerId); + registerTask.setName("注册本地任务"); + registerTask.setAsynchronous(true); + disableAsyncRetry(registerTask); + registerTask.setImplementation("${localAppRegisterDelegate}"); + registerTask.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION); + + // 【新增逻辑】为 Register 节点添加目录准备监听器 + // 这样在进入组合节点的第一步时,就会执行文件夹清空/准备 + addDirectoryPreparationListener(registerTask); + + process.addFlowElement(registerTask); + + // 2. Wait Node (ReceiveTask) + ReceiveTask waitTask = new ReceiveTask(); + waitTask.setId(waitId); + waitTask.setName("等待应用回调"); + waitTask.setAsynchronous(true); + disableAsyncRetry(waitTask); + process.addFlowElement(waitTask); + + // 3. Check Node (ServiceTask) - 哨兵 + ServiceTask checkTask = new ServiceTask(); + checkTask.setId(checkId); + checkTask.setName("结果校验"); + checkTask.setAsynchronous(true); + disableAsyncRetry(checkTask); // 失败变红,进入死信 + checkTask.setImplementation("${asyncResultCheckDelegate}"); // 复用现有校验 + checkTask.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION); + process.addFlowElement(checkTask); + + // 4. Original Node (UserTask) UserTask userTask = new UserTask(); - // 【关键修改】设置异步:防止下方绑定的监听器(创建文件夹)报错导致前一个节点回滚 - // 这会创建一个 Job 来执行 UserTask 的初始化逻辑(包括执行监听器) + userTask.setId(originalId); + userTask.setName(nodeDto.getName()); userTask.setAsynchronous(true); disableAsyncRetry(userTask); + // 保留原有的 UserTask 配置 (如目录准备监听器等) + addUserTaskExtensions(userTask, nodeDto); + process.addFlowElement(userTask); + + // 5. 建立内部连线: Register -> Wait -> Check -> Original + process.addFlowElement(createSequenceFlow(registerId, waitId, null)); + process.addFlowElement(createSequenceFlow(waitId, checkId, null)); + + // 6. 记录映射,用于后续将外部入线重定向到 Register + localAppMap.put(originalId, registerId); + } + + private void createNormalUserTask(Process process, FlowElementDTO nodeDto) throws JsonProcessingException { + UserTask userTask = new UserTask(); userTask.setId(nodeDto.getId()); userTask.setName(nodeDto.getName()); - log.info("创建用户任务节点 nodeId:{}", nodeDto.getId()); + userTask.setAsynchronous(true); + disableAsyncRetry(userTask); + addUserTaskExtensions(userTask, nodeDto); + process.addFlowElement(userTask); + } + + + // 抽取通用的 UserTask 扩展属性设置 + private void addUserTaskExtensions(UserTask userTask, FlowElementDTO nodeDto) throws JsonProcessingException { + // 绑定控制参数(和 ServiceTask 类似) if (nodeDto.getExtensionElements() != null && nodeDto.getExtensionElements().getExecuteConfig() != null) { BaseExecuteConfig userTaskExecuteConfig = nodeDto.getExtensionElements().getExecuteConfig(); - // 设置异步回调节点ID - userTaskExecuteConfig.setCallbackNodeId(asyncTaskMap.getOrDefault(nodeDto.getId(), null)); String configJson = objectMapper.writeValueAsString(userTaskExecuteConfig); log.info("用户任务userTask的executeConfig配置:{}", configJson); @@ -4922,14 +5864,29 @@ public class Dto2BpmnConverter { // 设置用户任务的属性,使其可以被任何人处理 // 不设置 assignee 或 candidateUsers,这样任何人都可以处理任务 - // 可选:绑定 TaskListener,在任务完成时触发逻辑 + // 【修改逻辑】仅当不是本地应用节点时,才在 UserTask 上添加目录准备监听器 + // 因为本地应用节点已经前移到 Register 节点处理了 + if (!FlowNodeIdUtils.isLocalAppNode(nodeDto)) { + addDirectoryPreparationListener(userTask); + } + } + + + /** + * 为节点添加目录准备监听器 + * @param flowElement 需要添加监听器的节点 + */ + private void addDirectoryPreparationListener(FlowElement flowElement) { FlowableListener dirPrepareListener = new FlowableListener(); dirPrepareListener.setEvent("start"); dirPrepareListener.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION); dirPrepareListener.setImplementation("${userTaskDirectoryPreparationListener}"); - userTask.getExecutionListeners().add(dirPrepareListener); - process.addFlowElement(userTask); + if (flowElement instanceof UserTask) { + ((UserTask) flowElement).getExecutionListeners().add(dirPrepareListener); + } else if (flowElement instanceof ServiceTask) { + ((ServiceTask) flowElement).getExecutionListeners().add(dirPrepareListener); + } } private void createServiceTask(Process process, FlowElementDTO nodeDto, Map asyncTaskMap) throws JsonProcessingException { @@ -5007,6 +5964,7 @@ public class Dto2BpmnConverter { package com.sdm.flowable.util; import com.sdm.common.config.FlowableConfig; +import com.sdm.common.entity.flowable.dto.FlowElementDTO; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; @@ -5014,6 +5972,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import static com.sdm.flowable.enums.ExecuteTypeEnum.LOCAL_APP; + @Slf4j public class FlowNodeIdUtils { private static final String JOIN_GATEWAY_PREFIX = FlowableConfig.JOIN_GATEWAY_PREFIX; @@ -5021,6 +5981,7 @@ public class FlowNodeIdUtils { private static final String ASYNC_TASK_SUFFIX = FlowableConfig.ASYNC_TASK_SUFFIX; private static final String WAIT_USER_SUFFIX = FlowableConfig.WAIT_USER_SUFFIX; private static final String CHECK_SUFFIX = FlowableConfig.CHECK_SUFFIX; // 后置哨兵 + private static final String REGISTER_SUFFIX = FlowableConfig.REGISTER_SUFFIX; // ==================== 网关 ==================== @@ -5089,6 +6050,22 @@ public class FlowNodeIdUtils { return checkTaskId.substring(0, checkTaskId.length() - CHECK_SUFFIX.length()); } + // ==================== 本地应用注册节点 ==================== + public static String generateRegisterTaskId(String nodeId) { + return nodeId + REGISTER_SUFFIX; + } + + public static boolean isRegisterTask(String id) { + return id != null && id.endsWith(REGISTER_SUFFIX); + } + + public static String getOriginalNodeIdFromRegisterTask(String registerTaskId) { + if (!isRegisterTask(registerTaskId)) { + throw new IllegalArgumentException("不是注册节点: " + registerTaskId); + } + return registerTaskId.substring(0, registerTaskId.length() - REGISTER_SUFFIX.length()); + } + // --- 解析器 (反向查找原始ID) --- /** @@ -5105,6 +6082,9 @@ public class FlowNodeIdUtils { if (nodeId.endsWith(CHECK_SUFFIX)) { return nodeId.substring(0, nodeId.length() - CHECK_SUFFIX.length()); } + if (nodeId.endsWith(REGISTER_SUFFIX)) { + return nodeId.substring(0, nodeId.length() - REGISTER_SUFFIX.length()); + } return nodeId; } // ==================== 重试任务 ==================== @@ -5125,6 +6105,7 @@ public class FlowNodeIdUtils { String simulationBaseDir = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR; Path localBaseDir = Paths.get(simulationBaseDir).toAbsolutePath().normalize(); Path fullLocalPath = localBaseDir.resolve(objectKey).normalize(); + log.info("开始准备本地目录: {}", fullLocalPath); // 安全校验:防止路径穿越 if (!fullLocalPath.startsWith(localBaseDir)) { @@ -5140,6 +6121,7 @@ public class FlowNodeIdUtils { log.info("创建本地目录: {}", fullLocalPath); Files.createDirectories(fullLocalPath); } catch (Exception e) { + log.error("prepareLocalDir error:{}", e.getMessage()); throw new RuntimeException("无法准备本地目录: " + fullLocalPath, e); } } @@ -5154,12 +6136,63 @@ public class FlowNodeIdUtils { return !nodeId.endsWith(WAIT_USER_SUFFIX) && !nodeId.endsWith(ASYNC_TASK_SUFFIX) && !nodeId.endsWith(CHECK_SUFFIX) && + !nodeId.endsWith(REGISTER_SUFFIX)&& !nodeId.startsWith(JOIN_GATEWAY_PREFIX) && // 过滤网关 !nodeId.startsWith(SPLIT_GATEWAY_PREFIX); } + + public static boolean isLocalAppNode(FlowElementDTO nodeDto) { + if (nodeDto.getExtensionElements() != null && nodeDto.getExtensionElements().getExecuteConfig() != null) { + return LOCAL_APP.getCode().equalsIgnoreCase(nodeDto.getExtensionElements().getExecuteConfig().getExecuteType()); + } + return false; + } } + +server: + port: 7106 +spring: + application: + name: flowable + datasource: + url: jdbc:mysql://192.168.190.100:3306/flowable?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai + username: root + password: mysql + driver-class-name: com.mysql.cj.jdbc.Driver + flowable: + # ????????? + database-schema-update: true + # ??????JOB + async-executor-activate: true + cloud: + nacos: + discovery: + server-addr: 192.168.190.100:5848 + group: DEV_GROUP + enabled: true + +logging: + level: + org: + flowable: INFO + +mybatis-plus: + mapper-locations: classpath*:/mapper/**/*.xml + type-aliases-package: com.sdm.flowable.model.entity + configuration: + map-underscore-to-camel-case: true + global-config: + db-config: + id-type: auto +security: + whitelist: + paths: + - /process/testHpc + - /process/asyncCallback + + server: port: 7106 @@ -5201,6 +6234,8 @@ security: paths: - /process/testHpc - /process/asyncCallback +hpc: + mockCommand: '\\HPC-COMPUTE-01\share\RLithium\RLithium\reta.exe -i %s' @@ -5299,7 +6334,7 @@ spring: datasource: url: jdbc:mysql://192.168.30.146:3306/flowable?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai username: root - password: mysql + password: ENC(o5nKvbyfceJryxfBBGTi9w==) driver-class-name: com.mysql.cj.jdbc.Driver flowable: # ????????? @@ -5334,14 +6369,14 @@ security: - /process/asyncCallback - + server: port: 7106 spring: application: name: flowable datasource: - url: jdbc:mysql://192.168.30.146:3306/flowable?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai + url: jdbc:mysql://127.0.0.1:3306/flowable?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai username: root password: mysql driver-class-name: com.mysql.cj.jdbc.Driver @@ -5353,9 +6388,11 @@ spring: cloud: nacos: discovery: - server-addr: 192.168.30.146:8848 - group: PROD_GROUP + server-addr: 127.0.0.1:8848 + group: YANG_GROUP enabled: true + username: nacos + password: nacos logging: level: @@ -5370,10 +6407,12 @@ mybatis-plus: global-config: db-config: id-type: auto + security: whitelist: paths: - /process/testHpc + - /process/testHpc2 - /process/asyncCallback diff --git a/flowable/src/main/java/com/sdm/flowable/enums/OperationTypeEnum.java b/flowable/src/main/java/com/sdm/flowable/enums/OperationTypeEnum.java index 2d6d9d78..800b90d4 100644 --- a/flowable/src/main/java/com/sdm/flowable/enums/OperationTypeEnum.java +++ b/flowable/src/main/java/com/sdm/flowable/enums/OperationTypeEnum.java @@ -26,9 +26,9 @@ public enum OperationTypeEnum { TERMINATE(ProcessInstanceStateEnum.RUNNING, ProcessInstanceStateEnum.SUSPENDED, ProcessInstanceStateEnum.ERROR), /** - * 重试/跳转:必须是 FAILED 状态(有死信) + * 重试/跳转:必须是 FAILED 状态(有死信),RUNNING */ - RETRY(ProcessInstanceStateEnum.ERROR), + RETRY(ProcessInstanceStateEnum.ERROR,ProcessInstanceStateEnum.RUNNING), /** * 启动:通常对应无实例状态,这里作为占位 diff --git a/flowable/src/main/java/com/sdm/flowable/process/ProcessService.java b/flowable/src/main/java/com/sdm/flowable/process/ProcessService.java index feed50ab..32396083 100644 --- a/flowable/src/main/java/com/sdm/flowable/process/ProcessService.java +++ b/flowable/src/main/java/com/sdm/flowable/process/ProcessService.java @@ -94,7 +94,7 @@ public class ProcessService implements Iprocess{ // 部署流程(前端传入Flowable标准JSON) - public SdmResponse deploy(ProcessDefinitionDTO processDTO) throws Exception { + public SdmResponse deploy(ProcessDefinitionDTO processDTO) throws Exception { log.info("开始部署流程定义: {}",processDTO); BpmnModel bpmnModel = dto2BpmnConverter.convert(processDTO); log.info("BPMN模型转换完成"); @@ -558,11 +558,20 @@ public class ProcessService implements Iprocess{ } // 1. 判断 ERROR (Check 节点死信) - if (ctx.errorMap.containsKey(checkId)) { + boolean hasError = ctx.errorMap.containsKey(registerId) + || ctx.errorMap.containsKey(waitId) + || ctx.errorMap.containsKey(checkId) + ||ctx.errorMap.containsKey(origId); + if (hasError) { + // 从 registerId、waitId、checkId、origId 中获取第一个非空的错误信息 + String errorMessage = ctx.errorMap.get(checkId); + if (errorMessage == null) errorMessage =ctx.errorMap.get(registerId); + if (errorMessage == null) errorMessage = ctx.errorMap.get(waitId); + if (errorMessage == null) errorMessage = ctx.errorMap.get(origId); node.setStatus(NodeStateEnum.ERROR.getCode()); - node.setErrorMessage(ctx.errorMap.get(checkId)); + node.setErrorMessage(errorMessage); // 计算时间:从 register 开始,到当前时刻 - calculateLocalAppTime(node, ctx, registerId, origId); + calculateLocalAppTime(node, ctx, registerId,waitId, checkId, origId); return; } @@ -570,7 +579,7 @@ public class ProcessService implements Iprocess{ // 这代表前置的自动化流程跑完了,且成功了,正在等用户点下一步 if (ctx.activeActivityIds.contains(origId)) { node.setStatus(NodeStateEnum.WAITING_FOR_USER.getCode()); - calculateLocalAppTime(node, ctx, registerId, origId); + calculateLocalAppTime(node, ctx, registerId,waitId, checkId, origId); return; } @@ -583,15 +592,18 @@ public class ProcessService implements Iprocess{ if (isActive) { // 如果流程挂起,优先显示挂起 node.setStatus(ctx.isSuspended() ? NodeStateEnum.SUSPENDED.getCode() : NodeStateEnum.ACTIVE.getCode()); - calculateLocalAppTime(node, ctx, registerId, origId); + calculateLocalAppTime(node, ctx, registerId,waitId, checkId, origId); return; } // 4. 判断 FINISHED (Original 节点已结束) - HistoricActivityInstance lastHist = ctx.historyMap.get(origId); - if (lastHist != null && lastHist.getEndTime() != null) { + // 正常情况下:以 origId 的历史 endTime 为准。 + // 兜底情况下:如果由于“强制跳转”绕过了 origId 的创建/执行,historyMap 可能不存在 origId。 + // 此时只要链路(_register/_wait/_check)中任意节点产生过结束记录,并且当前不再处于链路活动中, + // 就将 origId 视为已完成,避免界面长期停留在 ERROR/PENDING。 + if (isLocalAppOrigFinished(ctx, origId, registerId, waitId, checkId)) { node.setStatus(NodeStateEnum.FINISHED.getCode()); - calculateLocalAppTime(node, ctx, registerId, origId); + calculateLocalAppTime(node, ctx, registerId,waitId, checkId, origId); return; } @@ -599,6 +611,54 @@ public class ProcessService implements Iprocess{ node.setStatus(NodeStateEnum.PENDING.getCode()); } + /** + * 判断 Local App 聚合节点(Original/UserTask)是否应当被标记为 FINISHED。 + *

+ * 正常判定:依赖 origId 的历史 endTime。 + *

+ * 跳转兜底:当强制跳转绕过 origId 的实际创建/执行时,可能缺失 origId 的历史记录; + * 这时若链路上(_register/_wait/_check)任意节点已出现结束时间记录,且当前没有任何链路节点仍在活动, + * 则认为 origId 的聚合状态应为 FINISHED。 + * + * @param ctx 状态上下文 + * @param origId Original 逻辑节点ID + * @param registerId _register 节点ID + * @param waitId _wait 节点ID + * @param checkId _check 节点ID + * @return true 表示应标记为 FINISHED + */ + private boolean isLocalAppOrigFinished(ProcessStateContext ctx, String origId, String registerId, String waitId, String checkId) { + HistoricActivityInstance lastHist = ctx.historyMap.get(origId); + if (lastHist != null && lastHist.getEndTime() != null) { + return true; + } + + // 跳转兜底:链路没有活动 token,且历史上至少有一个链路节点出现过结束时间 + boolean chainHasActiveToken = ctx.activeActivityIds.contains(registerId) + || ctx.activeActivityIds.contains(waitId) + || ctx.activeActivityIds.contains(checkId) + || ctx.activeActivityIds.contains(origId); + if (chainHasActiveToken) { + return false; + } + + Date registerEnd = getEndTime(ctx.historyMap.get(registerId)); + Date waitEnd = getEndTime(ctx.historyMap.get(waitId)); + Date checkEnd = getEndTime(ctx.historyMap.get(checkId)); + + return registerEnd != null || waitEnd != null || checkEnd != null; + } + + /** + * 安全获取 HistoricActivityInstance 的 endTime。 + * + * @param hist 历史活动实例 + * @return endTime;如果 hist 为空或 endTime 为空则返回 null + */ + private Date getEndTime(HistoricActivityInstance hist) { + return hist != null ? hist.getEndTime() : null; + } + private void determineServiceTaskUnifiedState(NodeDetailInfo node, ProcessStateContext ctx) { String origId = node.getId(); String waitUserId = FlowNodeIdUtils.generateWaitUserTaskId(origId); @@ -606,15 +666,22 @@ public class ProcessService implements Iprocess{ String checkId = FlowNodeIdUtils.generateCheckTaskId(origId); // 1. 判断 ERROR (优先级最高) - // 链条任何一环报错,原始节点都算错 - String errorMsg = ctx.errorMap.get(checkId); // 最常见:check挂了 - if (errorMsg == null) errorMsg = ctx.errorMap.get(origId); // 自己挂了 - // 也可以加上 waitId 的判断,虽然 ReceiveTask 很难挂 + // 1. 判断 ERROR (Check 节点死信) + boolean hasError = ctx.errorMap.containsKey(waitUserId) + ||ctx.errorMap.containsKey(origId) + || ctx.errorMap.containsKey(waitId) + || ctx.errorMap.containsKey(checkId); + + if (hasError) { + // 从 registerId、waitId、checkId、origId 中获取第一个非空的错误信息 + String errorMessage = ctx.errorMap.get(waitUserId); + if (errorMessage == null) errorMessage =ctx.errorMap.get(origId); + if (errorMessage == null) errorMessage = ctx.errorMap.get(waitId); + if (errorMessage == null) errorMessage = ctx.errorMap.get(checkId); - if (errorMsg != null) { node.setStatus(NodeStateEnum.ERROR.getCode()); - node.setErrorMessage(errorMsg); - calculateAggregatedTime(node, ctx, waitUserId, origId, checkId); + node.setErrorMessage(errorMessage); + calculateAggregatedTime(node, ctx, waitUserId, origId,waitId, checkId); return; } @@ -640,21 +707,17 @@ public class ProcessService implements Iprocess{ // 停留在 origId 或 waitId 或 checkId-> 视为通用执行中 node.setStatus(NodeStateEnum.ACTIVE.getCode()); } - calculateAggregatedTime(node, ctx, waitUserId, origId, checkId); + calculateAggregatedTime(node, ctx, waitUserId, origId,waitId, checkId); return; } // 3. 判断 FINISHED - // 必须是链条的"最后一个环节"结束了,才算整个节点结束 - // 顺序:WaitUser -> Original -> Wait -> Check - // 我们检查 Check 是否有历史;如果没有 Check (非HPC节点),检查 Original - HistoricActivityInstance lastHist = ctx.historyMap.get(checkId); - if (lastHist == null) lastHist = ctx.historyMap.get(waitId); // 兼容 - if (lastHist == null) lastHist = ctx.historyMap.get(origId); - - if (lastHist != null && lastHist.getEndTime() != null) { + // 兜底:当发生“强制跳转”绕过链路最后环节时,可能缺失 check/orig 的历史记录。 + // 只要链路(waitUser/orig/wait/check)中任意节点产生过结束记录,并且当前不再处于链路活动中, + // 就将 origId 视为已完成,避免界面长期停留在 PENDING。 + if (isServiceTaskOrigFinished(ctx, waitUserId, origId, waitId, checkId)) { node.setStatus(NodeStateEnum.FINISHED.getCode()); - calculateAggregatedTime(node, ctx, waitUserId, origId, checkId); + calculateAggregatedTime(node, ctx, waitUserId, origId, waitId,checkId); return; } @@ -662,6 +725,39 @@ public class ProcessService implements Iprocess{ node.setStatus(NodeStateEnum.PENDING.getCode()); } + /** + * 判断 ServiceTask 聚合节点(Original)是否应当被标记为 FINISHED。 + *

+ * 正常判定:依赖链路最后环节(通常是 _check)的历史 endTime。 + *

+ * 跳转兜底:当强制跳转绕过了链路最后环节或 origId 的实际创建/执行时,可能缺失关键节点历史记录; + * 这时若链路上(waitUser/orig/wait/check)任意节点已出现结束时间记录,且当前没有任何链路节点仍在活动, + * 则认为 origId 的聚合状态应为 FINISHED。 + * + * @param ctx 状态上下文 + * @param waitUserId _waitUser 节点ID + * @param origId Original 逻辑节点ID + * @param waitId _wait 节点ID + * @param checkId _check 节点ID + * @return true 表示应标记为 FINISHED + */ + private boolean isServiceTaskOrigFinished(ProcessStateContext ctx, String waitUserId, String origId, String waitId, String checkId) { + boolean chainHasActiveToken = ctx.activeActivityIds.contains(waitUserId) + || ctx.activeActivityIds.contains(origId) + || ctx.activeActivityIds.contains(waitId) + || ctx.activeActivityIds.contains(checkId); + if (chainHasActiveToken) { + return false; + } + + Date waitUserEnd = getEndTime(ctx.historyMap.get(waitUserId)); + Date origEnd = getEndTime(ctx.historyMap.get(origId)); + Date waitEnd = getEndTime(ctx.historyMap.get(waitId)); + Date checkEnd = getEndTime(ctx.historyMap.get(checkId)); + + return waitUserEnd != null || origEnd != null || waitEnd != null || checkEnd != null; + } + // 判断是否为本地应用节点 private boolean isLocalApp(NodeDetailInfo node) { @@ -718,7 +814,7 @@ public class ProcessService implements Iprocess{ * 工具:计算聚合时间 (Original Start -> Check End) */ private void calculateAggregatedTime(NodeDetailInfo node, ProcessStateContext ctx, - String waitUserId, String origId, String checkId) { + String waitUserId, String origId, String waitId,String checkId) { // Start Time: 链条最早的开始时间 HistoricActivityInstance startNode = ctx.historyMap.get(waitUserId); if (startNode == null) startNode = ctx.historyMap.get(origId); @@ -726,12 +822,20 @@ public class ProcessService implements Iprocess{ Date startTime = (startNode != null) ? startNode.getStartTime() : null; node.setStartTime(startTime); - // End Time: 只有状态是 FINISHED 才有结束时间,取 Check 的结束时间 + // End Time: 只有状态是 FINISHED才有结束时间,取四个节点(waitUser, orig, wait, check)的结束时间的最大值 Date endTime = null; if (NodeStateEnum.FINISHED.getCode().equals(node.getStatus())) { - HistoricActivityInstance endNode = ctx.historyMap.get(checkId); - if (endNode == null) endNode = ctx.historyMap.get(origId); - if (endNode != null) endTime = endNode.getEndTime(); + List endTimes = Arrays.asList( + getEndTime(ctx.historyMap.get(waitUserId)), + getEndTime(ctx.historyMap.get(origId)), + getEndTime(ctx.historyMap.get(waitId)), + getEndTime(ctx.historyMap.get(checkId)) + ); + + endTime = endTimes.stream() + .filter(Objects::nonNull) + .max(Date::compareTo) + .orElse(null); } node.setEndTime(endTime); @@ -751,7 +855,7 @@ public class ProcessService implements Iprocess{ * @param registerId 链条起点的ID * @param origId 链条终点的ID */ - private void calculateLocalAppTime(NodeDetailInfo node, ProcessStateContext ctx, String registerId, String origId) { + private void calculateLocalAppTime(NodeDetailInfo node, ProcessStateContext ctx, String registerId,String waitId,String checkId, String origId) { // 1. 获取开始时间 (取 _register 的历史记录) HistoricActivityInstance startInst = ctx.historyMap.get(registerId); @@ -763,14 +867,20 @@ public class ProcessService implements Iprocess{ Date startTime = (startInst != null) ? startInst.getStartTime() : null; node.setStartTime(startTime); - // 2. 获取结束时间 (仅当整个节点 FINISHED 时才有值) + // 2. 只有状态是 FINISHED才有结束时间,取四个节点(waitUser, orig, wait, check)的结束时间的最大值 Date endTime = null; if (NodeStateEnum.FINISHED.getCode().equals(node.getStatus())) { - // 取终点 Original (UserTask) 的历史记录 - HistoricActivityInstance endInst = ctx.historyMap.get(origId); - if (endInst != null) { - endTime = endInst.getEndTime(); - } + List endTimes = Arrays.asList( + getEndTime(ctx.historyMap.get(registerId)), + getEndTime(ctx.historyMap.get(waitId)), + getEndTime(ctx.historyMap.get(checkId)), + getEndTime(ctx.historyMap.get(origId)) + ); + + endTime = endTimes.stream() + .filter(Objects::nonNull) + .max(Date::compareTo) + .orElse(null); } node.setEndTime(endTime); @@ -1188,6 +1298,9 @@ public class ProcessService implements Iprocess{ // 推测是否存在对应的注册节点 (Local App 特征) String registerNodeId = FlowNodeIdUtils.generateRegisterTaskId(originalNodeId); + // 推测是否有对应的_waitUser 节点 (HPC节点的特征) + String waitUserNodeId = FlowNodeIdUtils.generateWaitUserTaskId(originalNodeId); + // 如果注册节点存在,说明这是 Local App 链路,且必须从注册节点重新开始 // 避免直接跳到中间状态 (如 check 或 wait),导致本地应用没拉起来 if (bpmnModel.getMainProcess().getFlowElement(registerNodeId) != null) { @@ -1196,6 +1309,14 @@ public class ProcessService implements Iprocess{ targetNodeId = registerNodeId; } } + + // 如果存在 _waitUser 节点,说明这是 HPC 节点,且必须从 _waitUser 节点重新开始 + if (bpmnModel.getMainProcess().getFlowElement(waitUserNodeId) != null) { + if (!waitUserNodeId.equals(targetNodeId)) { + log.info("检测到 HPC 链路,自动修正重试目标: {} -> {}", targetNodeId, waitUserNodeId); + targetNodeId = waitUserNodeId; + } + } } // 1. 获取当前流程实例中 **所有** 活跃/停滞的节点 diff --git a/flowable/src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java b/flowable/src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java index a0b0a3ce..4af5a723 100644 --- a/flowable/src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java +++ b/flowable/src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java @@ -181,6 +181,32 @@ public class Dto2BpmnConverter { private void handleWaitUserTask(Process process, FlowElementDTO nodeDto, Map waitUserTaskMap) { // 用来设置节点是否人工执行: 只有当前节点是ServiceTask才需要判断是否等待用户输入,需要才创建前置UserTask + // 而且必须出入,这是节点开启异步带来的问题: + /** + * 事务 A:抵达节点(成功提交) + * 上一步的 /asyncCallback 调用了 runtimeService.trigger()。 + * 引擎让令牌离开 _wait,走向 _check。 + * 因为 _check 是异步节点,引擎不会立刻执行它,而是做两件事: + * 将 ACT_RU_EXECUTION 表的指针指向 _check。 + * 在 ACT_RU_JOB 表中插入一条待执行的任务记录。 + * 事务 A 提交(Commit)。 + * 注意:此时,Flowable 的历史机制认为“节点还没真正开始执行 Java 代码”,所以此时 ACT_HI_ACTINST 中还没有 _check 的记录。 + * + * Flowable 的后台异步线程(AsyncExecutor)拿到了这个 Job,开启全新的事务 B。 + * 线程开始真正进入 _check 节点,第一步:往 ACT_HI_ACTINST 插入一条开始记录。 + * 第二步:执行您绑定的 Java 代码 AsyncResultCheckDelegate。 + * 第三步:因为回调状态是 FAIL,您的 Java 代码中抛出了 RuntimeException("异步任务执行失败...")。 + * 致命结果:因为抛出了未捕获的运行时异常,整个事务 B 被数据库强行回滚(Rollback)! + * 👉 刚才第一步写入 ACT_HI_ACTINST 的历史记录,随着事务回滚,消失了 + * + * 引擎捕获到了事务 B 的回滚,开启事务 C 进行善后。 + * 检查您的重试策略是 R0(0次重试),于是直接将该任务从 ACT_RU_JOB 移入 ACT_RU_DEADLETTER_JOB(死信表)。 + * 将 ACT_RU_EXECUTION 中的 DEADLETTER_JOB_COUNT_ 更新为 1(就是您截图里看到的那样)。 + * 事务 C 提交。 + * + * 解决 强制跳过后 ACT_HI_ACTINST 丢失:就是在前面插入一个UserTask,强制跳转的时候都会先跳到这个UserTask,保证历史表里有这个聚合记录,防止SERVICETASK挂了没有信息可以查询执行过 + */ + if (FlowElementTypeEnums.SERVICETASK.getType().equals(nodeDto.getType())) { String originalNodeId = nodeDto.getId(); String waitUserId = FlowNodeIdUtils.generateWaitUserTaskId(originalNodeId);