流程节点异步执行

This commit is contained in:
2025-12-09 17:21:46 +08:00
parent 827d9e2400
commit 82b856c373
10 changed files with 309 additions and 59 deletions

View File

@@ -2,6 +2,8 @@ package com.sdm.common.entity.enums;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.List;
@Schema(description = "文件夹类型枚举")
public enum DirTypeEnum {
/**
@@ -64,4 +66,12 @@ public enum DirTypeEnum {
}
return null;
}
// 初始化用户业务库目录
private static final List<DirTypeEnum> INIT_SPMD_DIR = List.of(
DirTypeEnum.KNOWLEDGE_BASE_DIR, DirTypeEnum.PROJECT_NODE_DIR,
DirTypeEnum.AVATAR_DIR, DirTypeEnum.SIMULATION_PARAMETER_DIR, DirTypeEnum.TRAIN_MODEL_DIR,DirTypeEnum.SCRIPT_DIR);
public static final List<DirTypeEnum> getInitSpmdDir() {
return INIT_SPMD_DIR;
}
}

View File

@@ -15,4 +15,5 @@ public class NodeDetailInfo extends NodeStructureInfo{
private Date endTime;
private Long durationInMillis;
private String durationFormatted;
private String errorMessage;
}

View File

@@ -7,8 +7,8 @@ public class ExportWordScriptExecuteConfig extends BaseExecuteConfig {
// 输入节点id
private String beforeNodeId;
// 文件正则表达式,用于匹配输入文件夹下的文件名
private String fileRegularStr="^aa\\.xml$";
private String fileRegularStr="^.+\\.(jpg|jpeg|png|gif|bmp|webp|svg|tiff|tif)$";
// 导出脚本文件id
private Long exportScriptFileId;
private Long exportScriptFileId = 4462L;
}

View File

@@ -1,6 +1,7 @@
package com.sdm.data.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.sdm.common.entity.enums.DirTypeEnum;
import com.sdm.common.entity.enums.FilePermissionEnum;
import com.sdm.data.dao.FileUserPermissionMapper;
import com.sdm.data.model.entity.FileMetadataInfo;
@@ -39,7 +40,14 @@ public class FileUserPermissionServiceImpl extends ServiceImpl<FileUserPermissio
return false;
}
// 查询当前文件的权限
for (DirTypeEnum dirType : DirTypeEnum.getInitSpmdDir()) {
if (fileMetadataInfo.getOriginalName().equals(dirType.getDirName())) {
// 基础文件夹 忽略权限控制
return true;
}
}
// 查询当前文件的权限
FileUserPermission fileUserPermission = this.lambdaQuery()
.eq(FileUserPermission::getTFilemetaId, fileId)
.eq(FileUserPermission::getUserId, userId)

View File

@@ -86,11 +86,6 @@ import java.util.stream.Collectors;
@Service
@ConditionalOnProperty(name = "fileSystem.chose", havingValue = "minio")
public class MinioFileIDataFileServiceImpl implements IDataFileService {
// 初始化用户业务库目录
private static final List<DirTypeEnum> INIT_SPMD_DIR = List.of(
DirTypeEnum.KNOWLEDGE_BASE_DIR, DirTypeEnum.PROJECT_NODE_DIR,
DirTypeEnum.AVATAR_DIR, DirTypeEnum.SIMULATION_PARAMETER_DIR, DirTypeEnum.TRAIN_MODEL_DIR,DirTypeEnum.SCRIPT_DIR);
// fileData 知识库文件列表可见的数据
private final List<Integer> fileDatdList = Arrays.asList(
NumberConstants.ZERO,
@@ -1148,7 +1143,7 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
@Override
@Transactional(rollbackFor = Exception.class)
public boolean initSystemDirectory(String company) {
for (DirTypeEnum dirType : INIT_SPMD_DIR) {
for (DirTypeEnum dirType : DirTypeEnum.getInitSpmdDir()) {
String dirMinioObjectKey = getDirMinioObjectKey(dirType.getDirName());
try {
// 检查目录是否已存在

View File

@@ -69,7 +69,6 @@ public class ProcessController implements IFlowableFeignClient {
@PostMapping("/deploy")
public SdmResponse<DeployFlowableResp> deploy(@RequestBody ProcessDefinitionDTO processDTO) {
try {
log.info("开始部署流程定义: {}",processDTO);
return processService.deploy(processDTO);
} catch (Exception e) {
log.error("流程部署失败: ", e);
@@ -149,7 +148,6 @@ public class ProcessController implements IFlowableFeignClient {
*/
@PostMapping("/continueServiceTask")
public SdmResponse continueServiceTask(@RequestBody CompleteTaskReq req) {
log.info("开始继续服务任务处理, 请求参数: {}", req);
return processService.continueServiceTask(req);
}

View File

@@ -109,7 +109,8 @@ public class UniversalDelegate implements JavaDelegate {
} catch (Exception e) {
// 处理失败情况 - 跳转到重试任务
log.error("节点执行过程中发生异常", e);
handleFailure(execution, e);
throw new RuntimeException("节点执行过程中发生异常:" + e.getMessage());
// handleFailure(execution, e);
}
}

View File

@@ -7,13 +7,10 @@ import com.sdm.common.entity.req.data.GetFileBaseInfoReq;
import com.sdm.common.entity.req.data.UploadFilesReq;
import com.sdm.common.entity.resp.data.FileMetadataInfoResp;
import com.sdm.common.feign.inter.data.IDataFeignClient;
import com.sdm.common.utils.RandomUtil;
import com.sdm.common.entity.flowable.executeConfig.BaseExecuteConfig;
import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.flowable.entity.ProcessNodeParam;
import com.sdm.flowable.service.IProcessNodeParamService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.flowable.engine.delegate.DelegateExecution;
import org.springframework.beans.factory.annotation.Autowired;
@@ -26,6 +23,8 @@ import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
/**
* @Description: 生成自动化报告脚本处理器
* @Author: shiman
@@ -40,13 +39,6 @@ public class ExportWordScriptHandler implements ExecutionHandler<Map<String, Obj
@Autowired
private IProcessNodeParamService processNodeParamService;
private static final String TEMP_REPORT_PATH = "/opt/report/";
// todo 用户需要上传脚本文件到当前算列的导出报告节点输入文件夹下,params需要记下脚本文件id此处暂时写死
// 输入参数: 生成结果报告文件名
// 用户得会写脚本文件脚本exportWord.py或者使用我们指定的生成报告脚本
// 文件imageFileIdList: 用户选择指定图片文件id列表用于生成报告时插入到报告中
@Override
public void execute(DelegateExecution execution, Map<String, Object> params, ExportWordScriptExecuteConfig config) {
try {
@@ -95,9 +87,10 @@ public class ExportWordScriptHandler implements ExecutionHandler<Map<String, Obj
String currentNodeObjectKey = currentNodeFileMetadataInfoResp.getObjectKey();
log.info("当前节点配置参数:{}", currentNodeParamJson);
// 前置节点输出文件夹的所有文件通过正则过滤后下载到当前脚本节点的输出文件夹,并使用正则表达式过滤文件
String basePath = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR + currentNodeObjectKey;
dataFeignClient.downloadFolderToLocal(beforeNodeOutputDirId, basePath, fileRegularStr);
// 前置节点输出文件夹的所有文件通过正则过滤后直接复制到当前脚本节点的输出文件夹,并使用正则表达式过滤文件
String sourcePath = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR + beforeNodeObjectKey;
String targetPath = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR + currentNodeObjectKey;
copyFilesWithRegex(sourcePath, targetPath, fileRegularStr);
// 下载处理脚本文件到本地
Long exportScriptFileId = config.getExportScriptFileId();
@@ -105,14 +98,14 @@ public class ExportWordScriptHandler implements ExecutionHandler<Map<String, Obj
log.error("未设置导出报告脚本文件");
throw new RuntimeException("未设置导出报告脚本文件");
}
dataFeignClient.downloadFileToLocal(exportScriptFileId, basePath);
dataFeignClient.downloadFileToLocal(exportScriptFileId, targetPath);
FileMetadataInfoResp scriptFileBaseInfo = getFileBaseInfo(exportScriptFileId);
String scriptPath = basePath + scriptFileBaseInfo.getOriginalName();
String scriptPath = targetPath + scriptFileBaseInfo.getOriginalName();
log.info("脚本文件路径:{}", scriptPath);
// 调用脚本
log.info("调用脚本中。。。。。。");
String commands = "python" + " " + scriptPath + " " + basePath;
String commands = "python" + " " + scriptPath + " " + targetPath;
log.info("command:" + commands);
List<String> result = new ArrayList<>();
int runningStatus = -1;
@@ -142,7 +135,7 @@ public class ExportWordScriptHandler implements ExecutionHandler<Map<String, Obj
}
try {
// 获取临时路径中脚本生成的报告
uploadResultFileToMinio(basePath + "report.docx",currentNodeOutputDirId);
uploadResultFileToMinio(targetPath + "report.docx",currentNodeOutputDirId);
} catch (Exception ex) {
log.error("生成自动化报告失败:{}", ex.getMessage(), ex);
throw new RuntimeException("生成自动化报告失败");
@@ -194,4 +187,82 @@ public class ExportWordScriptHandler implements ExecutionHandler<Map<String, Obj
throw new RuntimeException("上传结果文件到MinIO失败: " + e.getMessage(), e);
}
}
/**
* 根据正则表达式从源目录复制文件到目标目录
*
* @param sourcePath 源目录路径
* @param targetPath 目标目录路径
* @param fileRegularStr 正则表达式字符串,用于过滤文件
*/
private void copyFilesWithRegex(String sourcePath, String targetPath, String fileRegularStr) {
try {
File sourceDir = new File(sourcePath);
File targetDir = new File(targetPath);
log.info("复制文件,从 {} 到 {}", sourcePath, targetPath);
if (!sourceDir.exists() || !sourceDir.isDirectory()) {
log.warn("源目录不存在或不是一个目录: {}", sourcePath);
return;
}
if (!targetDir.exists()) {
log.info("目标目录不存在,创建目录: {}", targetPath);
targetDir.mkdirs();
}
// 编译正则表达式(如果提供)
java.util.regex.Pattern pattern = null;
if (fileRegularStr != null && !fileRegularStr.isEmpty()) {
try {
pattern = java.util.regex.Pattern.compile(fileRegularStr, Pattern.CASE_INSENSITIVE);
} catch (Exception e) {
throw new RuntimeException("无效的正则表达式: " + fileRegularStr, e);
}
}
// 遍历源目录中的所有文件
File[] files = sourceDir.listFiles();
log.info("源目录中的文件数量: {}", files.length);
if (files != null) {
for (File file : files) {
if (file.isFile()) {
log.info("开始判断文件:{} 是否符合当前正则: {}", file.getName(),fileRegularStr);
// 如果提供了正则表达式,则只匹配符合正则表达式的文件
if (pattern != null) {
if (!pattern.matcher(file.getName()).matches()) {
log.info("文件:{} 不符合当前正则表达式,跳过", file.getName());
continue; // 跳过不匹配的文件
}
}
log.info("文件:{} 符合当前正则表达式,开始复制", file.getName());
// 复制文件到目标目录
copyFile(file, new File(targetDir, file.getName()));
}
}
}
} catch (Exception e) {
log.error("复制文件失败,从 {} 到 {}: {}", sourcePath, targetPath, e.getMessage(), e);
throw new RuntimeException("复制文件失败: " + e.getMessage(), e);
}
}
/**
* 复制单个文件
*
* @param sourceFile 源文件
* @param targetFile 目标文件
* @throws IOException 当复制过程中出现IO异常时抛出
*/
private void copyFile(File sourceFile, File targetFile) throws IOException {
try (InputStream inputStream = new FileInputStream(sourceFile);
FileOutputStream outputStream = new FileOutputStream(targetFile)) {
byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
}
}
}

View File

@@ -18,15 +18,14 @@ import com.sdm.flowable.util.FlowNodeIdUtils;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.*;
import org.flowable.bpmn.model.Process;
import org.flowable.engine.HistoryService;
import org.flowable.engine.RepositoryService;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService;
import org.flowable.engine.*;
import org.flowable.engine.history.HistoricActivityInstance;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.repository.Deployment;
import org.flowable.engine.repository.ProcessDefinition;
import org.flowable.engine.runtime.Execution;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.job.api.Job;
import org.flowable.task.api.Task;
import org.flowable.validation.ProcessValidator;
import org.flowable.validation.ProcessValidatorFactory;
@@ -36,6 +35,7 @@ import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -55,6 +55,9 @@ public class ProcessService {
@Autowired
private TaskService taskService;
@Autowired
private ManagementService managementService;
@Autowired
private Dto2BpmnConverter dto2BpmnConverter;
@@ -66,7 +69,7 @@ public class ProcessService {
// 部署流程前端传入Flowable标准JSON
public SdmResponse<DeployFlowableResp> deploy(ProcessDefinitionDTO processDTO) throws Exception {
log.info("开始部署流程定义");
log.info("开始部署流程定义: {}",processDTO);
BpmnModel bpmnModel = dto2BpmnConverter.convert(processDTO);
log.info("BPMN模型转换完成");
@@ -169,36 +172,128 @@ public class ProcessService {
.processInstanceId(processInstanceId)
.list();
Map<String, HistoricActivityInstance> activityMap = historicActivities.stream()
.collect(Collectors.toMap(
HistoricActivityInstance::getActivityId,
Function.identity(),
(a, b) -> a
));
// 为了处理多实例或循环,最好按结束时间倒序排,取最新的
Map<String, HistoricActivityInstance> activityMap = new HashMap<>();
for (HistoricActivityInstance hist : historicActivities) {
// 如果Map里已经有了且当前的开始时间比Map里的晚则覆盖取最新的
if (!activityMap.containsKey(hist.getActivityId()) ||
hist.getStartTime().after(activityMap.get(hist.getActivityId()).getStartTime())) {
activityMap.put(hist.getActivityId(), hist);
}
}
List<String> activeActivityIds = isProcessRunning(processInstanceId)
? runtimeService.getActiveActivityIds(processInstanceId)
: Collections.emptyList();
// 更新节点状态信息
for (NodeDetailInfo node : nodes) {
String nodeId = node.getId();
HistoricActivityInstance historicActivity = activityMap.get(nodeId);
boolean isActive = activeActivityIds.contains(nodeId);
boolean isFinished = historicActivity != null && historicActivity.getEndTime() != null;
node.setStatus(isFinished ? "finished" : (isActive ? "active" : "pending"));
// 获取异常/失败的任务信息
// Map<ActivityId, ErrorMessage>
Map<String, String> errorMap = new HashMap<>();
if (isProcessRunning(processInstanceId)) {
// 1. 查询死信作业 (DeadLetterJob) - 重试次数用尽,彻底失败
List<Job> deadJobs = managementService.createDeadLetterJobQuery()
.processInstanceId(processInstanceId).list();
if (historicActivity != null) {
node.setStartTime(historicActivity.getStartTime());
if (isFinished) {
node.setEndTime(historicActivity.getEndTime());
node.setDurationInMillis(historicActivity.getDurationInMillis());
if (historicActivity.getDurationInMillis() != null) {
node.setDurationFormatted(formatDuration(historicActivity.getDurationInMillis()));
// 2. 查询普通作业 (Job) - 包含正在重试中的异常 (exceptionMessage不为空)
List<Job> activeJobs = managementService.createJobQuery()
.processInstanceId(processInstanceId).list();
// 3. 辅助查询:获取 Execution 以便将 Job 映射到 ActivityId
// Job 对象里通常只有 executionId我们需要知道这个 execution 当前在哪个 activityId
List<Execution> executions = runtimeService.createExecutionQuery()
.processInstanceId(processInstanceId).list();
Map<String, String> executionToActivityMap = executions.stream()
.collect(Collectors.toMap(Execution::getId, Execution::getActivityId, (v1, v2) -> v1));
// 填充错误 Map
Consumer<Job> mapJobToError = job -> {
if (job.getExceptionMessage() != null) {
String activityId = executionToActivityMap.get(job.getExecutionId());
if (activityId != null) {
errorMap.put(activityId, job.getExceptionMessage());
}
}
};
deadJobs.forEach(mapJobToError);
activeJobs.forEach(mapJobToError);
}
// 更新节点状态信息,遍历节点设置状态,并处理 ServiceTask + ReceiveTask 的合并逻辑
for (NodeDetailInfo node : nodes) {
String originalNodeId = node.getId();
// 4.1 获取当前节点的物理状态
HistoricActivityInstance myHist = activityMap.get(originalNodeId);
boolean isActive = activeActivityIds.contains(originalNodeId);
boolean isFinished = myHist != null && myHist.getEndTime() != null;
// 判断该节点是否有报错
String errorMessage = errorMap.get(originalNodeId);
boolean isError = errorMessage != null;
// 初始状态判定 (优先级Error > Finished > Active > Pending)
String displayStatus;
if (isError) {
displayStatus = "error";
} else if (isFinished) {
displayStatus = "finished";
} else if (isActive) {
displayStatus = "active";
} else {
displayStatus = "pending";
}
Date startTime = myHist != null ? myHist.getStartTime() : null;
Date endTime = myHist != null ? myHist.getEndTime() : null;
Long duration = myHist != null ? myHist.getDurationInMillis() : null;
// 4.2 特殊处理:检查是否为异步回调的 ServiceTask
if (isAsyncCallbackNode(node)) {
// 如果原始节点本身就报错了ServiceTask 执行 JavaDelegate 时抛出异常)
// 此时还没走到 ReceiveTask所以直接显示 Error
if (isError) {
node.setErrorMessage(errorMessage); // 假设 NodeDetailInfo 有这个字段
// 保持 displayStatus = "error"
} else {
// 原有逻辑:推算 ReceiveTask
String waitNodeId = FlowNodeIdUtils.generateAsyncTaskId(originalNodeId);
HistoricActivityInstance waitHist = activityMap.get(waitNodeId);
boolean waitIsActive = activeActivityIds.contains(waitNodeId);
boolean waitIsFinished = waitHist != null && waitHist.getEndTime() != null;
if (waitIsActive) {
displayStatus = "active";
endTime = null;
if (startTime != null) {
duration = System.currentTimeMillis() - startTime.getTime();
}
} else if (waitIsFinished) {
displayStatus = "finished";
endTime = waitHist.getEndTime();
if (startTime != null && endTime != null) {
duration = endTime.getTime() - startTime.getTime();
}
} else if (isFinished && !waitIsFinished && !waitIsActive) {
// 极短时间差
displayStatus = "active";
endTime = null;
}
}
} else {
// 非特殊节点,如果是 Error 状态,填充错误信息
if (isError) {
node.setErrorMessage(errorMessage);
}
}
// 4.3 赋值给 DTO
node.setStatus(displayStatus);
node.setStartTime(startTime);
node.setEndTime(endTime);
node.setDurationInMillis(duration);
if (duration != null) {
node.setDurationFormatted(formatDuration(duration));
}
}
}
@@ -336,7 +431,27 @@ public class ProcessService {
return String.format("%ds", seconds);
}
/**
* 辅助方法:解析节点配置判断是否为异步回调节点
*/
private boolean isAsyncCallbackNode(NodeDetailInfo node) {
if (!FlowElementTypeEnums.SERVICETASK.getType().equalsIgnoreCase(node.getType())) {
return false;
}
String configJson = node.getExecuteConfig();
if (configJson == null || configJson.isEmpty()) {
return false;
}
try {
// 这里解析 JSON看 executeConfig.isAsyncCallback()
// 建议使用 FastJson 或 Jackson
JSONObject json = JSONObject.parseObject(configJson);
return json != null && json.getBooleanValue("asyncCallback");
} catch (Exception e) {
log.warn("解析节点配置失败: {}", node.getId());
return false;
}
}
public SdmResponse continueServiceTask(@RequestBody CompleteTaskReq req) {
log.info("开始继续服务任务处理, 请求参数: {}", req);

View File

@@ -41,7 +41,7 @@ public class Dto2BpmnConverter {
List<FlowElementDTO> allElements = dto.getFlowElements();
List<FlowElementDTO> nodeDtos = allElements.stream()
.filter(e -> !FlowElementTypeEnums.SEQUENCEFLOW.getType().equals(e.getType()))
.collect(Collectors.toList());
.toList();
List<FlowElementDTO> flowDtos = allElements.stream()
.filter(e -> FlowElementTypeEnums.SEQUENCEFLOW.getType().equals(e.getType()))
.collect(Collectors.toList());
@@ -74,9 +74,28 @@ public class Dto2BpmnConverter {
// 6. 创建连线
createConnections(process, flowDtos, asyncTaskMap,waitUserTaskMap, joinGatewayMap, splitGatewayMap);
validProcess(process);
return bpmnModel;
}
/**
* 验证流程元素的唯一性
*/
private static void validProcess(Process process) {
Map<String, Integer> counter = new HashMap<>();
for (FlowElement e : process.getFlowElements()) {
counter.put(e.getId(), counter.getOrDefault(e.getId(), 0) + 1);
}
counter.forEach((id, cnt) -> {
if (cnt > 1) {
log.error("❌ 重复 ID 发现: {} 出现次数: {}", id, cnt);
}
});
}
/**
* 添加通用重试任务节点
*/
@@ -111,6 +130,8 @@ public class Dto2BpmnConverter {
String originalNodeId = nodeDto.getId();
String waitNodeId = FlowNodeIdUtils.generateAsyncTaskId(originalNodeId);
ReceiveTask receiveTask = new ReceiveTask();
receiveTask.setAsynchronous(true);
disableAsyncRetry(receiveTask);
receiveTask.setId(waitNodeId);
receiveTask.setName(nodeDto.getName() + "等待结果");
process.addFlowElement(receiveTask);
@@ -131,6 +152,8 @@ public class Dto2BpmnConverter {
String waitUserId = FlowNodeIdUtils.generateWaitUserTaskId(originalNodeId);
UserTask waitUserTask = new UserTask();
waitUserTask.setAsynchronous(true);
disableAsyncRetry(waitUserTask);
waitUserTask.setId(waitUserId);
waitUserTask.setName(nodeDto.getName() + "等待用户提交");
// 不设置assignee让任何人可以处理
@@ -162,6 +185,7 @@ public class Dto2BpmnConverter {
joinGateway.setName("并行汇聚-" + nodeDto.getName());
process.addFlowElement(joinGateway);
joinGatewayMap.put(nodeId, joinGatewayId);
log.info("添加并行汇聚网关节点ID={}, 汇聚网关ID={}", nodeId, joinGatewayId);
}
// 检查是否需要添加拆分网关(出度>1
@@ -173,6 +197,7 @@ public class Dto2BpmnConverter {
splitGateway.setName("并行拆分-" + nodeDto.getName());
process.addFlowElement(splitGateway);
splitGatewayMap.put(nodeId, splitGatewayId);
log.info("添加并行拆分网关节点ID={}, 拆分网关ID={}", nodeId, splitGatewayId);
}
}
@@ -368,6 +393,8 @@ public class Dto2BpmnConverter {
case STARTEVENT:
// 开始事件:直接映射
StartEvent startEvent = new StartEvent();
// 【关键修改】设置异步:确保 startProcessInstanceById 接口立刻返回 ID
startEvent.setAsynchronous(true);
startEvent.setId(nodeDto.getId());
startEvent.setName(nodeDto.getName());
process.addFlowElement(startEvent);
@@ -381,10 +408,15 @@ public class Dto2BpmnConverter {
endEvent.setName(nodeDto.getName());
process.addFlowElement(endEvent);
log.info("创建结束事件节点 nodeId{}", nodeDto.getId());
break;
case USERTASK:
// 用户任务:映射为 Flowable UserTask
UserTask userTask = new UserTask();
// 【关键修改】设置异步:防止下方绑定的监听器(创建文件夹)报错导致前一个节点回滚
// 这会创建一个 Job 来执行 UserTask 的初始化逻辑(包括执行监听器)
userTask.setAsynchronous(true);
disableAsyncRetry(userTask);
userTask.setId(nodeDto.getId());
userTask.setName(nodeDto.getName());
log.info("创建用户任务节点 nodeId{}", nodeDto.getId());
@@ -419,12 +451,15 @@ public class Dto2BpmnConverter {
case SERVICETASK:
// 服务任务:映射为 Flowable ServiceTask绑定自定义执行器
ServiceTask serviceTask = new ServiceTask();
// 【关键修改】设置异步:实现业务逻辑故障隔离,避免阻塞和连环回滚
serviceTask.setAsynchronous(true);
disableAsyncRetry(serviceTask);
serviceTask.setId(nodeDto.getId());
serviceTask.setName(nodeDto.getName());
// 绑定执行器Bean名称customTaskExecutor
serviceTask.setImplementation("${universalDelegate}");
serviceTask.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
log.info("创建服务任务节点 nodeId{}", nodeDto.getId());
if (nodeDto.getExtensionElements() != null && nodeDto.getExtensionElements().getExecuteConfig() != null) {
// 添加 Flowable 扩展属性
BaseExecuteConfig serviceTaskExecuteConfig = nodeDto.getExtensionElements().getExecuteConfig();
@@ -471,4 +506,20 @@ public class Dto2BpmnConverter {
}
return flow;
}
/**
* 配置节点失败后不重试R0/PT0S
* @param flowElement 需要配置的节点
*/
private void disableAsyncRetry(FlowElement flowElement) {
ExtensionElement retryElement = new ExtensionElement();
// 必须加上 flowable 的命名空间前缀
retryElement.setNamespace("http://flowable.org/bpmn");
retryElement.setNamespacePrefix("flowable");
retryElement.setName("failedJobRetryTimeCycle");
// R0 代表重试 0 次,即失败立刻停止
retryElement.setElementText("R0/PT0S");
flowElement.addExtensionElement(retryElement);
}
}