This commit is contained in:
2025-12-26 09:30:11 +08:00
committed by gulongcheng
39 changed files with 956 additions and 103 deletions

View File

@@ -1,42 +0,0 @@
package com.sdm.flowable.constants;
public interface FlowableConfig {
/*
* 前端流程节点自定义执行参数key
*/
String EXECUTECONFIG = "executeConfig";
// 异步流程ReceiveTask节点回调返回结果变量名
String RECEIVETASK_CALLBACKE_STATUS = "ReceivetaskCallbackeStatus";
String RECEIVETASK_CALLBACKE_MSG = "ReceivetaskCallbackeMsg";
/*
* 重试相关变量名
*/
String RETRY_TARGET_NODE_ID = "_retryTargetNodeId";
String RETRY_ORIGIN_NODE_ID = "_retryOriginNodeId";
String RETRY_ERROR_MESSAGE = "_retryErrorMessage";
/*
* 重试任务ID
*/
String RETRY_TASK_ID = "genericRetryTask";
/*
* 网关前缀
*/
String JOIN_GATEWAY_PREFIX = "join_gw_";
String SPLIT_GATEWAY_PREFIX = "split_gw_";
/*
* 任务后缀
*/
String ASYNC_TASK_SUFFIX = "_wait"; // 后置接收
String WAIT_USER_SUFFIX = "_waitUser"; //前置人工
String CHECK_SUFFIX = "_check"; // 后置哨兵
/**
* 流程的节点本地文件夹基础路径
*/
String FLOWABLE_SIMULATION_BASEDIR ="/home/simulation/";
}

View File

@@ -12,7 +12,9 @@ import com.sdm.common.feign.inter.flowable.IFlowableFeignClient;
import com.sdm.flowable.aop.StateGuard;
import com.sdm.flowable.delegate.handler.HpcHandler;
import com.sdm.flowable.dto.req.CompleteTaskReq;
import com.sdm.flowable.dto.req.PreviewNodeInputFilesReq;
import com.sdm.flowable.dto.req.RetryRequest;
import com.sdm.flowable.dto.resp.NodeInputFilePreviewResp;
import com.sdm.flowable.enums.OperationTypeEnum;
import com.sdm.flowable.process.ProcessService;
import com.sdm.flowable.service.IProcessNodeParamService;
@@ -175,6 +177,20 @@ public class ProcessController implements IFlowableFeignClient {
return processService.getProcessAndNodeDetailByInstanceId(processDefinitionId,processInstanceId,runId);
}
/**
* 预览节点输入文件列表
* 逻辑:
* 1. 扫描当前节点 inputDirId (用户手动上传区) -> 全量返回
* 2. 扫描前置节点 outputDirId (上游产出区) -> 根据 regex 过滤返回
* 3. 统一封装为包含绝对路径的 DTO
*/
@PostMapping("/previewNodeInputFiles")
public SdmResponse<List<NodeInputFilePreviewResp>> previewNodeInputFiles(@RequestBody PreviewNodeInputFilesReq previewNodeInputFilesReq) {
return processService.previewNodeInputFiles(previewNodeInputFilesReq);
}
/**
* 流程节点继续执行(完成人工节点/或者等待用户输入后继续手动执行的节点)

View File

@@ -4,8 +4,8 @@ import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
import org.springframework.stereotype.Component;
import static com.sdm.flowable.constants.FlowableConfig.RECEIVETASK_CALLBACKE_MSG;
import static com.sdm.flowable.constants.FlowableConfig.RECEIVETASK_CALLBACKE_STATUS;
import static com.sdm.common.config.FlowableConfig.RECEIVETASK_CALLBACKE_MSG;
import static com.sdm.common.config.FlowableConfig.RECEIVETASK_CALLBACKE_STATUS;
/**
* 异步结果校验 Delegate

View File

@@ -8,7 +8,7 @@ import com.sdm.common.entity.req.data.GetFileBaseInfoReq;
import com.sdm.common.entity.req.flowable.AsyncCallbackRequest;
import com.sdm.common.entity.resp.data.FileMetadataInfoResp;
import com.sdm.common.feign.inter.data.IDataFeignClient;
import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.common.config.FlowableConfig;
import com.sdm.flowable.delegate.handler.ExecutionHandler;
import com.sdm.flowable.service.IAsyncTaskRecordService;
import com.sdm.flowable.service.IProcessNodeParamService;
@@ -59,7 +59,7 @@ public class UniversalDelegate implements JavaDelegate {
String nodeName = execution.getCurrentFlowElement().getName();
// 2. 读取输入参数
JSONObject params = processNodeParamService.getExecuteParam(runId, nodeId);
JSONObject params = processNodeParamService.getParam(processDefinitionId,nodeId,runId);
log.info("universalDelegate 开始执行节点, runId{},processDefinitionId:{}procInstId: {}, nodeId: {}, nodeName: {},当前节点执行参数 params{}",runId,processDefinitionId, procInstId, nodeId, nodeName,params);
// 3、创建本地文件夹用于后续节点计算直接从本地读取不需要再从minio中获取数据

View File

@@ -14,7 +14,7 @@ import com.sdm.common.entity.resp.task.PerformanceResp;
import com.sdm.common.feign.inter.data.IDataFeignClient;
import com.sdm.common.feign.inter.project.ISimulationRunFeignClient;
import com.sdm.common.feign.inter.task.ISimuluationPerformanceFeignClient;
import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.common.config.FlowableConfig;
import com.sdm.flowable.entity.ProcessNodeParam;
import com.sdm.flowable.service.IProcessNodeParamService;
import lombok.extern.slf4j.Slf4j;

View File

@@ -9,7 +9,7 @@ import com.sdm.common.entity.resp.data.FileMetadataInfoResp;
import com.sdm.common.feign.inter.data.IDataFeignClient;
import com.sdm.common.feign.inter.pbs.ITaskFeignClient;
import com.sdm.common.log.CoreLogger;
import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.common.config.FlowableConfig;
import com.sdm.flowable.entity.ProcessNodeParam;
import com.sdm.flowable.enums.AsyncTaskStatusEnum;
import com.sdm.flowable.service.IAsyncTaskRecordService;
@@ -22,10 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
// HPC(executeType=HPC)
@@ -68,10 +65,15 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
submitHpcTaskRemoteReq.setMasterFileRegularStr(masterFileRegularStr);
submitHpcTaskRemoteReq.setInputFilesRegularStr(inputFilesRegularStr);
// hpc文件路径处理
dealHpcFile(submitHpcTaskRemoteReq,beforeNodeId,currentNodeId, processDefinitionId,processInstanceId);
CoreLogger.info("hpc executeMode:{}",params.get("executeMode"));
String executeMode = params.get("executeMode").toString();
if(StringUtils.isBlank(executeMode)||
(!Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_AUTO)&&
!Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_MANUAL))){
throw new RuntimeException("hpc executeMode illegal");
}
// 处理hpc求解文件路径
dealHpcFile(submitHpcTaskRemoteReq,beforeNodeId,currentNodeId, processDefinitionId,processInstanceId,executeMode,params);
// 实现HPC处理逻辑...
// INIT(初始化)/RUNNING(执行中)/SUCCESS(执行成功)/FAIL(执行失败)
String status = AsyncTaskStatusEnum.INIT.getCode();
@@ -79,7 +81,7 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
SdmResponse<String> submitResp = taskFeignClient.adapterSubmitHpcJob(submitHpcTaskRemoteReq);
if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){
log.error("HpcHandler submit failed,jobName:{}",params);
status = AsyncTaskStatusEnum.FAIL.getCode();
throw new RuntimeException("HpcHandler submit failed,"+submitResp.getMessage());
}
String hpcTaskId = submitResp.getData();
CoreLogger.info("hpc task submit succ jobId:{}",hpcTaskId);
@@ -97,7 +99,7 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
}
private void dealHpcFile(SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq, String beforeNodeId, String currentNodeId,
String processDefinitionId, String processInstanceId) {
String processDefinitionId, String processInstanceId, String executeMode,Map<String, Object> params) {
String simulationBaseDir = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR;
// 查询前节点的工作目录---》本地磁盘对应目录
// 查询前节点和当前节点的工作目录---》本地磁盘对应目录
@@ -115,15 +117,49 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
if(CollectionUtils.isEmpty(beforeNodeParams) || CollectionUtils.isEmpty(currentNodeParams)){
throw new RuntimeException("未获取到当前节点或者求解文件节点信息");
}
ProcessNodeParam beforeNode = beforeNodeParams.get(0);
// 自动,前一个节点
submitHpcTaskRemoteReq.setExecuteMode(executeMode);
if(Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_AUTO)){
ProcessNodeParam beforeNode = beforeNodeParams.get(0);
String beforeNodeJectKey = getNodeObjectKey(beforeNode);
// 本地求解文件路径 taskLocalBaseDir
submitHpcTaskRemoteReq.setSimulationFileLocalPath(simulationBaseDir + beforeNodeJectKey);
CoreLogger.info("simulationFileLocalPath :{} ",simulationBaseDir + beforeNodeJectKey);
}
// 手动上传的
if (Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_MANUAL)) {
List<String> masterFilePaths = getFileListFromMap(params, "masterFileRegularStr");
List<String> inPutFilePaths = getFileListFromMap(params, "inputFilesRegularStr");
if(CollectionUtils.isEmpty(masterFilePaths)||CollectionUtils.isEmpty(inPutFilePaths)){
CoreLogger.warn("hpc executeMode manual,filepath illegal");
throw new RuntimeException("hpc executeMode manual,filepath illegal");
}
submitHpcTaskRemoteReq.setManualMasterFilepaths(masterFilePaths);
submitHpcTaskRemoteReq.setManualInputFilePaths(inPutFilePaths);
}
// hpc 节点回传路径
ProcessNodeParam currentNode = currentNodeParams.get(0);
String beforeNodeJectKey = getNodeObjectKey(beforeNode);
String currentNodeJectKey = getNodeObjectKey(currentNode);
// 本地求解文件路径 taskLocalBaseDir
submitHpcTaskRemoteReq.setSimulationFileLocalPath(simulationBaseDir + beforeNodeJectKey);
// hpc 回传文件路径
submitHpcTaskRemoteReq.setStdoutSpdmNasFilePath(simulationBaseDir + currentNodeJectKey);
CoreLogger.info("localSaveDir :{} ,{}",simulationBaseDir + beforeNodeJectKey,simulationBaseDir + currentNodeJectKey);
CoreLogger.info("stdoutSpdmNasFilePath :{} ",simulationBaseDir + currentNodeJectKey);
}
/**
* 通用方法:从 Map 中提取 explicitInputFiles 下的指定文件列表
* @param dataMap 根 Map
* @param key 要提取的列表名称masterFileRegularStr/inputFilesRegularStr
* @return 字符串列表,不存在则返回空列表(避免 NPE
*/
@SuppressWarnings("unchecked")
private static List<String> getFileListFromMap(Map<String, Object> dataMap, String key) {
return Optional.ofNullable(dataMap)
// 提取 explicitInputFiles 子 Map
.map(map -> (Map<String, Object>) map.get("explicitInputFiles"))
// 提取指定 key 的列表
.map(explicitMap -> (List<String>) explicitMap.get(key))
// 为空则返回空列表,避免后续遍历 NPE
.orElse(List.of());
}

View File

@@ -0,0 +1,26 @@
package com.sdm.flowable.dto.req;
import lombok.Data;
import java.util.Map;
@Data
public class PreviewNodeInputFilesReq {
/** 流程定义ID (可选,视上下文而定) */
private String processDefinitionId;
/** 算例ID */
private String runId;
/** 当前节点ID */
private String nodeId;
/** 前置节点ID */
private String beforeNodeId;
/**
* 正则配置 Map
* Key: 类别标识 (如 "master", "input", "script")
* Value: 正则表达式 (如 "^.+\\.xml$", "^.+\\.json$")
*/
private Map<String, String> regexConfig;
}

View File

@@ -0,0 +1,35 @@
package com.sdm.flowable.dto.resp;
import lombok.Data;
@Data
public class NodeInputFilePreviewResp {
/** 文件名称 */
private String fileName;
/** 文件大小 */
private String fileSize;
/** 创建时间 */
private String createTime;
/**
* 文件完整本地路径 (核心)
* 后端同步MinIO后这里统一返回本地绝对路径
*/
private String filePath;
/**
* 文件来源
* CURRENT: 当前节点手动上传
* PREVIOUS: 上游节点产出
*/
private String source;
/**
* 文件类别 key
* 对应请求中 regexConfig 的 Key (如 "master", "input")
* 如果未匹配到任何正则但被保留(如手动上传),可为 "UNCATEGORIZED"
*/
private String fileCategory;
}

View File

@@ -1,6 +1,6 @@
package com.sdm.flowable.listener;
import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.common.config.FlowableConfig;
import com.sdm.flowable.util.FlowNodeIdUtils;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.BpmnModel;

View File

@@ -31,9 +31,10 @@ public class UserTaskDirectoryPreparationListener implements ExecutionListener {
public void notify(DelegateExecution execution) {
String runId = (String) execution.getVariable("runId");
String nodeId = execution.getCurrentActivityId();
String processDefinitionId = execution.getProcessDefinitionId();
//创建本地文件夹用于后续节点计算直接从本地读取不需要再从minio中获取数据
JSONObject params = processNodeParamService.getExecuteParam(runId, nodeId);
JSONObject params =processNodeParamService.getParam(processDefinitionId,nodeId,runId);
log.info("userTaskDirectoryPreparationListener, 启动流程 runId:{},nodeId:{},实例id: {},参数 params:{}", runId,nodeId,execution.getProcessInstanceId(),params);
Long currentNodeOutputDirId = params.getLong("outputDirId");
if(ObjectUtils.isEmpty(currentNodeOutputDirId)){

View File

@@ -6,6 +6,8 @@ import com.sdm.common.entity.req.flowable.AsyncCallbackRequest;
import com.sdm.common.entity.resp.flowable.DeployFlowableResp;
import com.sdm.common.entity.resp.flowable.ProcessInstanceDetailResponse;
import com.sdm.flowable.dto.req.CompleteTaskReq;
import com.sdm.flowable.dto.req.PreviewNodeInputFilesReq;
import com.sdm.flowable.dto.resp.NodeInputFilePreviewResp;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.validation.ValidationError;
@@ -35,6 +37,8 @@ public interface Iprocess {
*/
SdmResponse<ProcessInstanceDetailResponse> getProcessAndNodeDetailByInstanceId(String processDefinitionId, String processInstanceId, String runId);
SdmResponse<List<NodeInputFilePreviewResp>> previewNodeInputFiles(PreviewNodeInputFilesReq req);
SdmResponse continueServiceTask( CompleteTaskReq req);

View File

@@ -5,12 +5,17 @@ import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.flowable.dto.NodeDetailInfo;
import com.sdm.common.entity.flowable.dto.ProcessDefinitionDTO;
import com.sdm.common.entity.flowable.dto.ProcessInstanceInfo;
import com.sdm.common.entity.req.data.GetFileBaseInfoReq;
import com.sdm.common.entity.req.flowable.AsyncCallbackRequest;
import com.sdm.common.entity.resp.data.FileMetadataInfoResp;
import com.sdm.common.entity.resp.flowable.DeployFlowableResp;
import com.sdm.common.entity.resp.flowable.ProcessInstanceDetailResponse;
import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.common.feign.inter.data.IDataFeignClient;
import com.sdm.common.config.FlowableConfig;
import com.sdm.flowable.delegate.UniversalDelegate;
import com.sdm.flowable.dto.req.CompleteTaskReq;
import com.sdm.flowable.dto.req.PreviewNodeInputFilesReq;
import com.sdm.flowable.dto.resp.NodeInputFilePreviewResp;
import com.sdm.flowable.enums.FlowElementTypeEnums;
import com.sdm.flowable.enums.NodeStateEnum;
import com.sdm.flowable.enums.ProcessInstanceStateEnum;
@@ -20,6 +25,7 @@ import com.sdm.flowable.util.Dto2BpmnConverter;
import com.sdm.flowable.util.FlowNodeIdUtils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.flowable.bpmn.model.*;
import org.flowable.bpmn.model.Process;
import org.flowable.engine.*;
@@ -39,7 +45,9 @@ import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RequestBody;
import java.io.File;
import java.util.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -73,6 +81,9 @@ public class ProcessService implements Iprocess{
@Autowired
private IAsyncTaskRecordService asyncTaskRecordService;
@Autowired
private IDataFeignClient dataFeignClient;
// 部署流程前端传入Flowable标准JSON
public SdmResponse<DeployFlowableResp> deploy(ProcessDefinitionDTO processDTO) throws Exception {
log.info("开始部署流程定义: {}",processDTO);
@@ -680,6 +691,162 @@ public class ProcessService implements Iprocess{
return String.format("%ds", seconds);
}
public SdmResponse<List<NodeInputFilePreviewResp>> previewNodeInputFiles(PreviewNodeInputFilesReq req) {
log.info("复合文件预览请求: {}", req);
List<NodeInputFilePreviewResp> resultList = new ArrayList<>();
// ================= 1. 处理当前节点 (inputDirId, 需同步 MinIO) =================
scanCurrentNodeInput(req.getProcessDefinitionId(),req.getRunId(), req.getNodeId(), resultList);
// ================= 2. 处理前置节点 (outputDirId, 本地直接扫描) =================
if (ObjectUtils.isNotEmpty(req.getBeforeNodeId())) {
// 预编译正则 Map (Key: Category, Value: Pattern)
Map<String, Pattern> patternMap = new HashMap<>();
if (req.getRegexConfig() != null) {
req.getRegexConfig().forEach((k, v) -> {
try {
patternMap.put(k, Pattern.compile(v));
} catch (Exception e) {
log.error("正则编译失败: key={}, regex={}", k, v);
}
});
}
scanPreviousNodeOutput(req.getProcessDefinitionId(),req.getRunId(), req.getBeforeNodeId(), patternMap, resultList);
}
return SdmResponse.success(resultList);
}
/**
* 场景1处理当前节点输入
* 关注点inputDirId
* 动作:必须从 MinIO 同步到本地,因为这是用户刚上传的
*/
private void scanCurrentNodeInput(String processDefinitionId,String runId, String nodeId, List<NodeInputFilePreviewResp> resultList) {
try {
// 1. 获取参数
JSONObject nodeParams = processNodeParamService.getParam(processDefinitionId,nodeId,runId);
if (nodeParams == null || !nodeParams.containsKey("inputDirId")) {
log.warn("当前节点 {} 未配置 inputDirId跳过手动上传区扫描", nodeId);
return;
}
Long inputDirId = nodeParams.getLong("inputDirId");
// 2. 获取路径
String objectKey = getObjectKeyByDirId(inputDirId);
if (objectKey == null) return;
String absDirPath = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR + objectKey;
File dir = new File(absDirPath);
if (!dir.exists() || !dir.isDirectory()) return;
File[] files = dir.listFiles();
if (files == null) return;
for (File file : files) {
if (file.isFile()) {
NodeInputFilePreviewResp dto = new NodeInputFilePreviewResp();
dto.setFileName(file.getName());
dto.setFileSize(String.valueOf(file.length()));
dto.setCreateTime(String.valueOf(file.lastModified()));
dto.setFilePath(file.getAbsolutePath());
// 来源标记
dto.setSource("CURRENT");
// 类别标记:统一标记为用户上传,不再强行匹配正则分类
dto.setFileCategory("USER_UPLOAD");
resultList.add(dto);
}
}
} catch (Exception e) {
log.error("扫描当前节点输入失败: nodeId={}", nodeId, e);
}
}
/**
* 场景2处理前置节点输出
* 关注点outputDirId
* 动作:直接读本地 (假设大文件不走MinIO或者上一步HPC已经落地了)
*/
private void scanPreviousNodeOutput(String processDefinitionId,String runId, String beforeNodeId, Map<String, Pattern> patternMap, List<NodeInputFilePreviewResp> resultList) {
try {
// 1. 获取参数
JSONObject nodeParams = processNodeParamService.getParam(processDefinitionId,beforeNodeId,runId);
if (nodeParams == null || !nodeParams.containsKey("outputDirId")) {
return;
}
Long outputDirId = nodeParams.getLong("outputDirId");
// 2. 获取路径
String objectKey = getObjectKeyByDirId(outputDirId);
if (objectKey == null) return;
String absDirPath = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR + objectKey;
// 3. 【不同步】直接扫本地
// 假设前置节点是 HPC 节点,结果已经写在共享存储上了
File dir = new File(absDirPath);
if (!dir.exists() || !dir.isDirectory()) return;
File[] files = dir.listFiles();
if (files == null) return;
for (File file : files) {
if (!file.isFile()) continue;
String category = null;
// 必须匹配正则才能入选
if (!patternMap.isEmpty()) {
for (Map.Entry<String, Pattern> entry : patternMap.entrySet()) {
if (entry.getValue().matcher(file.getName()).matches()) {
category = entry.getKey();
break;
}
}
}
// 如果匹配到了分类 (Master/Input 等),则添加
if (category != null) {
NodeInputFilePreviewResp dto = new NodeInputFilePreviewResp();
dto.setFileName(file.getName());
dto.setFileSize(String.valueOf(file.length()));
dto.setCreateTime(String.valueOf(file.lastModified()));
dto.setFilePath(file.getAbsolutePath());
dto.setSource("PREVIOUS");
dto.setFileCategory(category);
resultList.add(dto);
}
// 没匹配到直接丢弃 (前置节点的无关文件)
}
} catch (Exception e) {
log.error("扫描前置节点输出失败: beforeNodeId={}", beforeNodeId, e);
}
}
/**
* 辅助:获取 ObjectKey
*/
private String getObjectKeyByDirId(Long dirId) {
GetFileBaseInfoReq fileReq = new GetFileBaseInfoReq();
fileReq.setFileId(dirId);
SdmResponse<FileMetadataInfoResp> fileResp = dataFeignClient.getFileBaseInfo(fileReq);
if (!fileResp.isSuccess() || fileResp.getData() == null) {
log.warn("查询文件夹信息失败: dirId={}", dirId);
return null;
}
return fileResp.getData().getObjectKey();
}
public SdmResponse continueServiceTask(@RequestBody CompleteTaskReq req) {
log.info("开始继续服务任务处理, 请求参数: {}", req);

View File

@@ -26,10 +26,4 @@ public interface IProcessNodeParamService extends IService<ProcessNodeParam> {
* @return 节点输入参数
*/
JSONObject getParam(String processDefinitionId, String nodeId, String runId);
/**
* 节点在执行时,需要获取节点的输入参数
*/
JSONObject getExecuteParam(String runId, String nodeId );
}

View File

@@ -17,12 +17,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.sdm.flowable.constants.FlowableConfig.RECEIVETASK_CALLBACKE_MSG;
import static com.sdm.flowable.constants.FlowableConfig.RECEIVETASK_CALLBACKE_STATUS;
import static com.sdm.common.config.FlowableConfig.RECEIVETASK_CALLBACKE_MSG;
import static com.sdm.common.config.FlowableConfig.RECEIVETASK_CALLBACKE_STATUS;
/**
* <p>

View File

@@ -0,0 +1,75 @@
package com.sdm.flowable.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.sdm.common.config.FlowableConfig;
import com.sdm.flowable.service.IProcessNodeParamService;
import com.sdm.flowable.util.FlowNodeIdUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.flowable.engine.delegate.DelegateExecution;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 节点执行策略
* 运行时策略层 (NodeExecutionStrategy):实现根据 runId + nodeId 查询数据库参数,决定是否跳过节点。
*/
@Slf4j
@Component("nodeExecutionStrategy") // BPMN 表达式中引用的 Bean 名称
public class NodeExecutionStrategy {
@Autowired
private IProcessNodeParamService processNodeParamService;
/**
* 决定是否跳过 _waitUser 节点
* @param execution 当前执行上下文
* @return true = 跳过(自动执行), false = 不跳过(手动等待)
*/
public boolean shouldSkip(DelegateExecution execution) {
try {
log.info("执行策略判定:当前节点 {}", execution.getCurrentActivityId());
// 1. 获取当前节点ID (例如 taskA_waitUser)
String currentActivityId = execution.getCurrentActivityId();
String processDefinitionId = execution.getProcessDefinitionId();
// 2. 解析出原始 ServiceTask ID (例如 taskA)
// 因为参数是保存在 ServiceTask 上的,不是保存在 _waitUser 上的
String serviceNodeId = FlowNodeIdUtils.getOriginalNodeIdFromWaitUserTask(currentActivityId);
if (serviceNodeId == null) {
log.warn("节点 {} 命名不符合规范无法解析原始ID默认手动", currentActivityId);
return false;
}
// 3. 获取 runId (流程启动时必须传入 runId 变量)
String runId = (String) execution.getVariable("runId");
if (StringUtils.isBlank(runId)) {
log.error("流程变量中缺失 runId无法查询节点参数强制转为手动模式。节点: {}", serviceNodeId);
return false;
}
// 4. 查询数据库 process_node_param
// 使用现有的 getParam 方法查询 ServiceTask 的参数
JSONObject params = processNodeParamService.getParam(processDefinitionId,serviceNodeId,runId);
log.info("是否手动控制节点 {} 参数: {}", serviceNodeId, params);
// 5. 判断执行模式
if (params != null && params.containsKey(FlowableConfig.EXECUTE_MODE_KEY)) {
String mode = params.getString(FlowableConfig.EXECUTE_MODE_KEY);
if (FlowableConfig.EXECUTE_MODE_AUTO.equalsIgnoreCase(mode)) {
log.info("策略判定:节点 {} 配置为 [自动执行] -> 跳过等待", serviceNodeId);
return true; // Skip = true (自动)
}
}
// 6. 默认逻辑:如果没有配置,或者配置为 MANUAL则不跳过
log.info("策略判定:节点 {} 配置为 [手动/未配置] -> 进入等待状态", serviceNodeId);
return false; // Skip = false (手动)
} catch (Exception e) {
log.error("执行策略判定异常,降级为手动模式", e);
return false; // 安全起见,出错时暂停流程,避免失控
}
}
}

View File

@@ -85,17 +85,4 @@ public class ProcessNodeParamServiceImpl extends ServiceImpl<ProcessNodeParamMap
}
return JSONObject.parseObject(param.getParamJson());
}
@Override
public JSONObject getExecuteParam(String runId, String nodeId) {
ProcessNodeParam param = this.lambdaQuery()
.eq(ProcessNodeParam::getRunId, runId)
.eq(ProcessNodeParam::getNodeId, nodeId)
.one();
if (param == null) {
// 当未配置参数时不抛出异常而是返回空Map
return new JSONObject();
}
return JSONObject.parseObject(param.getParamJson());
}
}

View File

@@ -5,7 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.sdm.common.entity.flowable.dto.FlowElementDTO;
import com.sdm.common.entity.flowable.dto.ProcessDefinitionDTO;
import com.sdm.common.entity.flowable.executeConfig.BaseExecuteConfig;
import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.common.config.FlowableConfig;
import com.sdm.flowable.enums.FlowElementTypeEnums;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.*;
@@ -177,7 +177,7 @@ public class Dto2BpmnConverter {
private void handleWaitUserTask(Process process, FlowElementDTO nodeDto, Map<String, String> waitUserTaskMap) {
// 只有当前节点是ServiceTask才需要判断是否等待用户输入需要才创建前置UserTask
if (isWaitUserEnabled(nodeDto)) {
if (FlowElementTypeEnums.SERVICETASK.getType().equals(nodeDto.getType())) {
String originalNodeId = nodeDto.getId();
String waitUserId = FlowNodeIdUtils.generateWaitUserTaskId(originalNodeId);
@@ -186,6 +186,10 @@ public class Dto2BpmnConverter {
disableAsyncRetry(waitUserTask);
waitUserTask.setId(waitUserId);
waitUserTask.setName(nodeDto.getName() + "等待用户提交");
// 绑定 SkipExpression 到策略 Bean,运行时引擎会调用 nodeExecutionStrategy.shouldSkip(execution)
waitUserTask.setSkipExpression("${nodeExecutionStrategy.shouldSkip(execution)}");
// 不设置assignee让任何人可以处理
process.addFlowElement(waitUserTask);
@@ -429,6 +433,15 @@ public class Dto2BpmnConverter {
startEvent.setAsynchronous(true);
startEvent.setId(nodeDto.getId());
startEvent.setName(nodeDto.getName());
// 2. 添加执行监听器:自动设置 skip 开关变量
FlowableListener skipEnableListener = new FlowableListener();
skipEnableListener.setEvent("start"); // 在开始事件启动时触发
skipEnableListener.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_EXPRESSION);
// 直接使用表达式设置变量,不需要额外写 Java 类,主要是为了让 SkipExpression能生效
skipEnableListener.setImplementation("${execution.setVariable('_FLOWABLE_SKIP_EXPRESSION_ENABLED', true)}");
startEvent.getExecutionListeners().add(skipEnableListener);
process.addFlowElement(startEvent);
log.info("创建开始事件节点 nodeId{}", nodeDto.getId());
break;

View File

@@ -1,6 +1,6 @@
package com.sdm.flowable.util;
import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.common.config.FlowableConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;