diff --git a/flowable/src/main/java/com/sdm/flowable/config/executeConfig/BaseExecuteConfig.java b/flowable/src/main/java/com/sdm/flowable/config/executeConfig/BaseExecuteConfig.java index ab762290..ab262133 100644 --- a/flowable/src/main/java/com/sdm/flowable/config/executeConfig/BaseExecuteConfig.java +++ b/flowable/src/main/java/com/sdm/flowable/config/executeConfig/BaseExecuteConfig.java @@ -33,4 +33,9 @@ public abstract class BaseExecuteConfig { private boolean asyncCallback = false; // 用于标记回调节点ID,当asyncCallback为true时,表示当前执行节点是异步执行,执行完成后需要回调的节点ID,一般就是receiveTaskId private String callbackNodeId; + + // 是否需要等待用户手动提交执行, + // 默认是false,表示不需要等待用户手动提交执行 + // true: 流程到此停止,创建人工输入任务,等待用户点击执行 + private boolean waitUser = false; } \ No newline at end of file diff --git a/flowable/src/main/java/com/sdm/flowable/controller/ProcessController.java b/flowable/src/main/java/com/sdm/flowable/controller/ProcessController.java index c457e6b4..f313ce26 100644 --- a/flowable/src/main/java/com/sdm/flowable/controller/ProcessController.java +++ b/flowable/src/main/java/com/sdm/flowable/controller/ProcessController.java @@ -2,7 +2,6 @@ package com.sdm.flowable.controller; import com.fasterxml.jackson.databind.ObjectMapper; import com.sdm.common.common.SdmResponse; -import com.sdm.flowable.delegate.UniversalDelegate; import com.sdm.flowable.delegate.handler.HpcHandler; import com.sdm.flowable.dto.ProcessDefinitionDTO; import com.sdm.flowable.dto.req.AsyncCallbackRequest; @@ -30,9 +29,6 @@ public class ProcessController { @Autowired private IProcessNodeParamService processNodeParamService; - @Autowired - private UniversalDelegate universalDelegate; - @Autowired private HpcHandler hpcHandler; @@ -137,8 +133,8 @@ public class ProcessController { * 根据流程 key 和指定版本获取版本流程定义的节点信息 */ @GetMapping("/listNodesByProcessDefinitionKey") - public List> listNodesByProcessDefinitionKey(@RequestParam String processDefinitionKey,@RequestParam(required = false)Integer processDefinitionVersion) { - return processService.listNodesByProcessDefinitionKey(processDefinitionKey,processDefinitionVersion); + public List> listNodesByProcessDefinitionKey(@RequestParam String processDefinitionKey, @RequestParam(required = false) Integer processDefinitionVersion) { + return processService.listNodesByProcessDefinitionKey(processDefinitionKey, processDefinitionVersion); } /** @@ -152,8 +148,6 @@ public class ProcessController { } - - /** * 删除所有流程部署 */ @@ -239,14 +233,14 @@ public class ProcessController { } /** - * 完成人工节点任务 + * 流程节点继续执行(完成人工节点/或者等待用户输入后继续手动执行的节点) * * @param req * @return */ - @PostMapping("/completeManualTasks") - public void completeManualTasks(@RequestBody CompleteTaskReq req) { - processService.completeManualTasks(req); + @PostMapping("/continueServiceTask") + public void continueServiceTask(@RequestBody CompleteTaskReq req) { + processService.continueServiceTask(req); } /** @@ -257,6 +251,6 @@ public class ProcessController { @PostMapping("/asyncCallback") public void asyncCallback(@RequestBody AsyncCallbackRequest request) { // 发送信号唤醒流程实例中等待的节点 - universalDelegate.signalByTaskId(request); + processService.asyncCallback(request); } } \ No newline at end of file diff --git a/flowable/src/main/java/com/sdm/flowable/delegate/UniversalDelegate.java b/flowable/src/main/java/com/sdm/flowable/delegate/UniversalDelegate.java index aa64405f..5d847fab 100644 --- a/flowable/src/main/java/com/sdm/flowable/delegate/UniversalDelegate.java +++ b/flowable/src/main/java/com/sdm/flowable/delegate/UniversalDelegate.java @@ -11,7 +11,7 @@ import com.sdm.flowable.service.IProcessNodeParamService; import lombok.extern.slf4j.Slf4j; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.JavaDelegate; -import org.flowable.engine.RuntimeService; +import org.flowable.engine.runtime.Execution; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -27,7 +27,7 @@ public class UniversalDelegate implements JavaDelegate { private ObjectMapper objectMapper; @Autowired - private IProcessNodeParamService paramService; + private IProcessNodeParamService processNodeParamService; @Autowired private IAsyncTaskRecordService asyncTaskRecordService; @@ -44,7 +44,7 @@ public class UniversalDelegate implements JavaDelegate { String nodeName = execution.getCurrentFlowElement().getName(); // 2. 读取输入参数 - Map params = paramService.getParam(procInstId, nodeId); + Map params = processNodeParamService.getParam(procInstId, nodeId); log.info("==== 节点执行日志 ====\n流程实例ID:{}\n节点ID:{}\n节点名称:{}\n输入参数:{}\n====================", procInstId, nodeId, nodeName, params); diff --git a/flowable/src/main/java/com/sdm/flowable/dto/req/CompleteTaskReq.java b/flowable/src/main/java/com/sdm/flowable/dto/req/CompleteTaskReq.java index be29de1b..bae37df0 100644 --- a/flowable/src/main/java/com/sdm/flowable/dto/req/CompleteTaskReq.java +++ b/flowable/src/main/java/com/sdm/flowable/dto/req/CompleteTaskReq.java @@ -10,4 +10,12 @@ public class CompleteTaskReq { private String processInstanceId; private String taskDefinitionKey; private Map variables = new HashMap<>(); + /** + * 任务类型:FlowElementTypeEnums + * userTask - 普通用户任务 + * serviceTask - ServiceTask前置隐藏等待任务 + */ + private String flowelementType; + + } \ No newline at end of file diff --git a/flowable/src/main/java/com/sdm/flowable/process/ProcessService.java b/flowable/src/main/java/com/sdm/flowable/process/ProcessService.java index fb0b9e4a..c40cd824 100644 --- a/flowable/src/main/java/com/sdm/flowable/process/ProcessService.java +++ b/flowable/src/main/java/com/sdm/flowable/process/ProcessService.java @@ -1,10 +1,15 @@ package com.sdm.flowable.process; import com.fasterxml.jackson.core.JsonProcessingException; +import com.sdm.flowable.delegate.UniversalDelegate; import com.sdm.flowable.dto.ProcessDefinitionDTO; +import com.sdm.flowable.dto.req.AsyncCallbackRequest; +import com.sdm.flowable.enums.FlowElementTypeEnums; import com.sdm.flowable.util.Dto2BpmnConverter; import com.sdm.flowable.constants.FlowableConfig; import com.sdm.flowable.dto.req.CompleteTaskReq; +import com.sdm.flowable.util.FlowNodeIdUtils; +import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.model.*; import org.flowable.bpmn.model.Process; import org.flowable.engine.HistoryService; @@ -24,12 +29,12 @@ import org.flowable.validation.ValidationError; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestParam; import java.util.*; import java.util.stream.Collectors; +@Slf4j @Service public class ProcessService { @Autowired @@ -47,6 +52,9 @@ public class ProcessService { @Autowired private Dto2BpmnConverter dto2BpmnConverter; + @Autowired + private UniversalDelegate universalDelegate; + // 部署流程(前端传入Flowable标准JSON) public Deployment deploy(ProcessDefinitionDTO processDTO) throws Exception { BpmnModel bpmnModel = dto2BpmnConverter.convert(processDTO); @@ -493,17 +501,36 @@ public class ProcessService { return result; } - public void completeManualTasks(@RequestBody CompleteTaskReq req) { - Task task = taskService.createTaskQuery() - .processInstanceId(req.getProcessInstanceId()) - .taskDefinitionKey(req.getTaskDefinitionKey()) - .singleResult(); + public void continueServiceTask(@RequestBody CompleteTaskReq req) { + String taskDefKey; - if (task != null) { - taskService.complete(task.getId(), req.getVariables()); + // 根据类型确定真正的 taskDefinitionKey + if (FlowElementTypeEnums.fromString(req.getFlowelementType()).equals(FlowElementTypeEnums.SERVICETASK)) { + // 如果是 ServiceTask 前置等待节点 + taskDefKey = FlowNodeIdUtils.generateWaitUserTaskId(req.getTaskDefinitionKey()); } else { - throw new RuntimeException("找不到任务!"); + // 普通 UserTask + taskDefKey = req.getTaskDefinitionKey(); } + Task task = taskService.createTaskQuery() + .processInstanceId(req.getProcessInstanceId()) + .taskDefinitionKey(taskDefKey) + .singleResult(); + + if (task == null) { + throw new RuntimeException("找不到任务! taskDefinitionKey=" + taskDefKey); + } + + // 完成任务 + if (req.getVariables() != null) { + taskService.complete(task.getId(), req.getVariables()); + } else { + taskService.complete(task.getId()); + } + } + public void asyncCallback(AsyncCallbackRequest request) { + // 发送信号唤醒流程实例中等待的节点 + universalDelegate.signalByTaskId(request); } } \ No newline at end of file diff --git a/flowable/src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java b/flowable/src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java index f73aa106..45fc1c14 100644 --- a/flowable/src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java +++ b/flowable/src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java @@ -46,9 +46,12 @@ public class Dto2BpmnConverter { .filter(e -> FlowElementTypeEnums.SEQUENCEFLOW.getType().equals(e.getType())) .collect(Collectors.toList()); - // 3. 存储异步任务映射关系(原节点ID → wait节点ID) - Map asyncTaskMap = new HashMap<>(); // 异步任务映射(原节点→wait节点) - + // 3. 存储异步任务ReceiveTask 映射关系(原节点ID → ReceiveTask节点ID) + Map asyncTaskMap = new HashMap<>(); // 异步任务映射(原节点→ReceiveTask节点) + // 3.1、存储等待用户输入任务映射关系(原节点ID → waitUserTask节点ID) + Map waitUserTaskMap = new HashMap<>(); // 原节点ID → waitUserTask节点ID + + // 4. 存储并行网关映射关系(原节点ID → 网关ID) Map splitGatewayMap = new HashMap<>(); // 拆分网关(原拆分节点→拆分网关) Map joinGatewayMap = new HashMap<>(); // 汇总网关(原汇总节点→汇总网关) @@ -57,14 +60,16 @@ public class Dto2BpmnConverter { for (FlowElementDTO nodeDto : nodeDtos) { // 处理异步任务,创建等待节点,放在穿件实际节点之前是为了构造asyncTaskMap,后面createActualNode的时候才能设置回调等待节点 handleAsyncTasks(process, nodeDto, asyncTaskMap); + // 处理等待用户提交任务 + handleWaitUserTask(process, nodeDto, waitUserTaskMap); // 创建实际节点 - createActualNode(process, nodeDto,asyncTaskMap); + createActualNode(process, nodeDto, asyncTaskMap); // 处理并行网关,创建拆分和汇聚节点 addRequiredGateways(process, nodeDto, flowDtos, joinGatewayMap, splitGatewayMap); } // 6. 创建连线 - createConnections(process, flowDtos, asyncTaskMap, joinGatewayMap, splitGatewayMap); + createConnections(process, flowDtos, asyncTaskMap,waitUserTaskMap, joinGatewayMap, splitGatewayMap); return bpmnModel; } @@ -75,24 +80,44 @@ public class Dto2BpmnConverter { private void handleAsyncTasks(Process process, FlowElementDTO nodeDto, Map asyncTaskMap) { // 检查节点是否为服务任务或用户任务且标记为异步回调 if ((FlowElementTypeEnums.SERVICETASK.getType().equals(nodeDto.getType()) || - FlowElementTypeEnums.USERTASK.getType().equals(nodeDto.getType())) && - nodeDto.getExtensionElements() != null && - nodeDto.getExtensionElements().getExecuteConfig() != null && - nodeDto.getExtensionElements().getExecuteConfig().isAsyncCallback()) { - + FlowElementTypeEnums.USERTASK.getType().equals(nodeDto.getType())) && + nodeDto.getExtensionElements() != null && + nodeDto.getExtensionElements().getExecuteConfig() != null && + nodeDto.getExtensionElements().getExecuteConfig().isAsyncCallback()) { + // 创建接收任务节点 String originalNodeId = nodeDto.getId(); - String waitNodeId = originalNodeId + "_wait"; + String waitNodeId = FlowNodeIdUtils.generateAsyncTaskId(originalNodeId); ReceiveTask receiveTask = new ReceiveTask(); receiveTask.setId(waitNodeId); receiveTask.setName(nodeDto.getName() + "等待结果"); process.addFlowElement(receiveTask); - + // 记录映射关系 asyncTaskMap.put(originalNodeId, waitNodeId); } } + private void handleWaitUserTask(Process process, FlowElementDTO nodeDto, Map waitUserTaskMap) { + // 只有当前节点是ServiceTask才需要判断是否等待用户输入,需要才创建前置UserTask + if (FlowElementTypeEnums.SERVICETASK.getType().equals(nodeDto.getType()) && + nodeDto.getExtensionElements() != null && + nodeDto.getExtensionElements().getExecuteConfig() != null && + nodeDto.getExtensionElements().getExecuteConfig().isWaitUser()) { + String originalNodeId = nodeDto.getId(); + String waitUserId = FlowNodeIdUtils.generateWaitUserTaskId(originalNodeId); + + UserTask waitUserTask = new UserTask(); + waitUserTask.setId(waitUserId); + waitUserTask.setName(nodeDto.getName() + "等待用户提交"); + // 不设置assignee,让任何人可以处理 + process.addFlowElement(waitUserTask); + + // 记录映射 + waitUserTaskMap.put(originalNodeId, waitUserId); + } + } + /** * 添加必要的网关(并行拆分网关和并行汇聚网关) */ @@ -107,7 +132,7 @@ public class Dto2BpmnConverter { // 检查是否需要添加汇聚网关(入度>1) if (incomingCount > 1) { // 如果入度>1,则在节点前插入汇聚网关 - String joinGatewayId = "join_gw_" + nodeId; + String joinGatewayId = FlowNodeIdUtils.generateJoinGatewayId(nodeId); ParallelGateway joinGateway = new ParallelGateway(); joinGateway.setId(joinGatewayId); joinGateway.setName("并行汇聚-" + nodeDto.getName()); @@ -118,7 +143,7 @@ public class Dto2BpmnConverter { // 检查是否需要添加拆分网关(出度>1) if (outgoingCount > 1) { // 如果出度>1,则在节点后插入拆分网关 - String splitGatewayId = "split_gw_" + nodeId; + String splitGatewayId = FlowNodeIdUtils.generateSplitGatewayId(nodeId); ParallelGateway splitGateway = new ParallelGateway(); splitGateway.setId(splitGatewayId); splitGateway.setName("并行拆分-" + nodeDto.getName()); @@ -133,6 +158,7 @@ public class Dto2BpmnConverter { private void createConnections(Process process, List flowDtos, Map asyncTaskMap, + Map waitUserTaskMap, Map joinGatewayMap, Map splitGatewayMap) { @@ -148,9 +174,15 @@ public class Dto2BpmnConverter { // ==================================================================================== // ③ 第三阶段:处理异步任务(等待节点) - // 原逻辑:原节点 → wait → 原本下游 + // 原节点 → wait → 原本下游 // ==================================================================================== handleAsyncTaskConnections(process, asyncTaskMap); + + // ==================================================================================== + // ④ 第三阶段:处理等待用户提交任务 + // 原节点 → waitUserTask → 原节点 + // ==================================================================================== + handleWaitUserTaskConnections(process, waitUserTaskMap); } /** @@ -273,6 +305,35 @@ public class Dto2BpmnConverter { } } + private void handleWaitUserTaskConnections(Process process, Map waitUserTaskMap) { + for (String originalNodeId : waitUserTaskMap.keySet()) { + String waitUserId = waitUserTaskMap.get(originalNodeId); + + // Step 1: 找出原节点的所有入线,改为指向 waitUserTask + List removeLines = new ArrayList<>(); + List originalSources = new ArrayList<>(); + + for (FlowElement ele : process.getFlowElements()) { + if (ele instanceof SequenceFlow sf) { + if (sf.getTargetRef().equals(originalNodeId)) { + originalSources.add(sf.getSourceRef()); + removeLines.add(sf); + } + } + } + removeLines.forEach(f -> process.removeFlowElement(f.getId())); + + // Step 2: 添加原来的入线 → waitUserTask + for (String src : originalSources) { + process.addFlowElement(createSequenceFlow(src, waitUserId, null)); + } + + // Step 3: waitUserTask → 原节点 + process.addFlowElement(createSequenceFlow(waitUserId, originalNodeId, null)); + } + } + + /** * 创建实际的流程节点 */ diff --git a/flowable/src/main/java/com/sdm/flowable/util/FlowNodeIdUtils.java b/flowable/src/main/java/com/sdm/flowable/util/FlowNodeIdUtils.java new file mode 100644 index 00000000..90c3aec7 --- /dev/null +++ b/flowable/src/main/java/com/sdm/flowable/util/FlowNodeIdUtils.java @@ -0,0 +1,60 @@ +package com.sdm.flowable.util; + +public class FlowNodeIdUtils { + private static final String JOIN_GATEWAY_PREFIX = "join_gw_"; + private static final String SPLIT_GATEWAY_PREFIX = "split_gw_"; + private static final String ASYNC_TASK_SUFFIX = "_wait"; + private static final String WAIT_USER_SUFFIX = "_waitUser"; + + // ==================== 网关 ==================== + + public static String generateJoinGatewayId(String nodeId) { + return JOIN_GATEWAY_PREFIX + nodeId; + } + + public static String generateSplitGatewayId(String nodeId) { + return SPLIT_GATEWAY_PREFIX + nodeId; + } + + public static boolean isJoinGateway(String id) { + return id != null && id.startsWith(JOIN_GATEWAY_PREFIX); + } + + public static boolean isSplitGateway(String id) { + return id != null && id.startsWith(SPLIT_GATEWAY_PREFIX); + } + + // ==================== 异步接收任务 ==================== + + public static String generateAsyncTaskId(String nodeId) { + return nodeId + ASYNC_TASK_SUFFIX; + } + + public static boolean isAsyncTask(String id) { + return id != null && id.endsWith(ASYNC_TASK_SUFFIX); + } + + public static String getOriginalNodeIdFromAsyncTask(String asyncTaskId) { + if (!isAsyncTask(asyncTaskId)) { + throw new IllegalArgumentException("不是异步等待节点: " + asyncTaskId); + } + return asyncTaskId.substring(0, asyncTaskId.length() - ASYNC_TASK_SUFFIX.length()); + } + + // ==================== 用户等待任务 ==================== + + public static String generateWaitUserTaskId(String nodeId) { + return nodeId + WAIT_USER_SUFFIX; + } + + public static boolean isWaitUserTask(String id) { + return id != null && id.endsWith(WAIT_USER_SUFFIX); + } + + public static String getOriginalNodeIdFromWaitUserTask(String waitUserTaskId) { + if (!isWaitUserTask(waitUserTaskId)) { + throw new IllegalArgumentException("不是隐藏等待节点: " + waitUserTaskId); + } + return waitUserTaskId.substring(0, waitUserTaskId.length() - WAIT_USER_SUFFIX.length()); + } +} diff --git a/gateway2/src/main/resources/bin/start.sh b/gateway2/src/main/resources/bin/start.sh index b5f9ce8b..c284175f 100644 --- a/gateway2/src/main/resources/bin/start.sh +++ b/gateway2/src/main/resources/bin/start.sh @@ -9,7 +9,7 @@ LOG_HOME="/home/app/gateway2/logs" LOG_FILE="${LOG_HOME}/running.log" # JVM参数 -JVM_OPTS="-Xms512m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${LOG_HOME}/heapdump.hprof" +JVM_OPTS="-Xmx2g -Xms2g -XX:MaxDirectMemorySize=2g -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${LOG_HOME}/heapdump.hprof" # 函数定义