diff --git a/common/src/main/java/com/sdm/common/entity/enums/DirTypeEnum.java b/common/src/main/java/com/sdm/common/entity/enums/DirTypeEnum.java index 6a262d7d..bdc67981 100644 --- a/common/src/main/java/com/sdm/common/entity/enums/DirTypeEnum.java +++ b/common/src/main/java/com/sdm/common/entity/enums/DirTypeEnum.java @@ -2,6 +2,8 @@ package com.sdm.common.entity.enums; import io.swagger.v3.oas.annotations.media.Schema; +import java.util.List; + @Schema(description = "文件夹类型枚举") public enum DirTypeEnum { /** @@ -64,4 +66,12 @@ public enum DirTypeEnum { } return null; } + + // 初始化用户业务库目录 + private static final List INIT_SPMD_DIR = List.of( + DirTypeEnum.KNOWLEDGE_BASE_DIR, DirTypeEnum.PROJECT_NODE_DIR, + DirTypeEnum.AVATAR_DIR, DirTypeEnum.SIMULATION_PARAMETER_DIR, DirTypeEnum.TRAIN_MODEL_DIR,DirTypeEnum.SCRIPT_DIR); + public static final List getInitSpmdDir() { + return INIT_SPMD_DIR; + } } diff --git a/common/src/main/java/com/sdm/common/entity/flowable/dto/NodeDetailInfo.java b/common/src/main/java/com/sdm/common/entity/flowable/dto/NodeDetailInfo.java index ed87b554..a9e2baf4 100644 --- a/common/src/main/java/com/sdm/common/entity/flowable/dto/NodeDetailInfo.java +++ b/common/src/main/java/com/sdm/common/entity/flowable/dto/NodeDetailInfo.java @@ -15,4 +15,5 @@ public class NodeDetailInfo extends NodeStructureInfo{ private Date endTime; private Long durationInMillis; private String durationFormatted; + private String errorMessage; } \ No newline at end of file diff --git a/common/src/main/java/com/sdm/common/entity/flowable/executeConfig/ExportWordScriptExecuteConfig.java b/common/src/main/java/com/sdm/common/entity/flowable/executeConfig/ExportWordScriptExecuteConfig.java index 0ea6bd72..6eebe030 100644 --- a/common/src/main/java/com/sdm/common/entity/flowable/executeConfig/ExportWordScriptExecuteConfig.java +++ b/common/src/main/java/com/sdm/common/entity/flowable/executeConfig/ExportWordScriptExecuteConfig.java @@ -7,8 +7,8 @@ public class ExportWordScriptExecuteConfig extends BaseExecuteConfig { // 输入节点id private String beforeNodeId; // 文件正则表达式,用于匹配输入文件夹下的文件名 - private String fileRegularStr="^aa\\.xml$"; + private String fileRegularStr="^.+\\.(jpg|jpeg|png|gif|bmp|webp|svg|tiff|tif)$"; // 导出脚本文件id - private Long exportScriptFileId; + private Long exportScriptFileId = 4462L; } diff --git a/data/src/main/java/com/sdm/data/service/impl/FileUserPermissionServiceImpl.java b/data/src/main/java/com/sdm/data/service/impl/FileUserPermissionServiceImpl.java index c1abdf10..96ee7a9b 100644 --- a/data/src/main/java/com/sdm/data/service/impl/FileUserPermissionServiceImpl.java +++ b/data/src/main/java/com/sdm/data/service/impl/FileUserPermissionServiceImpl.java @@ -1,6 +1,7 @@ package com.sdm.data.service.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.sdm.common.entity.enums.DirTypeEnum; import com.sdm.common.entity.enums.FilePermissionEnum; import com.sdm.data.dao.FileUserPermissionMapper; import com.sdm.data.model.entity.FileMetadataInfo; @@ -39,7 +40,14 @@ public class FileUserPermissionServiceImpl extends ServiceImpl INIT_SPMD_DIR = List.of( - DirTypeEnum.KNOWLEDGE_BASE_DIR, DirTypeEnum.PROJECT_NODE_DIR, - DirTypeEnum.AVATAR_DIR, DirTypeEnum.SIMULATION_PARAMETER_DIR, DirTypeEnum.TRAIN_MODEL_DIR,DirTypeEnum.SCRIPT_DIR); - // fileData 知识库文件列表可见的数据 private final List fileDatdList = Arrays.asList( NumberConstants.ZERO, @@ -1148,7 +1143,7 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { @Override @Transactional(rollbackFor = Exception.class) public boolean initSystemDirectory(String company) { - for (DirTypeEnum dirType : INIT_SPMD_DIR) { + for (DirTypeEnum dirType : DirTypeEnum.getInitSpmdDir()) { String dirMinioObjectKey = getDirMinioObjectKey(dirType.getDirName()); try { // 检查目录是否已存在 diff --git a/flowable/src/main/java/com/sdm/flowable/controller/ProcessController.java b/flowable/src/main/java/com/sdm/flowable/controller/ProcessController.java index 41e98e69..565001ef 100644 --- a/flowable/src/main/java/com/sdm/flowable/controller/ProcessController.java +++ b/flowable/src/main/java/com/sdm/flowable/controller/ProcessController.java @@ -69,7 +69,6 @@ public class ProcessController implements IFlowableFeignClient { @PostMapping("/deploy") public SdmResponse deploy(@RequestBody ProcessDefinitionDTO processDTO) { try { - log.info("开始部署流程定义: {}",processDTO); return processService.deploy(processDTO); } catch (Exception e) { log.error("流程部署失败: ", e); @@ -149,7 +148,6 @@ public class ProcessController implements IFlowableFeignClient { */ @PostMapping("/continueServiceTask") public SdmResponse continueServiceTask(@RequestBody CompleteTaskReq req) { - log.info("开始继续服务任务处理, 请求参数: {}", req); return processService.continueServiceTask(req); } diff --git a/flowable/src/main/java/com/sdm/flowable/delegate/UniversalDelegate.java b/flowable/src/main/java/com/sdm/flowable/delegate/UniversalDelegate.java index e59a9f5c..d035cadb 100644 --- a/flowable/src/main/java/com/sdm/flowable/delegate/UniversalDelegate.java +++ b/flowable/src/main/java/com/sdm/flowable/delegate/UniversalDelegate.java @@ -109,7 +109,8 @@ public class UniversalDelegate implements JavaDelegate { } catch (Exception e) { // 处理失败情况 - 跳转到重试任务 log.error("节点执行过程中发生异常", e); - handleFailure(execution, e); + throw new RuntimeException("节点执行过程中发生异常:" + e.getMessage()); + // handleFailure(execution, e); } } diff --git a/flowable/src/main/java/com/sdm/flowable/delegate/handler/ExportWordScriptHandler.java b/flowable/src/main/java/com/sdm/flowable/delegate/handler/ExportWordScriptHandler.java index 401c82ab..8a59225f 100644 --- a/flowable/src/main/java/com/sdm/flowable/delegate/handler/ExportWordScriptHandler.java +++ b/flowable/src/main/java/com/sdm/flowable/delegate/handler/ExportWordScriptHandler.java @@ -7,13 +7,10 @@ import com.sdm.common.entity.req.data.GetFileBaseInfoReq; import com.sdm.common.entity.req.data.UploadFilesReq; import com.sdm.common.entity.resp.data.FileMetadataInfoResp; import com.sdm.common.feign.inter.data.IDataFeignClient; -import com.sdm.common.utils.RandomUtil; -import com.sdm.common.entity.flowable.executeConfig.BaseExecuteConfig; import com.sdm.flowable.constants.FlowableConfig; import com.sdm.flowable.entity.ProcessNodeParam; import com.sdm.flowable.service.IProcessNodeParamService; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.ObjectUtils; import org.flowable.engine.delegate.DelegateExecution; import org.springframework.beans.factory.annotation.Autowired; @@ -26,6 +23,8 @@ import java.nio.file.Files; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; + /** * @Description: 生成自动化报告脚本处理器 * @Author: shiman @@ -40,13 +39,6 @@ public class ExportWordScriptHandler implements ExecutionHandler params, ExportWordScriptExecuteConfig config) { try { @@ -95,9 +87,10 @@ public class ExportWordScriptHandler implements ExecutionHandler result = new ArrayList<>(); int runningStatus = -1; @@ -142,7 +135,7 @@ public class ExportWordScriptHandler implements ExecutionHandler deploy(ProcessDefinitionDTO processDTO) throws Exception { - log.info("开始部署流程定义"); + log.info("开始部署流程定义: {}",processDTO); BpmnModel bpmnModel = dto2BpmnConverter.convert(processDTO); log.info("BPMN模型转换完成"); @@ -169,36 +172,128 @@ public class ProcessService { .processInstanceId(processInstanceId) .list(); - Map activityMap = historicActivities.stream() - .collect(Collectors.toMap( - HistoricActivityInstance::getActivityId, - Function.identity(), - (a, b) -> a - )); + // 为了处理多实例或循环,最好按结束时间倒序排,取最新的 + Map activityMap = new HashMap<>(); + for (HistoricActivityInstance hist : historicActivities) { + // 如果Map里已经有了,且当前的开始时间比Map里的晚,则覆盖(取最新的) + if (!activityMap.containsKey(hist.getActivityId()) || + hist.getStartTime().after(activityMap.get(hist.getActivityId()).getStartTime())) { + activityMap.put(hist.getActivityId(), hist); + } + } List activeActivityIds = isProcessRunning(processInstanceId) ? runtimeService.getActiveActivityIds(processInstanceId) : Collections.emptyList(); - - // 更新节点状态信息 - for (NodeDetailInfo node : nodes) { - String nodeId = node.getId(); - - HistoricActivityInstance historicActivity = activityMap.get(nodeId); - boolean isActive = activeActivityIds.contains(nodeId); - boolean isFinished = historicActivity != null && historicActivity.getEndTime() != null; - node.setStatus(isFinished ? "finished" : (isActive ? "active" : "pending")); + // 获取异常/失败的任务信息 + // Map + Map errorMap = new HashMap<>(); + if (isProcessRunning(processInstanceId)) { + // 1. 查询死信作业 (DeadLetterJob) - 重试次数用尽,彻底失败 + List deadJobs = managementService.createDeadLetterJobQuery() + .processInstanceId(processInstanceId).list(); - if (historicActivity != null) { - node.setStartTime(historicActivity.getStartTime()); - if (isFinished) { - node.setEndTime(historicActivity.getEndTime()); - node.setDurationInMillis(historicActivity.getDurationInMillis()); - if (historicActivity.getDurationInMillis() != null) { - node.setDurationFormatted(formatDuration(historicActivity.getDurationInMillis())); + // 2. 查询普通作业 (Job) - 包含正在重试中的异常 (exceptionMessage不为空) + List activeJobs = managementService.createJobQuery() + .processInstanceId(processInstanceId).list(); + + // 3. 辅助查询:获取 Execution 以便将 Job 映射到 ActivityId + // Job 对象里通常只有 executionId,我们需要知道这个 execution 当前在哪个 activityId + List executions = runtimeService.createExecutionQuery() + .processInstanceId(processInstanceId).list(); + Map executionToActivityMap = executions.stream() + .collect(Collectors.toMap(Execution::getId, Execution::getActivityId, (v1, v2) -> v1)); + + // 填充错误 Map + Consumer mapJobToError = job -> { + if (job.getExceptionMessage() != null) { + String activityId = executionToActivityMap.get(job.getExecutionId()); + if (activityId != null) { + errorMap.put(activityId, job.getExceptionMessage()); } } + }; + + deadJobs.forEach(mapJobToError); + activeJobs.forEach(mapJobToError); + } + + + // 更新节点状态信息,遍历节点设置状态,并处理 ServiceTask + ReceiveTask 的合并逻辑 + for (NodeDetailInfo node : nodes) { + String originalNodeId = node.getId(); + + // 4.1 获取当前节点的物理状态 + HistoricActivityInstance myHist = activityMap.get(originalNodeId); + boolean isActive = activeActivityIds.contains(originalNodeId); + boolean isFinished = myHist != null && myHist.getEndTime() != null; + // 判断该节点是否有报错 + String errorMessage = errorMap.get(originalNodeId); + boolean isError = errorMessage != null; + + // 初始状态判定 (优先级:Error > Finished > Active > Pending) + String displayStatus; + if (isError) { + displayStatus = "error"; + } else if (isFinished) { + displayStatus = "finished"; + } else if (isActive) { + displayStatus = "active"; + } else { + displayStatus = "pending"; + } + + Date startTime = myHist != null ? myHist.getStartTime() : null; + Date endTime = myHist != null ? myHist.getEndTime() : null; + Long duration = myHist != null ? myHist.getDurationInMillis() : null; + + // 4.2 特殊处理:检查是否为异步回调的 ServiceTask + if (isAsyncCallbackNode(node)) { + // 如果原始节点本身就报错了(ServiceTask 执行 JavaDelegate 时抛出异常) + // 此时还没走到 ReceiveTask,所以直接显示 Error + if (isError) { + node.setErrorMessage(errorMessage); // 假设 NodeDetailInfo 有这个字段 + // 保持 displayStatus = "error" + } else { + // 原有逻辑:推算 ReceiveTask + String waitNodeId = FlowNodeIdUtils.generateAsyncTaskId(originalNodeId); + HistoricActivityInstance waitHist = activityMap.get(waitNodeId); + boolean waitIsActive = activeActivityIds.contains(waitNodeId); + boolean waitIsFinished = waitHist != null && waitHist.getEndTime() != null; + + if (waitIsActive) { + displayStatus = "active"; + endTime = null; + if (startTime != null) { + duration = System.currentTimeMillis() - startTime.getTime(); + } + } else if (waitIsFinished) { + displayStatus = "finished"; + endTime = waitHist.getEndTime(); + if (startTime != null && endTime != null) { + duration = endTime.getTime() - startTime.getTime(); + } + } else if (isFinished && !waitIsFinished && !waitIsActive) { + // 极短时间差 + displayStatus = "active"; + endTime = null; + } + } + } else { + // 非特殊节点,如果是 Error 状态,填充错误信息 + if (isError) { + node.setErrorMessage(errorMessage); + } + } + + // 4.3 赋值给 DTO + node.setStatus(displayStatus); + node.setStartTime(startTime); + node.setEndTime(endTime); + node.setDurationInMillis(duration); + if (duration != null) { + node.setDurationFormatted(formatDuration(duration)); } } } @@ -336,7 +431,27 @@ public class ProcessService { return String.format("%ds", seconds); } - + /** + * 辅助方法:解析节点配置判断是否为异步回调节点 + */ + private boolean isAsyncCallbackNode(NodeDetailInfo node) { + if (!FlowElementTypeEnums.SERVICETASK.getType().equalsIgnoreCase(node.getType())) { + return false; + } + String configJson = node.getExecuteConfig(); + if (configJson == null || configJson.isEmpty()) { + return false; + } + try { + // 这里解析 JSON,看 executeConfig.isAsyncCallback() + // 建议使用 FastJson 或 Jackson + JSONObject json = JSONObject.parseObject(configJson); + return json != null && json.getBooleanValue("asyncCallback"); + } catch (Exception e) { + log.warn("解析节点配置失败: {}", node.getId()); + return false; + } + } public SdmResponse continueServiceTask(@RequestBody CompleteTaskReq req) { log.info("开始继续服务任务处理, 请求参数: {}", req); diff --git a/flowable/src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java b/flowable/src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java index e7354100..9f03cf76 100644 --- a/flowable/src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java +++ b/flowable/src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java @@ -41,7 +41,7 @@ public class Dto2BpmnConverter { List allElements = dto.getFlowElements(); List nodeDtos = allElements.stream() .filter(e -> !FlowElementTypeEnums.SEQUENCEFLOW.getType().equals(e.getType())) - .collect(Collectors.toList()); + .toList(); List flowDtos = allElements.stream() .filter(e -> FlowElementTypeEnums.SEQUENCEFLOW.getType().equals(e.getType())) .collect(Collectors.toList()); @@ -74,9 +74,28 @@ public class Dto2BpmnConverter { // 6. 创建连线 createConnections(process, flowDtos, asyncTaskMap,waitUserTaskMap, joinGatewayMap, splitGatewayMap); + validProcess(process); + return bpmnModel; } + /** + * 验证流程元素的唯一性 + */ + private static void validProcess(Process process) { + Map counter = new HashMap<>(); + + for (FlowElement e : process.getFlowElements()) { + counter.put(e.getId(), counter.getOrDefault(e.getId(), 0) + 1); + } + + counter.forEach((id, cnt) -> { + if (cnt > 1) { + log.error("❌ 重复 ID 发现: {} 出现次数: {}", id, cnt); + } + }); + } + /** * 添加通用重试任务节点 */ @@ -111,6 +130,8 @@ public class Dto2BpmnConverter { String originalNodeId = nodeDto.getId(); String waitNodeId = FlowNodeIdUtils.generateAsyncTaskId(originalNodeId); ReceiveTask receiveTask = new ReceiveTask(); + receiveTask.setAsynchronous(true); + disableAsyncRetry(receiveTask); receiveTask.setId(waitNodeId); receiveTask.setName(nodeDto.getName() + "等待结果"); process.addFlowElement(receiveTask); @@ -131,6 +152,8 @@ public class Dto2BpmnConverter { String waitUserId = FlowNodeIdUtils.generateWaitUserTaskId(originalNodeId); UserTask waitUserTask = new UserTask(); + waitUserTask.setAsynchronous(true); + disableAsyncRetry(waitUserTask); waitUserTask.setId(waitUserId); waitUserTask.setName(nodeDto.getName() + "等待用户提交"); // 不设置assignee,让任何人可以处理 @@ -162,6 +185,7 @@ public class Dto2BpmnConverter { joinGateway.setName("并行汇聚-" + nodeDto.getName()); process.addFlowElement(joinGateway); joinGatewayMap.put(nodeId, joinGatewayId); + log.info("添加并行汇聚网关:节点ID={}, 汇聚网关ID={}", nodeId, joinGatewayId); } // 检查是否需要添加拆分网关(出度>1) @@ -173,6 +197,7 @@ public class Dto2BpmnConverter { splitGateway.setName("并行拆分-" + nodeDto.getName()); process.addFlowElement(splitGateway); splitGatewayMap.put(nodeId, splitGatewayId); + log.info("添加并行拆分网关:节点ID={}, 拆分网关ID={}", nodeId, splitGatewayId); } } @@ -368,6 +393,8 @@ public class Dto2BpmnConverter { case STARTEVENT: // 开始事件:直接映射 StartEvent startEvent = new StartEvent(); + // 【关键修改】设置异步:确保 startProcessInstanceById 接口立刻返回 ID + startEvent.setAsynchronous(true); startEvent.setId(nodeDto.getId()); startEvent.setName(nodeDto.getName()); process.addFlowElement(startEvent); @@ -381,10 +408,15 @@ public class Dto2BpmnConverter { endEvent.setName(nodeDto.getName()); process.addFlowElement(endEvent); log.info("创建结束事件节点 nodeId:{}", nodeDto.getId()); + break; case USERTASK: // 用户任务:映射为 Flowable UserTask UserTask userTask = new UserTask(); + // 【关键修改】设置异步:防止下方绑定的监听器(创建文件夹)报错导致前一个节点回滚 + // 这会创建一个 Job 来执行 UserTask 的初始化逻辑(包括执行监听器) + userTask.setAsynchronous(true); + disableAsyncRetry(userTask); userTask.setId(nodeDto.getId()); userTask.setName(nodeDto.getName()); log.info("创建用户任务节点 nodeId:{}", nodeDto.getId()); @@ -419,12 +451,15 @@ public class Dto2BpmnConverter { case SERVICETASK: // 服务任务:映射为 Flowable ServiceTask,绑定自定义执行器 ServiceTask serviceTask = new ServiceTask(); + // 【关键修改】设置异步:实现业务逻辑故障隔离,避免阻塞和连环回滚 + serviceTask.setAsynchronous(true); + disableAsyncRetry(serviceTask); serviceTask.setId(nodeDto.getId()); serviceTask.setName(nodeDto.getName()); // 绑定执行器(Bean名称:customTaskExecutor) serviceTask.setImplementation("${universalDelegate}"); serviceTask.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION); - + log.info("创建服务任务节点 nodeId:{}", nodeDto.getId()); if (nodeDto.getExtensionElements() != null && nodeDto.getExtensionElements().getExecuteConfig() != null) { // 添加 Flowable 扩展属性 BaseExecuteConfig serviceTaskExecuteConfig = nodeDto.getExtensionElements().getExecuteConfig(); @@ -471,4 +506,20 @@ public class Dto2BpmnConverter { } return flow; } + + /** + * 配置节点失败后不重试(R0/PT0S) + * @param flowElement 需要配置的节点 + */ + private void disableAsyncRetry(FlowElement flowElement) { + ExtensionElement retryElement = new ExtensionElement(); + // 必须加上 flowable 的命名空间前缀 + retryElement.setNamespace("http://flowable.org/bpmn"); + retryElement.setNamespacePrefix("flowable"); + retryElement.setName("failedJobRetryTimeCycle"); + // R0 代表重试 0 次,即失败立刻停止 + retryElement.setElementText("R0/PT0S"); + + flowElement.addExtensionElement(retryElement); + } } \ No newline at end of file