流程节点运行时设置自动或者手动

支持用户定自定义文件选择
This commit is contained in:
2025-12-25 16:40:53 +08:00
parent 8abd4d2117
commit 9e85144f1e
16 changed files with 384 additions and 13 deletions

View File

@@ -39,4 +39,19 @@ public interface FlowableConfig {
* 流程的节点本地文件夹基础路径
*/
String FLOWABLE_SIMULATION_BASEDIR ="/home/simulation/";
/**
* 节点执行模式 Key (存储在 paramJson 中)
* value: "AUTO" | "MANUAL"
*/
String EXECUTE_MODE_KEY = "executeMode";
String EXECUTE_MODE_AUTO = "AUTO";
String EXECUTE_MODE_MANUAL = "MANUAL";
/**
* 手动模式下,用户明确指定的文件列表 Key
* value: Map<String, List<String>>
*/
String EXPLICIT_INPUT_FILES_KEY = "explicitInputFiles"; // 原: _explicitInputFiles
}

View File

@@ -12,7 +12,9 @@ import com.sdm.common.feign.inter.flowable.IFlowableFeignClient;
import com.sdm.flowable.aop.StateGuard;
import com.sdm.flowable.delegate.handler.HpcHandler;
import com.sdm.flowable.dto.req.CompleteTaskReq;
import com.sdm.flowable.dto.req.PreviewNodeInputFilesReq;
import com.sdm.flowable.dto.req.RetryRequest;
import com.sdm.flowable.dto.resp.NodeInputFilePreviewResp;
import com.sdm.flowable.enums.OperationTypeEnum;
import com.sdm.flowable.process.ProcessService;
import com.sdm.flowable.service.IProcessNodeParamService;
@@ -175,6 +177,20 @@ public class ProcessController implements IFlowableFeignClient {
return processService.getProcessAndNodeDetailByInstanceId(processDefinitionId,processInstanceId,runId);
}
/**
* 预览节点输入文件列表
* 逻辑:
* 1. 扫描当前节点 inputDirId (用户手动上传区) -> 全量返回
* 2. 扫描前置节点 outputDirId (上游产出区) -> 根据 regex 过滤返回
* 3. 统一封装为包含绝对路径的 DTO
*/
@PostMapping("/previewNodeInputFiles")
public SdmResponse<List<NodeInputFilePreviewResp>> previewNodeInputFiles(@RequestBody PreviewNodeInputFilesReq previewNodeInputFilesReq) {
return processService.previewNodeInputFiles(previewNodeInputFilesReq);
}
/**
* 流程节点继续执行(完成人工节点/或者等待用户输入后继续手动执行的节点)

View File

@@ -59,7 +59,7 @@ public class UniversalDelegate implements JavaDelegate {
String nodeName = execution.getCurrentFlowElement().getName();
// 2. 读取输入参数
JSONObject params = processNodeParamService.getExecuteParam(runId, nodeId);
JSONObject params = processNodeParamService.getParam(processDefinitionId,nodeId,runId);
log.info("universalDelegate 开始执行节点, runId{},processDefinitionId:{}procInstId: {}, nodeId: {}, nodeName: {},当前节点执行参数 params{}",runId,processDefinitionId, procInstId, nodeId, nodeName,params);
// 3、创建本地文件夹用于后续节点计算直接从本地读取不需要再从minio中获取数据

View File

@@ -79,7 +79,7 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
SdmResponse<String> submitResp = taskFeignClient.adapterSubmitHpcJob(submitHpcTaskRemoteReq);
if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){
log.error("HpcHandler submit failed,jobName:{}",params);
status = AsyncTaskStatusEnum.FAIL.getCode();
throw new RuntimeException("HpcHandler submit failed");
}
String hpcTaskId = submitResp.getData();
CoreLogger.info("hpc task submit succ jobId:{}",hpcTaskId);

View File

@@ -0,0 +1,26 @@
package com.sdm.flowable.dto.req;
import lombok.Data;
import java.util.Map;
@Data
public class PreviewNodeInputFilesReq {
/** 流程定义ID (可选,视上下文而定) */
private String processDefinitionId;
/** 算例ID */
private String runId;
/** 当前节点ID */
private String nodeId;
/** 前置节点ID */
private String beforeNodeId;
/**
* 正则配置 Map
* Key: 类别标识 (如 "master", "input", "script")
* Value: 正则表达式 (如 "^.+\\.xml$", "^.+\\.json$")
*/
private Map<String, String> regexConfig;
}

View File

@@ -0,0 +1,35 @@
package com.sdm.flowable.dto.resp;
import lombok.Data;
@Data
public class NodeInputFilePreviewResp {
/** 文件名称 */
private String fileName;
/** 文件大小 */
private String fileSize;
/** 创建时间 */
private String createTime;
/**
* 文件完整本地路径 (核心)
* 后端同步MinIO后这里统一返回本地绝对路径
*/
private String filePath;
/**
* 文件来源
* CURRENT: 当前节点手动上传
* PREVIOUS: 上游节点产出
*/
private String source;
/**
* 文件类别 key
* 对应请求中 regexConfig 的 Key (如 "master", "input")
* 如果未匹配到任何正则但被保留(如手动上传),可为 "UNCATEGORIZED"
*/
private String fileCategory;
}

View File

@@ -31,9 +31,10 @@ public class UserTaskDirectoryPreparationListener implements ExecutionListener {
public void notify(DelegateExecution execution) {
String runId = (String) execution.getVariable("runId");
String nodeId = execution.getCurrentActivityId();
String processDefinitionId = execution.getProcessDefinitionId();
//创建本地文件夹用于后续节点计算直接从本地读取不需要再从minio中获取数据
JSONObject params = processNodeParamService.getExecuteParam(runId, nodeId);
JSONObject params =processNodeParamService.getParam(processDefinitionId,nodeId,runId);
log.info("userTaskDirectoryPreparationListener, 启动流程 runId:{},nodeId:{},实例id: {},参数 params:{}", runId,nodeId,execution.getProcessInstanceId(),params);
Long currentNodeOutputDirId = params.getLong("outputDirId");
if(ObjectUtils.isEmpty(currentNodeOutputDirId)){

View File

@@ -6,6 +6,8 @@ import com.sdm.common.entity.req.flowable.AsyncCallbackRequest;
import com.sdm.common.entity.resp.flowable.DeployFlowableResp;
import com.sdm.common.entity.resp.flowable.ProcessInstanceDetailResponse;
import com.sdm.flowable.dto.req.CompleteTaskReq;
import com.sdm.flowable.dto.req.PreviewNodeInputFilesReq;
import com.sdm.flowable.dto.resp.NodeInputFilePreviewResp;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.validation.ValidationError;
@@ -35,6 +37,8 @@ public interface Iprocess {
*/
SdmResponse<ProcessInstanceDetailResponse> getProcessAndNodeDetailByInstanceId(String processDefinitionId, String processInstanceId, String runId);
SdmResponse<List<NodeInputFilePreviewResp>> previewNodeInputFiles(PreviewNodeInputFilesReq req);
SdmResponse continueServiceTask( CompleteTaskReq req);

View File

@@ -5,12 +5,17 @@ import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.flowable.dto.NodeDetailInfo;
import com.sdm.common.entity.flowable.dto.ProcessDefinitionDTO;
import com.sdm.common.entity.flowable.dto.ProcessInstanceInfo;
import com.sdm.common.entity.req.data.GetFileBaseInfoReq;
import com.sdm.common.entity.req.flowable.AsyncCallbackRequest;
import com.sdm.common.entity.resp.data.FileMetadataInfoResp;
import com.sdm.common.entity.resp.flowable.DeployFlowableResp;
import com.sdm.common.entity.resp.flowable.ProcessInstanceDetailResponse;
import com.sdm.common.feign.inter.data.IDataFeignClient;
import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.flowable.delegate.UniversalDelegate;
import com.sdm.flowable.dto.req.CompleteTaskReq;
import com.sdm.flowable.dto.req.PreviewNodeInputFilesReq;
import com.sdm.flowable.dto.resp.NodeInputFilePreviewResp;
import com.sdm.flowable.enums.FlowElementTypeEnums;
import com.sdm.flowable.enums.NodeStateEnum;
import com.sdm.flowable.enums.ProcessInstanceStateEnum;
@@ -20,6 +25,7 @@ import com.sdm.flowable.util.Dto2BpmnConverter;
import com.sdm.flowable.util.FlowNodeIdUtils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.flowable.bpmn.model.*;
import org.flowable.bpmn.model.Process;
import org.flowable.engine.*;
@@ -39,7 +45,9 @@ import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RequestBody;
import java.io.File;
import java.util.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -73,6 +81,9 @@ public class ProcessService implements Iprocess{
@Autowired
private IAsyncTaskRecordService asyncTaskRecordService;
@Autowired
private IDataFeignClient dataFeignClient;
// 部署流程前端传入Flowable标准JSON
public SdmResponse<DeployFlowableResp> deploy(ProcessDefinitionDTO processDTO) throws Exception {
log.info("开始部署流程定义: {}",processDTO);
@@ -680,6 +691,186 @@ public class ProcessService implements Iprocess{
return String.format("%ds", seconds);
}
public SdmResponse<List<NodeInputFilePreviewResp>> previewNodeInputFiles(PreviewNodeInputFilesReq req) {
log.info("复合文件预览请求: {}", req);
List<NodeInputFilePreviewResp> resultList = new ArrayList<>();
// ================= 1. 处理当前节点 (inputDirId, 需同步 MinIO) =================
scanCurrentNodeInput(req.getProcessDefinitionId(),req.getRunId(), req.getNodeId(), resultList);
// ================= 2. 处理前置节点 (outputDirId, 本地直接扫描) =================
if (ObjectUtils.isNotEmpty(req.getBeforeNodeId())) {
// 预编译正则 Map (Key: Category, Value: Pattern)
Map<String, Pattern> patternMap = new HashMap<>();
if (req.getRegexConfig() != null) {
req.getRegexConfig().forEach((k, v) -> {
try {
patternMap.put(k, Pattern.compile(v));
} catch (Exception e) {
log.error("正则编译失败: key={}, regex={}", k, v);
}
});
}
scanPreviousNodeOutput(req.getProcessDefinitionId(),req.getRunId(), req.getBeforeNodeId(), patternMap, resultList);
}
return SdmResponse.success(resultList);
}
/**
* 场景1处理当前节点输入
* 关注点inputDirId
* 动作:必须从 MinIO 同步到本地,因为这是用户刚上传的
*/
private void scanCurrentNodeInput(String processDefinitionId,String runId, String nodeId, List<NodeInputFilePreviewResp> resultList) {
try {
// 1. 获取参数
JSONObject nodeParams = processNodeParamService.getParam(processDefinitionId,runId, nodeId);
if (nodeParams == null || !nodeParams.containsKey("inputDirId")) {
log.warn("当前节点 {} 未配置 inputDirId跳过手动上传区扫描", nodeId);
return;
}
Long inputDirId = nodeParams.getLong("inputDirId");
// 2. 获取路径
String objectKey = getObjectKeyByDirId(inputDirId);
if (objectKey == null) return;
String absDirPath = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR + objectKey;
// 3. 【同步】MinIO -> 本地
// 这是一个耗时操作,实际生产中可能需要加锁或判断文件指纹,避免重复下载
syncDirFromMinioToLocal(inputDirId, absDirPath);
File dir = new File(absDirPath);
if (!dir.exists() || !dir.isDirectory()) return;
File[] files = dir.listFiles();
if (files == null) return;
for (File file : files) {
if (file.isFile()) {
NodeInputFilePreviewResp dto = new NodeInputFilePreviewResp();
dto.setFileName(file.getName());
dto.setFileSize(String.valueOf(file.length()));
dto.setCreateTime(String.valueOf(file.lastModified()));
dto.setFilePath(file.getAbsolutePath());
// 来源标记
dto.setSource("CURRENT");
// 类别标记:统一标记为用户上传,不再强行匹配正则分类
dto.setFileCategory("USER_UPLOAD");
resultList.add(dto);
}
}
} catch (Exception e) {
log.error("扫描当前节点输入失败: nodeId={}", nodeId, e);
}
}
/**
* 场景2处理前置节点输出
* 关注点outputDirId
* 动作:直接读本地 (假设大文件不走MinIO或者上一步HPC已经落地了)
*/
private void scanPreviousNodeOutput(String processDefinitionId,String runId, String beforeNodeId, Map<String, Pattern> patternMap, List<NodeInputFilePreviewResp> resultList) {
try {
// 1. 获取参数
JSONObject nodeParams = processNodeParamService.getParam(processDefinitionId,runId, beforeNodeId);
if (nodeParams == null || !nodeParams.containsKey("outputDirId")) {
return;
}
Long outputDirId = nodeParams.getLong("outputDirId");
// 2. 获取路径
String objectKey = getObjectKeyByDirId(outputDirId);
if (objectKey == null) return;
String absDirPath = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR + objectKey;
// 3. 【不同步】直接扫本地
// 假设前置节点是 HPC 节点,结果已经写在共享存储上了
File dir = new File(absDirPath);
if (!dir.exists() || !dir.isDirectory()) return;
File[] files = dir.listFiles();
if (files == null) return;
for (File file : files) {
if (!file.isFile()) continue;
String category = null;
// 必须匹配正则才能入选
if (!patternMap.isEmpty()) {
for (Map.Entry<String, Pattern> entry : patternMap.entrySet()) {
if (entry.getValue().matcher(file.getName()).matches()) {
category = entry.getKey();
break;
}
}
}
// 如果匹配到了分类 (Master/Input 等),则添加
if (category != null) {
NodeInputFilePreviewResp dto = new NodeInputFilePreviewResp();
dto.setFileName(file.getName());
dto.setFileSize(String.valueOf(file.length()));
dto.setCreateTime(String.valueOf(file.lastModified()));
dto.setFilePath(file.getAbsolutePath());
dto.setSource("PREVIOUS");
dto.setFileCategory(category);
resultList.add(dto);
}
// 没匹配到直接丢弃 (前置节点的无关文件)
}
} catch (Exception e) {
log.error("扫描前置节点输出失败: beforeNodeId={}", beforeNodeId, e);
}
}
/**
* 辅助:获取 ObjectKey
*/
private String getObjectKeyByDirId(Long dirId) {
GetFileBaseInfoReq fileReq = new GetFileBaseInfoReq();
fileReq.setFileId(dirId);
SdmResponse<FileMetadataInfoResp> fileResp = dataFeignClient.getFileBaseInfo(fileReq);
if (!fileResp.isSuccess() || fileResp.getData() == null) {
log.warn("查询文件夹信息失败: dirId={}", dirId);
return null;
}
return fileResp.getData().getObjectKey();
}
/**
* 辅助:同步 MinIO 到本地
*/
private void syncDirFromMinioToLocal(Long dirId, String localAbsPath) {
File dir = new File(localAbsPath);
if (!dir.exists()) {
dir.mkdirs();
}
log.info("触发 MinIO 同步: dirId={} -> local={}", dirId, localAbsPath);
try {
dataFeignClient.downloadFolderToLocal(dirId, localAbsPath,null);
} catch (Exception e) {
log.error("MinIO 同步失败: dirId={} -> local={}", dirId, localAbsPath, e);
throw new RuntimeException("MinIO 同步失败: " + e.getMessage(), e);
}
}
public SdmResponse continueServiceTask(@RequestBody CompleteTaskReq req) {
log.info("开始继续服务任务处理, 请求参数: {}", req);

View File

@@ -26,10 +26,4 @@ public interface IProcessNodeParamService extends IService<ProcessNodeParam> {
* @return 节点输入参数
*/
JSONObject getParam(String processDefinitionId, String nodeId, String runId);
/**
* 节点在执行时,需要获取节点的输入参数
*/
JSONObject getExecuteParam(String runId, String nodeId );
}

View File

@@ -0,0 +1,74 @@
package com.sdm.flowable.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.flowable.service.IProcessNodeParamService;
import com.sdm.flowable.util.FlowNodeIdUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.flowable.engine.delegate.DelegateExecution;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 节点执行策略
* 运行时策略层 (NodeExecutionStrategy):实现根据 runId + nodeId 查询数据库参数,决定是否跳过节点。
*/
@Slf4j
@Component("nodeExecutionStrategy") // BPMN 表达式中引用的 Bean 名称
public class NodeExecutionStrategy {
@Autowired
private IProcessNodeParamService processNodeParamService;
/**
* 决定是否跳过 _waitUser 节点
* @param execution 当前执行上下文
* @return true = 跳过(自动执行), false = 不跳过(手动等待)
*/
public boolean shouldSkip(DelegateExecution execution) {
try {
// 1. 获取当前节点ID (例如 taskA_waitUser)
String currentActivityId = execution.getCurrentActivityId();
String processDefinitionId = execution.getProcessDefinitionId();
// 2. 解析出原始 ServiceTask ID (例如 taskA)
// 因为参数是保存在 ServiceTask 上的,不是保存在 _waitUser 上的
String serviceNodeId = FlowNodeIdUtils.getOriginalNodeIdFromWaitUserTask(currentActivityId);
if (serviceNodeId == null) {
log.warn("节点 {} 命名不符合规范无法解析原始ID默认手动", currentActivityId);
return false;
}
// 3. 获取 runId (流程启动时必须传入 runId 变量)
String runId = (String) execution.getVariable("runId");
if (StringUtils.isBlank(runId)) {
log.error("流程变量中缺失 runId无法查询节点参数强制转为手动模式。节点: {}", serviceNodeId);
return false;
}
// 4. 查询数据库 process_node_param
// 使用现有的 getParam 方法查询 ServiceTask 的参数
JSONObject params = processNodeParamService.getParam(processDefinitionId,serviceNodeId,runId);
log.info("是否手动控制节点 {} 参数: {}", serviceNodeId, params);
// 5. 判断执行模式
if (params != null && params.containsKey(FlowableConfig.EXECUTE_MODE_KEY)) {
String mode = params.getString(FlowableConfig.EXECUTE_MODE_KEY);
if (FlowableConfig.EXECUTE_MODE_AUTO.equalsIgnoreCase(mode)) {
log.info("策略判定:节点 {} 配置为 [自动执行] -> 跳过等待", serviceNodeId);
return true; // Skip = true (自动)
}
}
// 6. 默认逻辑:如果没有配置,或者配置为 MANUAL则不跳过
log.info("策略判定:节点 {} 配置为 [手动/未配置] -> 进入等待状态", serviceNodeId);
return false; // Skip = false (手动)
} catch (Exception e) {
log.error("执行策略判定异常,降级为手动模式", e);
return false; // 安全起见,出错时暂停流程,避免失控
}
}
}

View File

@@ -177,7 +177,7 @@ public class Dto2BpmnConverter {
private void handleWaitUserTask(Process process, FlowElementDTO nodeDto, Map<String, String> waitUserTaskMap) {
// 只有当前节点是ServiceTask才需要判断是否等待用户输入需要才创建前置UserTask
if (isWaitUserEnabled(nodeDto)) {
if (FlowElementTypeEnums.SERVICETASK.getType().equals(nodeDto.getType())) {
String originalNodeId = nodeDto.getId();
String waitUserId = FlowNodeIdUtils.generateWaitUserTaskId(originalNodeId);
@@ -186,6 +186,10 @@ public class Dto2BpmnConverter {
disableAsyncRetry(waitUserTask);
waitUserTask.setId(waitUserId);
waitUserTask.setName(nodeDto.getName() + "等待用户提交");
// 绑定 SkipExpression 到策略 Bean,运行时引擎会调用 nodeExecutionStrategy.shouldSkip(execution)
waitUserTask.setSkipExpression("${nodeExecutionStrategy.shouldSkip(execution)}");
// 不设置assignee让任何人可以处理
process.addFlowElement(waitUserTask);