扩展流程任意节点重试功能
This commit is contained in:
@@ -35,7 +35,7 @@ spring:
|
||||
nacos:
|
||||
discovery:
|
||||
server-addr: 192.168.65.161:8848
|
||||
group: DAI_GROUP
|
||||
group: LOCAL_GROUP
|
||||
enabled: true
|
||||
servlet:
|
||||
multipart:
|
||||
|
||||
@@ -49,11 +49,12 @@ public class UniversalDelegate implements JavaDelegate {
|
||||
String nodeId = execution.getCurrentActivityId();
|
||||
String nodeName = execution.getCurrentFlowElement().getName();
|
||||
|
||||
log.info("开始执行节点, 流程实例ID: {}, 节点ID: {}, 节点名称: {}", procInstId, nodeId, nodeName);
|
||||
|
||||
// 2. 读取输入参数
|
||||
Map<String, Object> params = processNodeParamService.getParam(procInstId, nodeId);
|
||||
|
||||
log.info("==== 节点执行日志 ====\n流程实例ID:{}\n节点ID:{}\n节点名称:{}\n输入参数:{}\n====================",
|
||||
procInstId, nodeId, nodeName, params);
|
||||
log.info("节点执行参数, 流程实例ID: {}, 节点ID: {}, 参数: {}", procInstId, nodeId, params);
|
||||
|
||||
// 检查是否有扩展元素配置
|
||||
if (execution.getCurrentFlowElement().getExtensionElements() != null &&
|
||||
@@ -64,23 +65,30 @@ public class UniversalDelegate implements JavaDelegate {
|
||||
.getExtensionElements()
|
||||
.get(FlowableConfig.EXECUTECONFIG).get(0).getElementText();
|
||||
|
||||
log.info("节点扩展配置, 流程实例ID: {}, 节点ID: {}, 扩展配置: {}", procInstId, nodeId, extensionElement);
|
||||
|
||||
BaseExecuteConfig config =
|
||||
objectMapper.readValue(extensionElement, BaseExecuteConfig.class);
|
||||
|
||||
String executeType = config.getExecuteType();
|
||||
ExecutionHandler handler = handlerMap.get(executeType);
|
||||
if (handler == null) {
|
||||
log.error("不支持的执行方式, 流程实例ID: {}, 节点ID: {}, 执行方式: {}", procInstId, nodeId, executeType);
|
||||
throw new RuntimeException("不支持的执行方式:" + executeType);
|
||||
}
|
||||
|
||||
log.info("开始执行具体任务处理逻辑, 流程实例ID: {}, 节点ID: {}, 执行方式: {}", procInstId, nodeId, executeType);
|
||||
// 执行具体的任务处理逻辑
|
||||
handler.execute(execution, params, config);
|
||||
log.info("任务处理逻辑执行完成, 流程实例ID: {}, 节点ID: {}, 执行方式: {}", procInstId, nodeId, executeType);
|
||||
} else {
|
||||
// 对于没有配置 executeConfig 的节点(如 userTask),直接完成任务
|
||||
log.info("节点 {} 没有执行配置,直接完成任务", nodeName);
|
||||
}
|
||||
log.info("节点执行完成, 流程实例ID: {}, 节点ID: {}, 节点名称: {}", procInstId, nodeId, nodeName);
|
||||
} catch (Exception e) {
|
||||
// 处理失败情况 - 跳转到重试任务
|
||||
log.error("节点执行过程中发生异常", e);
|
||||
handleFailure(execution, e);
|
||||
}
|
||||
}
|
||||
@@ -93,12 +101,15 @@ public class UniversalDelegate implements JavaDelegate {
|
||||
private void handleFailure(DelegateExecution execution, Exception e) {
|
||||
String procInstId = execution.getProcessInstanceId();
|
||||
String failedNodeId = execution.getCurrentActivityId();
|
||||
String nodeName = execution.getCurrentFlowElement().getName();
|
||||
|
||||
log.error("节点执行失败,节点ID: {}, 错误信息: {}", failedNodeId, e.getMessage(), e);
|
||||
log.error("节点执行失败,流程实例ID: {}, 节点ID: {}, 节点名称: {}, 错误信息: {}", procInstId, failedNodeId, nodeName, e.getMessage(), e);
|
||||
|
||||
// 记录失败节点(供后续重试用)
|
||||
runtimeService.setVariable(procInstId, FlowableConfig.RETRY_ORIGIN_NODE_ID, failedNodeId);
|
||||
runtimeService.setVariable(procInstId, FlowableConfig.RETRY_ERROR_MESSAGE, e.getMessage());
|
||||
|
||||
log.info("准备跳转到重试任务, 流程实例ID: {}, 失败节点ID: {}, 重试任务ID: {}", procInstId, failedNodeId, FlowNodeIdUtils.getRetryTaskId());
|
||||
|
||||
// 跳转到通用重试任务
|
||||
runtimeService.createChangeActivityStateBuilder()
|
||||
@@ -109,6 +120,7 @@ public class UniversalDelegate implements JavaDelegate {
|
||||
)
|
||||
.changeState();
|
||||
|
||||
log.info("已完成跳转到重试任务, 流程实例ID: {}, 失败节点ID: {}", procInstId, failedNodeId);
|
||||
// 不抛出异常,让流程继续
|
||||
}
|
||||
|
||||
@@ -122,7 +134,9 @@ public class UniversalDelegate implements JavaDelegate {
|
||||
* HPC 回调接口,用于唤醒等待的流程
|
||||
*/
|
||||
public void signalByTaskId(AsyncCallbackRequest request) {
|
||||
log.info("接收到异步回调请求: {}", request);
|
||||
asyncTaskRecordService.completeAsyncTask(request);
|
||||
log.info("异步回调处理完成: {}", request.getAsyncTaskId());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -36,22 +36,37 @@ public class RetryRedirectListener implements ExecutionListener {
|
||||
@Override
|
||||
public void notify(DelegateExecution execution) {
|
||||
String procInstId = execution.getProcessInstanceId();
|
||||
String currentActivityId = execution.getCurrentActivityId();
|
||||
|
||||
log.info("重试跳转监听器开始执行, 流程实例ID: {}, 当前活动ID: {}", procInstId, currentActivityId);
|
||||
|
||||
// 1. 获取目标节点ID(由重试接口设置)
|
||||
String targetNodeId = (String) runtimeService.getVariable(procInstId, FlowableConfig.RETRY_TARGET_NODE_ID);
|
||||
log.info("获取重试目标节点ID, 流程实例ID: {}, 目标节点ID: {}", procInstId, targetNodeId);
|
||||
|
||||
if (targetNodeId == null) {
|
||||
log.error("未指定重试目标节点, 流程实例ID: {}", procInstId);
|
||||
throw new IllegalStateException("未指定重试目标节点");
|
||||
}
|
||||
|
||||
// 2. 判断目标节点类型
|
||||
log.info("获取BPMN模型, 流程定义ID: {}", execution.getProcessDefinitionId());
|
||||
BpmnModel bpmnModel = repositoryService.getBpmnModel(execution.getProcessDefinitionId());
|
||||
FlowElement targetElement = bpmnModel.getMainProcess().getFlowElement(targetNodeId);
|
||||
|
||||
log.info("获取目标节点元素, 流程实例ID: {}, 目标节点ID: {}, 目标节点对象为空: {}", procInstId, targetNodeId, targetElement == null);
|
||||
|
||||
if (targetElement == null) {
|
||||
log.error("目标节点不存在, 流程实例ID: {}, 目标节点ID: {}", procInstId, targetNodeId);
|
||||
throw new IllegalArgumentException("目标节点不存在: " + targetNodeId);
|
||||
}
|
||||
|
||||
log.info("目标节点类型: {}", targetElement.getClass().getSimpleName());
|
||||
|
||||
// 3. 跳转到目标节点
|
||||
log.info("开始跳转到目标节点, 流程实例ID: {}, 源节点ID: {}, 目标节点ID: {}",
|
||||
procInstId, FlowNodeIdUtils.getRetryTaskId(), targetNodeId);
|
||||
|
||||
runtimeService.createChangeActivityStateBuilder()
|
||||
.processInstanceId(procInstId)
|
||||
.moveActivityIdsToSingleActivityId(
|
||||
@@ -59,24 +74,34 @@ public class RetryRedirectListener implements ExecutionListener {
|
||||
targetNodeId
|
||||
)
|
||||
.changeState();
|
||||
|
||||
log.info("节点跳转操作完成, 流程实例ID: {}, 目标节点ID: {}", procInstId, targetNodeId);
|
||||
|
||||
// 4. 如果目标是 UserTask,自动完成它(模拟"无人值守")
|
||||
if (targetElement instanceof UserTask) {
|
||||
log.info("目标节点是UserTask,准备自动完成, 流程实例ID: {}, 目标节点ID: {}", procInstId, targetNodeId);
|
||||
|
||||
// 等待任务创建(异步操作可能有延迟)
|
||||
awaitUserTaskCreated(procInstId, targetNodeId);
|
||||
|
||||
// 查询刚创建的 UserTask
|
||||
log.info("查询刚创建的UserTask, 流程实例ID: {}, 目标节点ID: {}", procInstId, targetNodeId);
|
||||
Task userTask = taskService.createTaskQuery()
|
||||
.processInstanceId(procInstId)
|
||||
.taskDefinitionKey(targetNodeId)
|
||||
.orderByTaskCreateTime().desc() // 取最新
|
||||
.singleResult();
|
||||
|
||||
log.info("UserTask查询结果, 流程实例ID: {}, 目标节点ID: {}, 任务对象为空: {}", procInstId, targetNodeId, userTask == null);
|
||||
|
||||
if (userTask != null) {
|
||||
// 自动完成 UserTask(不传变量,或传空)
|
||||
log.info("开始自动完成UserTask, 任务ID: {}, 流程实例ID: {}, 目标节点ID: {}", userTask.getId(), procInstId, targetNodeId);
|
||||
taskService.complete(userTask.getId());
|
||||
log.info("自动完成 UserTask: {}", targetNodeId);
|
||||
}
|
||||
} else {
|
||||
log.info("目标节点不是UserTask,无需自动完成, 节点类型: {}", targetElement.getClass().getSimpleName());
|
||||
}
|
||||
|
||||
// 如果是 ServiceTask,引擎会自动执行,无需干预
|
||||
@@ -85,20 +110,33 @@ public class RetryRedirectListener implements ExecutionListener {
|
||||
|
||||
// 辅助方法:等待 UserTask 创建(简单轮询)
|
||||
private void awaitUserTaskCreated(String procInstId, String taskDefKey) {
|
||||
log.info("等待UserTask创建, 流程实例ID: {}, 任务定义Key: {}", procInstId, taskDefKey);
|
||||
|
||||
int attempts = 0;
|
||||
while (attempts < 10) {
|
||||
long count = taskService.createTaskQuery()
|
||||
.processInstanceId(procInstId)
|
||||
.taskDefinitionKey(taskDefKey)
|
||||
.count();
|
||||
if (count > 0) return;
|
||||
|
||||
log.debug("检查UserTask是否存在, 流程实例ID: {}, 任务定义Key: {}, 当前尝试次数: {}, 任务数量: {}",
|
||||
procInstId, taskDefKey, attempts, count);
|
||||
|
||||
if (count > 0) {
|
||||
log.info("UserTask已创建, 流程实例ID: {}, 任务定义Key: {}, 尝试次数: {}", procInstId, taskDefKey, attempts);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(200); // 200ms
|
||||
attempts++;
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("等待UserTask创建时线程被中断, 流程实例ID: {}, 任务定义Key: {}", procInstId, taskDefKey);
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
log.warn("等待UserTask创建超时, 流程实例ID: {}, 任务定义Key: {}, 最大尝试次数: {}", procInstId, taskDefKey, attempts);
|
||||
}
|
||||
}
|
||||
@@ -59,9 +59,13 @@ public class ProcessService {
|
||||
|
||||
// 部署流程(前端传入Flowable标准JSON)
|
||||
public Deployment deploy(ProcessDefinitionDTO processDTO) throws Exception {
|
||||
log.info("开始部署流程定义");
|
||||
BpmnModel bpmnModel = dto2BpmnConverter.convert(processDTO);
|
||||
log.info("BPMN模型转换完成");
|
||||
|
||||
// 检查BPMN模型是否有效
|
||||
if (bpmnModel.getProcesses().isEmpty()) {
|
||||
log.error("无效的BPMN模型:未找到任何流程定义");
|
||||
throw new RuntimeException("无效的BPMN模型:未找到任何流程定义");
|
||||
}
|
||||
|
||||
@@ -73,13 +77,17 @@ public class ProcessService {
|
||||
for (ValidationError error : validationErrors) {
|
||||
errorMsg.append("\n - ").append(error.toString());
|
||||
}
|
||||
log.error("BPMN模型验证失败: {}", errorMsg.toString());
|
||||
throw new RuntimeException(errorMsg.toString());
|
||||
}
|
||||
|
||||
return repositoryService.createDeployment()
|
||||
Deployment deployment = repositoryService.createDeployment()
|
||||
.addBpmnModel("industrial_process.bpmn", bpmnModel)
|
||||
.name("工业并行部署流程")
|
||||
.deploy();
|
||||
|
||||
log.info("流程部署成功, 部署ID: {}, 部署名称: {}", deployment.getId(), deployment.getName());
|
||||
return deployment;
|
||||
}
|
||||
|
||||
public List<Map<String, Object>> listAllDeployments() {
|
||||
@@ -322,8 +330,15 @@ public class ProcessService {
|
||||
if (variables == null) {
|
||||
variables = Collections.emptyMap();
|
||||
}
|
||||
|
||||
log.info("根据流程定义ID启动流程实例, 流程定义ID: {}, 变量数量: {}", processDefinitionId, variables.size());
|
||||
|
||||
return runtimeService.startProcessInstanceById(processDefinitionId, variables);
|
||||
ProcessInstance instance = runtimeService.startProcessInstanceById(processDefinitionId, variables);
|
||||
|
||||
log.info("流程实例启动成功, 实例ID: {}, 流程定义ID: {}, 业务Key: {}",
|
||||
instance.getId(), instance.getProcessDefinitionId(), instance.getBusinessKey());
|
||||
|
||||
return instance;
|
||||
}
|
||||
|
||||
// 获取BPMN模型用于调试和验证
|
||||
@@ -504,37 +519,53 @@ public class ProcessService {
|
||||
}
|
||||
|
||||
public void continueServiceTask(@RequestBody CompleteTaskReq req) {
|
||||
log.info("开始继续服务任务处理, 请求参数: {}", req);
|
||||
|
||||
String taskDefKey;
|
||||
|
||||
// 根据类型确定真正的 taskDefinitionKey
|
||||
if (FlowElementTypeEnums.fromString(req.getFlowelementType()).equals(FlowElementTypeEnums.SERVICETASK)) {
|
||||
// 如果是 ServiceTask 前置等待节点
|
||||
taskDefKey = FlowNodeIdUtils.generateWaitUserTaskId(req.getTaskDefinitionKey());
|
||||
log.info("识别为ServiceTask前置等待节点, 原始任务定义Key: {}, 转换后任务定义Key: {}", req.getTaskDefinitionKey(), taskDefKey);
|
||||
} else {
|
||||
// 普通 UserTask
|
||||
taskDefKey = req.getTaskDefinitionKey();
|
||||
log.info("识别为普通UserTask, 任务定义Key: {}", taskDefKey);
|
||||
}
|
||||
|
||||
Task task = taskService.createTaskQuery()
|
||||
.processInstanceId(req.getProcessInstanceId())
|
||||
.taskDefinitionKey(taskDefKey)
|
||||
.singleResult();
|
||||
|
||||
log.info("任务查询完成, 流程实例ID: {}, 任务定义Key: {}, 查询结果为空: {}", req.getProcessInstanceId(), taskDefKey, task == null);
|
||||
|
||||
if (task == null) {
|
||||
log.error("找不到任务!流程实例ID: {}, taskDefinitionKey={}", req.getProcessInstanceId(), taskDefKey);
|
||||
throw new RuntimeException("找不到任务! taskDefinitionKey=" + taskDefKey);
|
||||
}
|
||||
|
||||
log.info("准备完成任务, 任务ID: {}, 变量数量: {}", task.getId(),
|
||||
req.getVariables() != null ? req.getVariables().size() : 0);
|
||||
|
||||
// 完成任务
|
||||
if (req.getVariables() != null) {
|
||||
taskService.complete(task.getId(), req.getVariables());
|
||||
log.info("任务完成(带变量), 任务ID: {}", task.getId());
|
||||
} else {
|
||||
taskService.complete(task.getId());
|
||||
log.info("任务完成(无变量), 任务ID: {}", task.getId());
|
||||
}
|
||||
|
||||
log.info("服务任务处理完成, 任务ID: {}", task.getId());
|
||||
}
|
||||
|
||||
public void asyncCallback(AsyncCallbackRequest request) {
|
||||
log.info("接收到异步回调请求, 请求内容: {}", request);
|
||||
// 发送信号唤醒流程实例中等待的节点
|
||||
universalDelegate.signalByTaskId(request);
|
||||
log.info("异步回调处理转发完成, 任务ID: {}", request.getAsyncTaskId());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -545,25 +576,37 @@ public class ProcessService {
|
||||
* @param newVariables 新的变量参数
|
||||
*/
|
||||
public void retryToNode(String procInstId, String targetNodeId, Map<String, Object> newVariables) {
|
||||
log.info("开始重试节点操作, 流程实例ID: {}, 目标节点ID: {}, 新变量数量: {}",
|
||||
procInstId, targetNodeId, newVariables != null ? newVariables.size() : 0);
|
||||
|
||||
// 1. 更新流程变量(用户新输入的参数)
|
||||
if (newVariables != null && !newVariables.isEmpty()) {
|
||||
log.info("更新流程变量, 变量数量: {}", newVariables.size());
|
||||
runtimeService.setVariables(procInstId, newVariables);
|
||||
}
|
||||
|
||||
// 2. 将目标节点ID存为变量(供 ExecutionListener 使用)
|
||||
log.info("设置重试目标节点变量, 流程实例ID: {}, 目标节点ID: {}", procInstId, targetNodeId);
|
||||
runtimeService.setVariable(procInstId, FlowableConfig.RETRY_TARGET_NODE_ID, targetNodeId);
|
||||
|
||||
// 3. 查询并完成 retryInputTask
|
||||
log.info("查询重试输入任务, 流程实例ID: {}, 重试任务ID: {}", procInstId, FlowNodeIdUtils.getRetryTaskId());
|
||||
Task retryTask = taskService.createTaskQuery()
|
||||
.processInstanceId(procInstId)
|
||||
.taskDefinitionKey(FlowNodeIdUtils.getRetryTaskId())
|
||||
.singleResult();
|
||||
|
||||
log.info("重试任务查询结果, 任务对象为空: {}", retryTask == null);
|
||||
|
||||
if (retryTask == null) {
|
||||
log.error("未找到重试任务,请确认流程处于重试状态, 流程实例ID: {}", procInstId);
|
||||
throw new IllegalStateException("未找到重试任务,请确认流程处于重试状态");
|
||||
}
|
||||
|
||||
log.info("准备完成重试中转任务, 触发ExecutionListener, 任务ID: {}", retryTask.getId());
|
||||
|
||||
// 完成重试中转任务,触发 ExecutionListener
|
||||
taskService.complete(retryTask.getId());
|
||||
log.info("重试中转任务完成, 任务ID: {}", retryTask.getId());
|
||||
}
|
||||
}
|
||||
@@ -19,7 +19,8 @@
|
||||
"extensionElements": {
|
||||
"executeConfig": {
|
||||
"executeType": "HPC",
|
||||
"asyncCallback": true
|
||||
"asyncCallback": true,
|
||||
"waitUser":true
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user