userTask自动创建本地文件夹
This commit is contained in:
@@ -59,12 +59,12 @@ public class UniversalDelegate implements JavaDelegate {
|
||||
String nodeId = execution.getCurrentActivityId();
|
||||
String nodeName = execution.getCurrentFlowElement().getName();
|
||||
|
||||
log.info("开始执行节点, 流程实例ID: {}, 节点ID: {}, 节点名称: {}", procInstId, nodeId, nodeName);
|
||||
log.info("universalDelegate 开始执行节点, 流程实例ID: {}, 节点ID: {}, 节点名称: {}", procInstId, nodeId, nodeName);
|
||||
|
||||
// 2. 读取输入参数
|
||||
Map<String, Object> params = processNodeParamService.getParam(processDefinitionId, nodeId, null);
|
||||
Map<String, Object> params = processNodeParamService.getExecuteParam(procInstId, nodeId);
|
||||
|
||||
log.info("节点执行参数, 流程实例ID: {}, 节点ID: {}, 参数: {}", procInstId, nodeId, params);
|
||||
log.info("universalDelegate 节点执行参数, 流程实例ID: {}, 节点ID: {}, 参数: {}", procInstId, nodeId, params);
|
||||
|
||||
// 3、创建本地文件夹,用于后续节点计算直接从本地读取,不需要再从minio中获取数据
|
||||
Long currentNodeOutputDirId = (Long)params.getOrDefault("outputDirId", null);
|
||||
|
||||
@@ -0,0 +1,83 @@
|
||||
package com.sdm.flowable.listener;
|
||||
|
||||
import com.sdm.common.common.SdmResponse;
|
||||
import com.sdm.common.entity.req.data.GetFileBaseInfoReq;
|
||||
import com.sdm.common.entity.resp.data.FileMetadataInfoResp;
|
||||
import com.sdm.common.feign.inter.data.IDataFeignClient;
|
||||
import com.sdm.flowable.constants.FlowableConfig;
|
||||
import com.sdm.flowable.service.IProcessNodeParamService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.flowable.engine.delegate.DelegateExecution;
|
||||
import org.flowable.engine.delegate.ExecutionListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Map;
|
||||
|
||||
@Slf4j
|
||||
/**
|
||||
* UserTask 启动时准备本地输出目录的监听器
|
||||
*/
|
||||
@Component("userTaskDirectoryPreparationListener")
|
||||
public class UserTaskDirectoryPreparationListener implements ExecutionListener {
|
||||
|
||||
@Autowired
|
||||
private IProcessNodeParamService processNodeParamService;
|
||||
|
||||
@Autowired
|
||||
private IDataFeignClient dataFeignClient;
|
||||
|
||||
@Override
|
||||
public void notify(DelegateExecution execution) {
|
||||
String nodeId = execution.getCurrentActivityId();
|
||||
String procInstId = execution.getProcessInstanceId();
|
||||
|
||||
//创建本地文件夹,用于后续节点计算直接从本地读取,不需要再从minio中获取数据
|
||||
Map<String, Object> params = processNodeParamService.getExecuteParam(procInstId, nodeId);
|
||||
log.info("userTaskDirectoryPreparationListener, 流程实例ID: {}, 节点ID: {}, 参数: {}", procInstId, nodeId, params);
|
||||
Long currentNodeOutputDirId = (Long)params.getOrDefault("outputDirId", null);
|
||||
GetFileBaseInfoReq getFileBaseInfoReq = new GetFileBaseInfoReq();
|
||||
getFileBaseInfoReq.setFileId(currentNodeOutputDirId);
|
||||
SdmResponse<FileMetadataInfoResp> fileBaseInfoResp = dataFeignClient.getFileBaseInfo(getFileBaseInfoReq);
|
||||
if(!fileBaseInfoResp.isSuccess()||fileBaseInfoResp.getData()==null){
|
||||
throw new RuntimeException("当前节点未查询到输入文件夹");
|
||||
}
|
||||
String objectKey = fileBaseInfoResp.getData().getObjectKey();
|
||||
prepareLocalDir(objectKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* 准备本地目录:如果目录已存在,则清空其内容;否则创建新目录。
|
||||
* 流程实例启动后,需要在本地准备一个目录,用于存储节点计算结果。
|
||||
* 如果同一个流程二次启动,每个节点会使用同一个文件夹,二次启动的时候,
|
||||
* 如果清空,上一次流程实例运行结果相关文件也会在这个文件夹中,影响这次运行流程的结果文件
|
||||
*
|
||||
* @param objectKey MinIO 的对象路径,将作为本地目录路径的一部分
|
||||
*/
|
||||
private void prepareLocalDir(String objectKey) {
|
||||
String simulationBaseDir = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR;
|
||||
Path localBaseDir = Paths.get(simulationBaseDir).toAbsolutePath().normalize();
|
||||
Path fullLocalPath = localBaseDir.resolve(objectKey).normalize();
|
||||
|
||||
// 安全校验:防止路径穿越
|
||||
if (!fullLocalPath.startsWith(localBaseDir)) {
|
||||
throw new RuntimeException("非法文件夹路径,可能包含路径穿越: " + objectKey);
|
||||
}
|
||||
|
||||
try {
|
||||
if (Files.exists(fullLocalPath)) {
|
||||
//直接删除整个目录
|
||||
log.info("本地目录已存在,将删除并重新创建: {}", fullLocalPath);
|
||||
FileUtils.deleteDirectory(fullLocalPath.toFile());
|
||||
}
|
||||
log.info("创建本地目录: {}", fullLocalPath);
|
||||
Files.createDirectories(fullLocalPath);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("无法准备本地目录: " + fullLocalPath, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -16,5 +16,19 @@ import java.util.Map;
|
||||
public interface IProcessNodeParamService extends IService<ProcessNodeParam> {
|
||||
void saveParamByProcessDefinitionId(String processDefinitionId, String nodeId, String runId, Map<String, Object> params);
|
||||
void updateNodeParamProcessInstanceId(String runId,String processDefinitionId, String processInstanceId );
|
||||
/**
|
||||
* 进入流程执行页面,查询节点输入参数
|
||||
*
|
||||
* @param processDefinitionId 流程定义ID
|
||||
* @param nodeId 节点ID
|
||||
* @param runId 流程实例ID
|
||||
* @return 节点输入参数
|
||||
*/
|
||||
Map<String, Object> getParam(String processDefinitionId, String nodeId, String runId);
|
||||
|
||||
/**
|
||||
* 节点在执行时,需要获取节点的输入参数
|
||||
*/
|
||||
Map<String, Object> getExecuteParam(String processInstanceId , String nodeId );
|
||||
|
||||
}
|
||||
@@ -73,11 +73,28 @@ public class ProcessNodeParamServiceImpl extends ServiceImpl<ProcessNodeParamMap
|
||||
|
||||
// 查询节点输入参数(流程执行时调用)
|
||||
public Map<String, Object> getParam(String processDefinitionId, String nodeId, String runId) {
|
||||
ProcessNodeParam param = this.lambdaQuery().eq(ProcessNodeParam::getProcessDefinitionId, processDefinitionId)
|
||||
ProcessNodeParam param = this.lambdaQuery()
|
||||
.eq( ProcessNodeParam::getRunId, runId)
|
||||
.eq(ProcessNodeParam::getProcessDefinitionId, processDefinitionId)
|
||||
.eq(ProcessNodeParam::getNodeId, nodeId)
|
||||
.eq(ObjectUtils.isNotEmpty(runId), ProcessNodeParam::getRunId, runId)
|
||||
.one();
|
||||
if (param == null) {
|
||||
// 当未配置参数时不抛出异常,而是返回空Map
|
||||
return new HashMap<>();
|
||||
}
|
||||
try {
|
||||
return objectMapper.readValue(param.getParamJson(), new TypeReference<Map<String, Object>>() {});
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException("参数反序列化失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getExecuteParam(String processInstanceId, String nodeId) {
|
||||
ProcessNodeParam param = this.lambdaQuery()
|
||||
.eq(ProcessNodeParam::getProcessInstanceId, processInstanceId)
|
||||
.eq(ProcessNodeParam::getNodeId, nodeId)
|
||||
.one();
|
||||
if (param == null) {
|
||||
// 当未配置参数时不抛出异常,而是返回空Map
|
||||
return new HashMap<>();
|
||||
|
||||
@@ -401,7 +401,11 @@ public class Dto2BpmnConverter {
|
||||
// 不设置 assignee 或 candidateUsers,这样任何人都可以处理任务
|
||||
|
||||
// 可选:绑定 TaskListener,在任务完成时触发逻辑
|
||||
userTask.getTaskListeners();
|
||||
FlowableListener dirPrepareListener = new FlowableListener();
|
||||
dirPrepareListener.setEvent("start");
|
||||
dirPrepareListener.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
|
||||
dirPrepareListener.setImplementation("${userTaskDirectoryPreparationListener}");
|
||||
userTask.getExecutionListeners().add(dirPrepareListener);
|
||||
|
||||
process.addFlowElement(userTask);
|
||||
break;
|
||||
|
||||
Reference in New Issue
Block a user