fix:流程结束更新算列状态
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
package com.sdm.flowable.listener;
|
||||
|
||||
import com.sdm.common.common.ThreadLocalContext;
|
||||
import com.sdm.common.feign.inter.project.ISimulationRunFeignClient;
|
||||
import com.sdm.flowable.enums.ProcessInstanceStateEnum;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -7,6 +8,7 @@ import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent;
|
||||
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
|
||||
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
|
||||
import org.flowable.common.engine.api.delegate.event.FlowableEventListener;
|
||||
import org.flowable.engine.RuntimeService;
|
||||
import org.flowable.engine.impl.persistence.entity.ExecutionEntity;
|
||||
import org.flowable.job.api.Job;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -14,6 +16,7 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
@@ -38,6 +41,9 @@ public class GlobalStatusEventListener implements FlowableEventListener {
|
||||
@Autowired
|
||||
private ISimulationRunFeignClient simulationRunFeignClient;
|
||||
|
||||
@Autowired
|
||||
private RuntimeService runtimeService;
|
||||
|
||||
@Override
|
||||
public Set<FlowableEngineEventType> getTypes() {
|
||||
return new HashSet<>(Arrays.asList(
|
||||
@@ -84,8 +90,11 @@ public class GlobalStatusEventListener implements FlowableEventListener {
|
||||
*/
|
||||
private void handleProcessCompleted(FlowableEngineEntityEvent event) {
|
||||
String processInstanceId = event.getProcessInstanceId();
|
||||
log.info("流程正常完成: processInstanceId={}", processInstanceId);
|
||||
doUpdate(processInstanceId, ProcessInstanceStateEnum.COMPLETED.getCode());
|
||||
ExecutionEntity execution = (ExecutionEntity) event.getEntity();
|
||||
Long userId = (Long) execution.getVariable("userId");
|
||||
Long tenantId = (Long) execution.getVariable("tenantId");
|
||||
log.info("流程正常完成: processInstanceId={}, userId={}, tenantId={}", processInstanceId, userId, tenantId);
|
||||
doUpdate(processInstanceId, ProcessInstanceStateEnum.COMPLETED.getCode(), userId, tenantId);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -95,9 +104,11 @@ public class GlobalStatusEventListener implements FlowableEventListener {
|
||||
private void handleProcessCancelled(FlowableEngineEntityEvent event) {
|
||||
String processInstanceId = event.getProcessInstanceId();
|
||||
ExecutionEntity execution = (ExecutionEntity) event.getEntity();
|
||||
log.info("流程被取消: processInstanceId={}, deleteReason={}",
|
||||
processInstanceId, execution.getDeleteReason());
|
||||
doUpdate(processInstanceId, ProcessInstanceStateEnum.CANCELLED.getCode());
|
||||
Long userId = (Long) execution.getVariable("userId");
|
||||
Long tenantId = (Long) execution.getVariable("tenantId");
|
||||
log.info("流程被取消: processInstanceId={}, userId={}, tenantId={}, deleteReason={}",
|
||||
processInstanceId, userId, tenantId, execution.getDeleteReason());
|
||||
doUpdate(processInstanceId, ProcessInstanceStateEnum.CANCELLED.getCode(), userId, tenantId);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -115,10 +126,12 @@ public class GlobalStatusEventListener implements FlowableEventListener {
|
||||
if (execution.isProcessInstanceType()) {
|
||||
String processInstanceId = execution.getProcessInstanceId();
|
||||
Integer status = (eventType == FlowableEngineEventType.ENTITY_SUSPENDED) ? ProcessInstanceStateEnum.SUSPENDED.getCode() : ProcessInstanceStateEnum.RUNNING.getCode();
|
||||
Long userId = (Long) execution.getVariable("userId");
|
||||
Long tenantId = (Long) execution.getVariable("tenantId");
|
||||
|
||||
log.info("流程实例{}状态变更: processInstanceId={}",
|
||||
status.equals("SUSPENDED") ? "挂起" : "激活", processInstanceId);
|
||||
doUpdate(processInstanceId, status);
|
||||
log.info("流程实例{}状态变更: processInstanceId={}, userId={}, tenantId={}",
|
||||
(eventType == FlowableEngineEventType.ENTITY_SUSPENDED) ? "挂起" : "激活", processInstanceId, userId, tenantId);
|
||||
doUpdate(processInstanceId, status, userId, tenantId);
|
||||
} else {
|
||||
log.debug("忽略非流程实例级别的挂起/激活事件: executionId={}", execution.getId());
|
||||
}
|
||||
@@ -141,10 +154,19 @@ public class GlobalStatusEventListener implements FlowableEventListener {
|
||||
String processInstanceId = job.getProcessInstanceId();
|
||||
String exceptionMessage = job.getExceptionMessage();
|
||||
|
||||
log.error("❌ 作业进入死信队列,流程ERROR: processInstanceId={}, jobId={}, exception={}",
|
||||
processInstanceId, job.getId(), exceptionMessage);
|
||||
// 对于Job事件,需要通过RuntimeService查询流程变量
|
||||
Long userId = null;
|
||||
Long tenantId = null;
|
||||
if (processInstanceId != null) {
|
||||
Map<String, Object> variables = runtimeService.getVariables(processInstanceId);
|
||||
userId = (Long) variables.get("userId");
|
||||
tenantId = (Long) variables.get("tenantId");
|
||||
}
|
||||
|
||||
doUpdate(processInstanceId, ProcessInstanceStateEnum.ERROR.getCode());
|
||||
log.error("❌ 作业进入死信队列,流程ERROR: processInstanceId={}, userId={}, tenantId={}, jobId={}, exception={}",
|
||||
processInstanceId, userId, tenantId, job.getId(), exceptionMessage);
|
||||
|
||||
doUpdate(processInstanceId, ProcessInstanceStateEnum.ERROR.getCode(), userId, tenantId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -152,17 +174,30 @@ public class GlobalStatusEventListener implements FlowableEventListener {
|
||||
* 真正的更新数据库逻辑
|
||||
* 通过Feign调用project服务更新算例状态
|
||||
*/
|
||||
private void doUpdate(String processInstanceId, Integer statusCode) {
|
||||
private void doUpdate(String processInstanceId, Integer statusCode, Long userId, Long tenantId) {
|
||||
if (processInstanceId == null) {
|
||||
log.warn("流程实例ID为空,跳过状态更新");
|
||||
return;
|
||||
}
|
||||
|
||||
log.info(">>> 更新算例状态 [{}] -> {}", processInstanceId, statusCode);
|
||||
|
||||
// 设置线程上下文,确保Feign调用能正确传递用户和租户信息
|
||||
try {
|
||||
if (userId != null) {
|
||||
ThreadLocalContext.setUserId(userId);
|
||||
}
|
||||
if (tenantId != null) {
|
||||
ThreadLocalContext.setTenantId(tenantId);
|
||||
}
|
||||
|
||||
simulationRunFeignClient.updateStatusByProcessInstanceId(processInstanceId, statusCode);
|
||||
} catch (Exception e) {
|
||||
log.error("更新算例状态失败: processInstanceId={}, status={}", processInstanceId, statusCode, e);
|
||||
log.error("更新算例状态失败: processInstanceId={}, userId={}, tenantId={}, status={}",
|
||||
processInstanceId, userId, tenantId, statusCode, e);
|
||||
} finally {
|
||||
// 清理线程上下文,防止线程池污染
|
||||
ThreadLocalContext.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -184,4 +219,4 @@ public class GlobalStatusEventListener implements FlowableEventListener {
|
||||
public String getOnTransaction() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -180,7 +180,7 @@ public class Dto2BpmnConverter {
|
||||
}
|
||||
|
||||
private void handleWaitUserTask(Process process, FlowElementDTO nodeDto, Map<String, String> waitUserTaskMap) {
|
||||
// 只有当前节点是ServiceTask才需要判断是否等待用户输入,需要才创建前置UserTask
|
||||
// 用来设置节点是否人工执行: 只有当前节点是ServiceTask才需要判断是否等待用户输入,需要才创建前置UserTask
|
||||
if (FlowElementTypeEnums.SERVICETASK.getType().equals(nodeDto.getType())) {
|
||||
String originalNodeId = nodeDto.getId();
|
||||
String waitUserId = FlowNodeIdUtils.generateWaitUserTaskId(originalNodeId);
|
||||
@@ -606,22 +606,6 @@ public class Dto2BpmnConverter {
|
||||
process.addFlowElement(userTask);
|
||||
}
|
||||
|
||||
/**
|
||||
* 为节点添加目录准备监听器
|
||||
* @param flowElement 需要添加监听器的节点
|
||||
*/
|
||||
private void addDirectoryPreparationListener(FlowElement flowElement) {
|
||||
FlowableListener dirPrepareListener = new FlowableListener();
|
||||
dirPrepareListener.setEvent("start");
|
||||
dirPrepareListener.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
|
||||
dirPrepareListener.setImplementation("${userTaskDirectoryPreparationListener}");
|
||||
|
||||
if (flowElement instanceof UserTask) {
|
||||
((UserTask) flowElement).getExecutionListeners().add(dirPrepareListener);
|
||||
} else if (flowElement instanceof ServiceTask) {
|
||||
((ServiceTask) flowElement).getExecutionListeners().add(dirPrepareListener);
|
||||
}
|
||||
}
|
||||
|
||||
// 抽取通用的 UserTask 扩展属性设置
|
||||
private void addUserTaskExtensions(UserTask userTask, FlowElementDTO nodeDto) throws JsonProcessingException {
|
||||
@@ -649,6 +633,24 @@ public class Dto2BpmnConverter {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 为节点添加目录准备监听器
|
||||
* @param flowElement 需要添加监听器的节点
|
||||
*/
|
||||
private void addDirectoryPreparationListener(FlowElement flowElement) {
|
||||
FlowableListener dirPrepareListener = new FlowableListener();
|
||||
dirPrepareListener.setEvent("start");
|
||||
dirPrepareListener.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
|
||||
dirPrepareListener.setImplementation("${userTaskDirectoryPreparationListener}");
|
||||
|
||||
if (flowElement instanceof UserTask) {
|
||||
((UserTask) flowElement).getExecutionListeners().add(dirPrepareListener);
|
||||
} else if (flowElement instanceof ServiceTask) {
|
||||
((ServiceTask) flowElement).getExecutionListeners().add(dirPrepareListener);
|
||||
}
|
||||
}
|
||||
|
||||
private void createServiceTask(Process process, FlowElementDTO nodeDto, Map<String, String> asyncTaskMap) throws JsonProcessingException {
|
||||
// 服务任务:映射为 Flowable ServiceTask,绑定自定义执行器
|
||||
ServiceTask serviceTask = new ServiceTask();
|
||||
|
||||
@@ -142,6 +142,7 @@ public class FlowNodeIdUtils {
|
||||
String simulationBaseDir = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR;
|
||||
Path localBaseDir = Paths.get(simulationBaseDir).toAbsolutePath().normalize();
|
||||
Path fullLocalPath = localBaseDir.resolve(objectKey).normalize();
|
||||
log.info("开始准备本地目录: {}", fullLocalPath);
|
||||
|
||||
// 安全校验:防止路径穿越
|
||||
if (!fullLocalPath.startsWith(localBaseDir)) {
|
||||
|
||||
Reference in New Issue
Block a user