This commit is contained in:
2025-11-28 14:12:22 +08:00
14 changed files with 405 additions and 55 deletions

View File

@@ -35,7 +35,7 @@ spring:
nacos:
discovery:
server-addr: 192.168.65.161:8848
group: LOCAL_GROUP
group: DAI_GROUP
enabled: true
servlet:
multipart:

View File

@@ -18,4 +18,4 @@ public class FlowableApplication {
SpringApplication.run(FlowableApplication.class, args);
}
}
}

View File

@@ -5,4 +5,28 @@ public interface FlowableConfig {
* 前端流程节点自定义执行参数key
*/
String EXECUTECONFIG = "executeConfig";
}
/*
* 重试相关变量名
*/
String RETRY_TARGET_NODE_ID = "_retryTargetNodeId";
String RETRY_ORIGIN_NODE_ID = "_retryOriginNodeId";
String RETRY_ERROR_MESSAGE = "_retryErrorMessage";
/*
* 重试任务ID
*/
String RETRY_TASK_ID = "genericRetryTask";
/*
* 网关前缀
*/
String JOIN_GATEWAY_PREFIX = "join_gw_";
String SPLIT_GATEWAY_PREFIX = "split_gw_";
/*
* 任务后缀
*/
String ASYNC_TASK_SUFFIX = "_wait";
String WAIT_USER_SUFFIX = "_waitUser";
}

View File

@@ -6,6 +6,7 @@ import com.sdm.flowable.delegate.handler.HpcHandler;
import com.sdm.flowable.dto.ProcessDefinitionDTO;
import com.sdm.flowable.dto.req.AsyncCallbackRequest;
import com.sdm.flowable.dto.req.CompleteTaskReq;
import com.sdm.flowable.dto.req.RetryRequest;
import com.sdm.flowable.dto.resp.ProcessInstanceResp;
import com.sdm.flowable.process.ProcessService;
import com.sdm.flowable.service.IProcessNodeParamService;
@@ -106,7 +107,7 @@ public class ProcessController {
/**
* 获取部署的流程定义元数据信息
* 1、查询所有流程定义所有版本
* 2、查询所有流程的最新版本
* 2、查询所有流程的"最新版本"
* 3、查询某个流程的所有版本
* 4、查询某个流程的最新版本
* 5、查询版本从大到小
@@ -253,4 +254,20 @@ public class ProcessController {
// 发送信号唤醒流程实例中等待的节点
processService.asyncCallback(request);
}
/**
* 用户点击"重试"按钮传入目标节点ID
*
* @param request 重试请求参数包括流程实例ID、目标节点ID和新变量
* @return 重试结果
*/
@PostMapping("/retryToNode")
public SdmResponse retryToNode(@RequestBody RetryRequest request) {
try {
processService.retryToNode(request.getProcInstId(), request.getTargetNodeId(), request.getVariables());
return SdmResponse.success("重试任务已提交");
} catch (Exception e) {
return SdmResponse.failed("重试失败: " + e.getMessage());
}
}
}

View File

@@ -8,13 +8,16 @@ import com.sdm.flowable.delegate.handler.ExecutionHandler;
import com.sdm.flowable.dto.req.AsyncCallbackRequest;
import com.sdm.flowable.service.IAsyncTaskRecordService;
import com.sdm.flowable.service.IProcessNodeParamService;
import com.sdm.flowable.util.FlowNodeIdUtils;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
import org.flowable.engine.runtime.Execution;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.Map;
/**
@@ -34,6 +37,9 @@ public class UniversalDelegate implements JavaDelegate {
@Autowired
private Map<String, ExecutionHandler> handlerMap; // 执行器映射
@Autowired
private RuntimeService runtimeService;
@Override
public void execute(DelegateExecution execution) {
@@ -73,11 +79,39 @@ public class UniversalDelegate implements JavaDelegate {
// 对于没有配置 executeConfig 的节点(如 userTask直接完成任务
log.info("节点 {} 没有执行配置,直接完成任务", nodeName);
}
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
} catch (Exception e) {
// 处理失败情况 - 跳转到重试任务
handleFailure(execution, e);
}
}
/**
* 处理任务执行失败的情况
* @param execution 当前执行上下文
* @param e 异常信息
*/
private void handleFailure(DelegateExecution execution, Exception e) {
String procInstId = execution.getProcessInstanceId();
String failedNodeId = execution.getCurrentActivityId();
log.error("节点执行失败节点ID: {}, 错误信息: {}", failedNodeId, e.getMessage(), e);
// 记录失败节点(供后续重试用)
runtimeService.setVariable(procInstId, FlowableConfig.RETRY_ORIGIN_NODE_ID, failedNodeId);
runtimeService.setVariable(procInstId, FlowableConfig.RETRY_ERROR_MESSAGE, e.getMessage());
// 跳转到通用重试任务
runtimeService.createChangeActivityStateBuilder()
.processInstanceId(procInstId)
.moveActivityIdsToSingleActivityId(
Collections.singletonList(failedNodeId),
FlowNodeIdUtils.getRetryTaskId()
)
.changeState();
// 不抛出异常,让流程继续
}
/**
* 外部系统回调接口,用于唤醒等待的流程实例
* @param processInstanceId 流程实例ID

View File

@@ -0,0 +1,28 @@
package com.sdm.flowable.dto.req;
import lombok.Data;
import java.io.Serializable;
import java.util.Map;
/**
* 重试请求参数
*/
@Data
public class RetryRequest implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 流程实例ID
*/
String procInstId;
/**
* 目标节点ID
*/
private String targetNodeId;
/**
* 重试参数
*/
private Map<String, Object> variables;
}

View File

@@ -0,0 +1,104 @@
package com.sdm.flowable.listener;
import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.flowable.util.FlowNodeIdUtils;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.FlowElement;
import org.flowable.bpmn.model.UserTask;
import org.flowable.engine.RepositoryService;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.ExecutionListener;
import org.flowable.task.api.Task;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Collections;
/**
* 重试跳转监听器,在 retryInputTask 完成时执行跳转逻辑
*/
@Slf4j
@Component("retryRedirectListener")
public class RetryRedirectListener implements ExecutionListener {
@Autowired
private RuntimeService runtimeService;
@Autowired
private RepositoryService repositoryService;
@Autowired
private TaskService taskService;
@Override
public void notify(DelegateExecution execution) {
String procInstId = execution.getProcessInstanceId();
// 1. 获取目标节点ID由重试接口设置
String targetNodeId = (String) runtimeService.getVariable(procInstId, FlowableConfig.RETRY_TARGET_NODE_ID);
if (targetNodeId == null) {
throw new IllegalStateException("未指定重试目标节点");
}
// 2. 判断目标节点类型
BpmnModel bpmnModel = repositoryService.getBpmnModel(execution.getProcessDefinitionId());
FlowElement targetElement = bpmnModel.getMainProcess().getFlowElement(targetNodeId);
if (targetElement == null) {
throw new IllegalArgumentException("目标节点不存在: " + targetNodeId);
}
// 3. 跳转到目标节点
runtimeService.createChangeActivityStateBuilder()
.processInstanceId(procInstId)
.moveActivityIdsToSingleActivityId(
Collections.singletonList(FlowNodeIdUtils.getRetryTaskId()),
targetNodeId
)
.changeState();
// 4. 如果目标是 UserTask自动完成它模拟"无人值守"
if (targetElement instanceof UserTask) {
// 等待任务创建(异步操作可能有延迟)
awaitUserTaskCreated(procInstId, targetNodeId);
// 查询刚创建的 UserTask
Task userTask = taskService.createTaskQuery()
.processInstanceId(procInstId)
.taskDefinitionKey(targetNodeId)
.orderByTaskCreateTime().desc() // 取最新
.singleResult();
if (userTask != null) {
// 自动完成 UserTask不传变量或传空
taskService.complete(userTask.getId());
log.info("自动完成 UserTask: {}", targetNodeId);
}
}
// 如果是 ServiceTask引擎会自动执行无需干预
log.info("已完成跳转到目标节点: {}", targetNodeId);
}
// 辅助方法:等待 UserTask 创建(简单轮询)
private void awaitUserTaskCreated(String procInstId, String taskDefKey) {
int attempts = 0;
while (attempts < 10) {
long count = taskService.createTaskQuery()
.processInstanceId(procInstId)
.taskDefinitionKey(taskDefKey)
.count();
if (count > 0) return;
try {
Thread.sleep(200); // 200ms
attempts++;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}

View File

@@ -1,9 +1,11 @@
package com.sdm.flowable.process;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.flowable.delegate.UniversalDelegate;
import com.sdm.flowable.dto.ProcessDefinitionDTO;
import com.sdm.flowable.dto.req.AsyncCallbackRequest;
import com.sdm.flowable.dto.req.RetryRequest;
import com.sdm.flowable.enums.FlowElementTypeEnums;
import com.sdm.flowable.util.Dto2BpmnConverter;
import com.sdm.flowable.constants.FlowableConfig;
@@ -529,8 +531,39 @@ public class ProcessService {
taskService.complete(task.getId());
}
}
public void asyncCallback(AsyncCallbackRequest request) {
// 发送信号唤醒流程实例中等待的节点
universalDelegate.signalByTaskId(request);
}
/**
* 用户点击"重试"按钮传入目标节点ID
*
* @param procInstId 流程实例ID
* @param targetNodeId 目标节点ID
* @param newVariables 新的变量参数
*/
public void retryToNode(String procInstId, String targetNodeId, Map<String, Object> newVariables) {
// 1. 更新流程变量(用户新输入的参数)
if (newVariables != null && !newVariables.isEmpty()) {
runtimeService.setVariables(procInstId, newVariables);
}
// 2. 将目标节点ID存为变量供 ExecutionListener 使用)
runtimeService.setVariable(procInstId, FlowableConfig.RETRY_TARGET_NODE_ID, targetNodeId);
// 3. 查询并完成 retryInputTask
Task retryTask = taskService.createTaskQuery()
.processInstanceId(procInstId)
.taskDefinitionKey(FlowNodeIdUtils.getRetryTaskId())
.singleResult();
if (retryTask == null) {
throw new IllegalStateException("未找到重试任务,请确认流程处于重试状态");
}
// 完成重试中转任务,触发 ExecutionListener
taskService.complete(retryTask.getId());
}
}

View File

@@ -9,6 +9,7 @@ import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.flowable.dto.FlowElementDTO;
import com.sdm.flowable.dto.ProcessDefinitionDTO;
import com.sdm.flowable.enums.FlowElementTypeEnums;
import com.sdm.flowable.util.FlowNodeIdUtils;
import org.flowable.bpmn.model.*;
import org.flowable.bpmn.model.Process;
import org.springframework.beans.factory.annotation.Autowired;
@@ -68,12 +69,34 @@ public class Dto2BpmnConverter {
addRequiredGateways(process, nodeDto, flowDtos, joinGatewayMap, splitGatewayMap);
}
// 添加通用重试任务节点
addRetryTask(process);
// 6. 创建连线
createConnections(process, flowDtos, asyncTaskMap,waitUserTaskMap, joinGatewayMap, splitGatewayMap);
return bpmnModel;
}
/**
* 添加通用重试任务节点
*/
private void addRetryTask(Process process) {
// 通用重试中转任务(无 incoming/outgoing
UserTask retryInputTask = new UserTask();
retryInputTask.setId(FlowNodeIdUtils.getRetryTaskId());
retryInputTask.setName("通用重试任务");
// 添加 ExecutionListener
FlowableListener listener = new FlowableListener();
listener.setEvent("end"); // "start"、"end"、"take" 等
listener.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
listener.setImplementation("${retryRedirectListener}");
retryInputTask.getExecutionListeners().add(listener);
process.addFlowElement(retryInputTask);
}
/**
* 处理异步任务节点
*/

View File

@@ -1,10 +1,12 @@
package com.sdm.flowable.util;
import com.sdm.flowable.constants.FlowableConfig;
public class FlowNodeIdUtils {
private static final String JOIN_GATEWAY_PREFIX = "join_gw_";
private static final String SPLIT_GATEWAY_PREFIX = "split_gw_";
private static final String ASYNC_TASK_SUFFIX = "_wait";
private static final String WAIT_USER_SUFFIX = "_waitUser";
private static final String JOIN_GATEWAY_PREFIX = FlowableConfig.JOIN_GATEWAY_PREFIX;
private static final String SPLIT_GATEWAY_PREFIX = FlowableConfig.SPLIT_GATEWAY_PREFIX;
private static final String ASYNC_TASK_SUFFIX = FlowableConfig.ASYNC_TASK_SUFFIX;
private static final String WAIT_USER_SUFFIX = FlowableConfig.WAIT_USER_SUFFIX;
// ==================== 网关 ====================
@@ -57,4 +59,10 @@ public class FlowNodeIdUtils {
}
return waitUserTaskId.substring(0, waitUserTaskId.length() - WAIT_USER_SUFFIX.length());
}
}
// ==================== 重试任务 ====================
public static String getRetryTaskId() {
return FlowableConfig.RETRY_TASK_ID;
}
}

View File

@@ -33,7 +33,7 @@ public class RequestResponseLoggingFilter implements GlobalFilter, Ordered {
public static final String TRACE_ID_HEADER = "X-Trace-Id";
/** 最大记录的响应体长度(防止大 JSON */
private static final int MAX_BODY_LENGTH = 10 * 1024;
private static final int MAX_BODY_LENGTH = 100 * 1024;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
@@ -123,23 +123,25 @@ public class RequestResponseLoggingFilter implements GlobalFilter, Ordered {
if (!(body instanceof Flux)) {
return super.writeWith(body);
}
Flux<DataBuffer> fluxBody = (Flux<DataBuffer>) body;
return DataBufferUtils.join(fluxBody)
.flatMap(joinedBuffer -> {
byte[] content = null;
try {
int length = Math.min(
joinedBuffer.readableByteCount(),
MAX_BODY_LENGTH
);
content = new byte[length];
joinedBuffer.read(content);
// 先复制完整内容(不会影响 readerIndex
byte[] fullBytes = new byte[joinedBuffer.readableByteCount()];
joinedBuffer.read(fullBytes);
logResponse(exchange, traceId, startTime, content);
// 必须返回完整响应内容给客户端
DataBuffer newBuffer = bufferFactory.wrap(
joinedBuffer.asByteBuffer()
);
// 日志只截断读取
int logLength = Math.min(fullBytes.length, MAX_BODY_LENGTH);
byte[] logBytes = new byte[logLength];
System.arraycopy(fullBytes, 0, logBytes, 0, logLength);
logResponse(exchange, traceId, startTime, logBytes);
// 用完整数据返回客户端
DataBuffer newBuffer = bufferFactory.wrap(fullBytes);
return super.writeWith(Mono.just(newBuffer));
} finally {
DataBufferUtils.release(joinedBuffer);

View File

@@ -1,14 +1,12 @@
package com.sdm.project.controller;
import com.alibaba.fastjson2.JSONObject;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.req.data.UploadFilesReq;
import com.sdm.project.model.req.YA.GetModelNodeInfoReq;
import com.sdm.project.model.req.YA.SaveModelNodeInfoReq;
import com.sdm.project.model.req.YA.SyncCidProjectReq;
import com.sdm.project.model.req.YA.SyncCidTaskReq;
import com.sdm.project.service.INodeService;
import com.sdm.project.service.ITaskService;
import com.sdm.common.feign.inter.data.IDataFeignClient;
import com.sdm.project.model.req.YA.*;
import com.sdm.project.model.resp.YA.BosimSaveNodeInfoRsp;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
@@ -16,26 +14,19 @@ import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/dataManager/tree/node")
@Tag(name = "宜安项目数据归档", description = "宜安项目模型数据关了")
public class YAModelController {
@Resource
private INodeService nodeService;
@Autowired
private IDataFeignClient dataFeignClient;
@Resource
private ITaskService taskService;
/**
* 创建文件夹
* 保存仿真模型数据
*
* @param req
* @return
@@ -53,9 +44,8 @@ public class YAModelController {
)
)
)
public SdmResponse saveModelNodeInfo(@RequestBody @Validated SaveModelNodeInfoReq req)
public BosimSaveNodeInfoRsp saveModelNodeInfo(SaveModelNodeInfoReq req)
{
UploadFilesReq fileReq = new UploadFilesReq();
fileReq.setFileName(req.getName());
fileReq.setProjectId(req.getProject());
@@ -63,10 +53,95 @@ public class YAModelController {
fileReq.setFile(req.getFile());
fileReq.setUuid(req.getScenario());
fileReq.setAnalysisDirectionId(req.getScenario());
return null;
SdmResponse uploadRespond = dataFeignClient.uploadFiles(fileReq);
BosimSaveNodeInfoRsp rsp = new BosimSaveNodeInfoRsp();
if(uploadRespond.isSuccess())
{
JSONObject data = (JSONObject) uploadRespond.getData();
rsp.setCode("200");
rsp.setMessage("success");
long fileId = data.getLong("fileId");
rsp.getWorkRequest().add(String.valueOf(fileId));
}
return rsp;
}
/**
* 保存仿真报告
*
* @param req
* @return
*/
@PostMapping(value = "/SaveReportNodeInfo", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
@Operation(
summary = "上传仿真报告",
description = "仿真报告归档,支持同时上传文件和附加参数",
requestBody = @io.swagger.v3.oas.annotations.parameters.RequestBody(
description = "仿真报告上传请求",
required = true,
content = @Content(
mediaType = MediaType.MULTIPART_FORM_DATA_VALUE,
schema = @Schema(implementation = SaveReportNodeInfoReq.class)
)
)
)
public BosimSaveNodeInfoRsp saveReportNodeInfo(SaveReportNodeInfoReq req)
{
UploadFilesReq fileReq = new UploadFilesReq();
fileReq.setFileName(req.getName());
fileReq.setProjectId(req.getProject());
fileReq.setFileType(2);
fileReq.setFile(req.getFile());
fileReq.setUuid(req.getScenario());
fileReq.setAnalysisDirectionId(req.getScenario());
SdmResponse uploadRespond = dataFeignClient.uploadFiles(fileReq);
BosimSaveNodeInfoRsp rsp = new BosimSaveNodeInfoRsp();
if(uploadRespond.isSuccess())
{
JSONObject data = (JSONObject) uploadRespond.getData();
rsp.setCode("200");
rsp.setMessage("success");
long fileId = data.getLong("fileId");
rsp.getWorkRequest().add(String.valueOf(fileId));
}
return rsp;
}
/**
* 保存仿真关键结果
* @param req
* @return
*/
@PostMapping(value = "/SaveKeyResultNodeInfo", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
@Operation(
summary = "保存仿真关键结果",
description = "仿真关键结果归档,支持同时上传文件和附加参数",
requestBody = @io.swagger.v3.oas.annotations.parameters.RequestBody(
description = "仿真关键结果上传请求",
required = true,
content = @Content(
mediaType = MediaType.MULTIPART_FORM_DATA_VALUE,
schema = @Schema(implementation = SaveReportNodeInfoReq.class)
)
)
)
public BosimSaveNodeInfoRsp saveKeyResultNodeInfo(SaveKeyResultNodeInfoReq req)
{
}
@PostMapping("")
public BosimSaveNodeInfoRsp deleteModelNodeInfo(DeleteModelNodeInfoReq req)
{
}
@PostMapping("GetModelNodeInfoByIdAndType")
public void getModelNodeInfo(@RequestBody @Validated GetModelNodeInfoReq req)
{
@@ -79,16 +154,16 @@ public class YAModelController {
}
@PostMapping("syncProject")
public void syncCidProject(@RequestBody @Validated SyncCidProjectReq req)
@PostMapping("SyncProject")
public void syncCidProject()
{
nodeService.syncProject(req);
}
@PostMapping("syncCidTask")
public void syncCidTask(@RequestBody @Validated SyncCidTaskReq req)
@PostMapping("SyncCidTask")
public void syncCidTask()
{
taskService.syncCidTask(req);
}
}

View File

@@ -1,17 +1,19 @@
package com.sdm.project.model.resp.YA;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
public class BosimSaveModelNodeRsp {
@Data
public class BosimSaveNodeInfoRsp {
@Schema(description = "返回码")
private String code;
private String code ;
@Schema(description = "返回信息")
private String message;
private String message = "";
private List<String> workRequest = new ArrayList<>();
}

View File

@@ -36,7 +36,7 @@ spring:
discovery:
server-addr: 192.168.65.161:8848
enabled: true
group: LOCAL_GROUP
group: DAI_GROUP
# username: nacos
# password: ENC(+QKYnI6gAYu1SbLaZQTkZA==)
data: