This commit is contained in:
2025-11-28 15:00:43 +08:00
12 changed files with 155 additions and 17 deletions

View File

@@ -1,9 +1,7 @@
package com.sdm.data.model.req;
package com.sdm.common.entity.req.data;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.sdm.common.entity.enums.FileBizTypeEnum;
import com.sdm.common.entity.req.data.BaseReq;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

View File

@@ -0,0 +1,31 @@
package com.sdm.common.feign.impl.data;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.req.data.GetSimulationTaskFileReq;
import com.sdm.common.entity.resp.data.FileMetadataInfoResp;
import com.sdm.common.feign.inter.data.IDataAnalysisFeignClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class DataAnalysisFeignClientImpl implements IDataAnalysisFeignClient {
@Autowired
IDataAnalysisFeignClient dataAnalysisFeignClient;
@Override
public SdmResponse getSimulationTaskFile(GetSimulationTaskFileReq getSimulationTaskFileReq) {
SdmResponse response;
try {
response = dataAnalysisFeignClient.getSimulationTaskFile(getSimulationTaskFileReq);
log.info("查询文件夹列表响应:"+ response);
return response;
} catch (Exception e) {
log.error("查询文件夹列表失败", e);
return SdmResponse.failed("查询文件夹列表失败");
}
}
}

View File

@@ -0,0 +1,13 @@
package com.sdm.common.feign.inter.data;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.req.data.GetSimulationTaskFileReq;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@FeignClient(name = "data",contextId = "dataAnalysisFeignClient")
public interface IDataAnalysisFeignClient {
@PostMapping("/getSimulationTaskFile")
public SdmResponse getSimulationTaskFile(@RequestBody GetSimulationTaskFileReq getSimulationTaskFileReq);
}

View File

@@ -19,7 +19,7 @@ import org.springframework.web.multipart.MultipartFile;
import java.util.List;
@FeignClient(name = "data")
@FeignClient(name = "data",contextId = "dataFeignClient")
public interface IDataFeignClient {
@GetMapping("/data/listDir")
SdmResponse<List<FileMetadataInfoResp>> listDir( @RequestParam(value = "dirType") Integer dirType,@RequestParam(value = "parentDirId", required = false) Long parentDirId);

View File

@@ -1,7 +1,8 @@
package com.sdm.data.controller;
import com.sdm.common.common.SdmResponse;
import com.sdm.data.model.req.GetSimulationTaskFileReq;
import com.sdm.common.entity.req.data.GetSimulationTaskFileReq;
import com.sdm.common.feign.inter.data.IDataAnalysisFeignClient;
import com.sdm.data.service.IDataAnalysisService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
@@ -18,7 +19,7 @@ import java.util.List;
@RestController
@RequestMapping("/dataAnalysis")
@Tag(name = "数据分析", description = "结果曲线、结果云图")
public class DataAnalysisController {
public class DataAnalysisController implements IDataAnalysisFeignClient {
@Autowired
private IDataAnalysisService dataAnalysisService;

View File

@@ -1,7 +1,7 @@
package com.sdm.data.service;
import com.sdm.common.common.SdmResponse;
import com.sdm.data.model.req.GetSimulationTaskFileReq;
import com.sdm.common.entity.req.data.GetSimulationTaskFileReq;
import java.util.List;

View File

@@ -3,13 +3,12 @@ package com.sdm.data.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.github.pagehelper.PageInfo;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.enums.FileIsLastEnum;
import com.sdm.common.entity.resp.PageDataResp;
import com.sdm.common.utils.FileSizeUtils;
import com.sdm.common.utils.PageUtils;
import com.sdm.data.model.entity.FileMetadataInfo;
import com.sdm.data.model.entity.FileStorage;
import com.sdm.data.model.req.GetSimulationTaskFileReq;
import com.sdm.common.entity.req.data.GetSimulationTaskFileReq;
import com.sdm.data.model.req.QueryBigFileReq;
import com.sdm.data.model.resp.SimulationTaskResultCurveResp;
import com.sdm.data.service.DataStorageAnalysis;

View File

@@ -35,7 +35,7 @@ spring:
nacos:
discovery:
server-addr: 192.168.65.161:8848
group: DAI_GROUP
group: LOCAL_GROUP
enabled: true
servlet:
multipart:

View File

@@ -49,11 +49,12 @@ public class UniversalDelegate implements JavaDelegate {
String nodeId = execution.getCurrentActivityId();
String nodeName = execution.getCurrentFlowElement().getName();
log.info("开始执行节点, 流程实例ID: {}, 节点ID: {}, 节点名称: {}", procInstId, nodeId, nodeName);
// 2. 读取输入参数
Map<String, Object> params = processNodeParamService.getParam(procInstId, nodeId);
log.info("==== 节点执行日志 ====\n流程实例ID{}\n节点ID{}\n节点名称{}\n输入参数{}\n====================",
procInstId, nodeId, nodeName, params);
log.info("节点执行参数, 流程实例ID: {}, 节点ID: {}, 参数: {}", procInstId, nodeId, params);
// 检查是否有扩展元素配置
if (execution.getCurrentFlowElement().getExtensionElements() != null &&
@@ -64,23 +65,30 @@ public class UniversalDelegate implements JavaDelegate {
.getExtensionElements()
.get(FlowableConfig.EXECUTECONFIG).get(0).getElementText();
log.info("节点扩展配置, 流程实例ID: {}, 节点ID: {}, 扩展配置: {}", procInstId, nodeId, extensionElement);
BaseExecuteConfig config =
objectMapper.readValue(extensionElement, BaseExecuteConfig.class);
String executeType = config.getExecuteType();
ExecutionHandler handler = handlerMap.get(executeType);
if (handler == null) {
log.error("不支持的执行方式, 流程实例ID: {}, 节点ID: {}, 执行方式: {}", procInstId, nodeId, executeType);
throw new RuntimeException("不支持的执行方式:" + executeType);
}
log.info("开始执行具体任务处理逻辑, 流程实例ID: {}, 节点ID: {}, 执行方式: {}", procInstId, nodeId, executeType);
// 执行具体的任务处理逻辑
handler.execute(execution, params, config);
log.info("任务处理逻辑执行完成, 流程实例ID: {}, 节点ID: {}, 执行方式: {}", procInstId, nodeId, executeType);
} else {
// 对于没有配置 executeConfig 的节点(如 userTask直接完成任务
log.info("节点 {} 没有执行配置,直接完成任务", nodeName);
}
log.info("节点执行完成, 流程实例ID: {}, 节点ID: {}, 节点名称: {}", procInstId, nodeId, nodeName);
} catch (Exception e) {
// 处理失败情况 - 跳转到重试任务
log.error("节点执行过程中发生异常", e);
handleFailure(execution, e);
}
}
@@ -93,12 +101,15 @@ public class UniversalDelegate implements JavaDelegate {
private void handleFailure(DelegateExecution execution, Exception e) {
String procInstId = execution.getProcessInstanceId();
String failedNodeId = execution.getCurrentActivityId();
String nodeName = execution.getCurrentFlowElement().getName();
log.error("节点执行失败,节点ID: {}, 错误信息: {}", failedNodeId, e.getMessage(), e);
log.error("节点执行失败,流程实例ID: {}, 节点ID: {}, 节点名称: {}, 错误信息: {}", procInstId, failedNodeId, nodeName, e.getMessage(), e);
// 记录失败节点(供后续重试用)
runtimeService.setVariable(procInstId, FlowableConfig.RETRY_ORIGIN_NODE_ID, failedNodeId);
runtimeService.setVariable(procInstId, FlowableConfig.RETRY_ERROR_MESSAGE, e.getMessage());
log.info("准备跳转到重试任务, 流程实例ID: {}, 失败节点ID: {}, 重试任务ID: {}", procInstId, failedNodeId, FlowNodeIdUtils.getRetryTaskId());
// 跳转到通用重试任务
runtimeService.createChangeActivityStateBuilder()
@@ -109,6 +120,7 @@ public class UniversalDelegate implements JavaDelegate {
)
.changeState();
log.info("已完成跳转到重试任务, 流程实例ID: {}, 失败节点ID: {}", procInstId, failedNodeId);
// 不抛出异常,让流程继续
}
@@ -122,7 +134,9 @@ public class UniversalDelegate implements JavaDelegate {
* HPC 回调接口,用于唤醒等待的流程
*/
public void signalByTaskId(AsyncCallbackRequest request) {
log.info("接收到异步回调请求: {}", request);
asyncTaskRecordService.completeAsyncTask(request);
log.info("异步回调处理完成: {}", request.getAsyncTaskId());
}
}

View File

@@ -36,22 +36,37 @@ public class RetryRedirectListener implements ExecutionListener {
@Override
public void notify(DelegateExecution execution) {
String procInstId = execution.getProcessInstanceId();
String currentActivityId = execution.getCurrentActivityId();
log.info("重试跳转监听器开始执行, 流程实例ID: {}, 当前活动ID: {}", procInstId, currentActivityId);
// 1. 获取目标节点ID由重试接口设置
String targetNodeId = (String) runtimeService.getVariable(procInstId, FlowableConfig.RETRY_TARGET_NODE_ID);
log.info("获取重试目标节点ID, 流程实例ID: {}, 目标节点ID: {}", procInstId, targetNodeId);
if (targetNodeId == null) {
log.error("未指定重试目标节点, 流程实例ID: {}", procInstId);
throw new IllegalStateException("未指定重试目标节点");
}
// 2. 判断目标节点类型
log.info("获取BPMN模型, 流程定义ID: {}", execution.getProcessDefinitionId());
BpmnModel bpmnModel = repositoryService.getBpmnModel(execution.getProcessDefinitionId());
FlowElement targetElement = bpmnModel.getMainProcess().getFlowElement(targetNodeId);
log.info("获取目标节点元素, 流程实例ID: {}, 目标节点ID: {}, 目标节点对象为空: {}", procInstId, targetNodeId, targetElement == null);
if (targetElement == null) {
log.error("目标节点不存在, 流程实例ID: {}, 目标节点ID: {}", procInstId, targetNodeId);
throw new IllegalArgumentException("目标节点不存在: " + targetNodeId);
}
log.info("目标节点类型: {}", targetElement.getClass().getSimpleName());
// 3. 跳转到目标节点
log.info("开始跳转到目标节点, 流程实例ID: {}, 源节点ID: {}, 目标节点ID: {}",
procInstId, FlowNodeIdUtils.getRetryTaskId(), targetNodeId);
runtimeService.createChangeActivityStateBuilder()
.processInstanceId(procInstId)
.moveActivityIdsToSingleActivityId(
@@ -59,24 +74,34 @@ public class RetryRedirectListener implements ExecutionListener {
targetNodeId
)
.changeState();
log.info("节点跳转操作完成, 流程实例ID: {}, 目标节点ID: {}", procInstId, targetNodeId);
// 4. 如果目标是 UserTask自动完成它模拟"无人值守"
if (targetElement instanceof UserTask) {
log.info("目标节点是UserTask准备自动完成, 流程实例ID: {}, 目标节点ID: {}", procInstId, targetNodeId);
// 等待任务创建(异步操作可能有延迟)
awaitUserTaskCreated(procInstId, targetNodeId);
// 查询刚创建的 UserTask
log.info("查询刚创建的UserTask, 流程实例ID: {}, 目标节点ID: {}", procInstId, targetNodeId);
Task userTask = taskService.createTaskQuery()
.processInstanceId(procInstId)
.taskDefinitionKey(targetNodeId)
.orderByTaskCreateTime().desc() // 取最新
.singleResult();
log.info("UserTask查询结果, 流程实例ID: {}, 目标节点ID: {}, 任务对象为空: {}", procInstId, targetNodeId, userTask == null);
if (userTask != null) {
// 自动完成 UserTask不传变量或传空
log.info("开始自动完成UserTask, 任务ID: {}, 流程实例ID: {}, 目标节点ID: {}", userTask.getId(), procInstId, targetNodeId);
taskService.complete(userTask.getId());
log.info("自动完成 UserTask: {}", targetNodeId);
}
} else {
log.info("目标节点不是UserTask无需自动完成, 节点类型: {}", targetElement.getClass().getSimpleName());
}
// 如果是 ServiceTask引擎会自动执行无需干预
@@ -85,20 +110,33 @@ public class RetryRedirectListener implements ExecutionListener {
// 辅助方法:等待 UserTask 创建(简单轮询)
private void awaitUserTaskCreated(String procInstId, String taskDefKey) {
log.info("等待UserTask创建, 流程实例ID: {}, 任务定义Key: {}", procInstId, taskDefKey);
int attempts = 0;
while (attempts < 10) {
long count = taskService.createTaskQuery()
.processInstanceId(procInstId)
.taskDefinitionKey(taskDefKey)
.count();
if (count > 0) return;
log.debug("检查UserTask是否存在, 流程实例ID: {}, 任务定义Key: {}, 当前尝试次数: {}, 任务数量: {}",
procInstId, taskDefKey, attempts, count);
if (count > 0) {
log.info("UserTask已创建, 流程实例ID: {}, 任务定义Key: {}, 尝试次数: {}", procInstId, taskDefKey, attempts);
return;
}
try {
Thread.sleep(200); // 200ms
attempts++;
} catch (InterruptedException e) {
log.warn("等待UserTask创建时线程被中断, 流程实例ID: {}, 任务定义Key: {}", procInstId, taskDefKey);
Thread.currentThread().interrupt();
break;
}
}
log.warn("等待UserTask创建超时, 流程实例ID: {}, 任务定义Key: {}, 最大尝试次数: {}", procInstId, taskDefKey, attempts);
}
}

View File

@@ -59,9 +59,13 @@ public class ProcessService {
// 部署流程前端传入Flowable标准JSON
public Deployment deploy(ProcessDefinitionDTO processDTO) throws Exception {
log.info("开始部署流程定义");
BpmnModel bpmnModel = dto2BpmnConverter.convert(processDTO);
log.info("BPMN模型转换完成");
// 检查BPMN模型是否有效
if (bpmnModel.getProcesses().isEmpty()) {
log.error("无效的BPMN模型未找到任何流程定义");
throw new RuntimeException("无效的BPMN模型未找到任何流程定义");
}
@@ -73,13 +77,17 @@ public class ProcessService {
for (ValidationError error : validationErrors) {
errorMsg.append("\n - ").append(error.toString());
}
log.error("BPMN模型验证失败: {}", errorMsg.toString());
throw new RuntimeException(errorMsg.toString());
}
return repositoryService.createDeployment()
Deployment deployment = repositoryService.createDeployment()
.addBpmnModel("industrial_process.bpmn", bpmnModel)
.name("工业并行部署流程")
.deploy();
log.info("流程部署成功, 部署ID: {}, 部署名称: {}", deployment.getId(), deployment.getName());
return deployment;
}
public List<Map<String, Object>> listAllDeployments() {
@@ -322,8 +330,15 @@ public class ProcessService {
if (variables == null) {
variables = Collections.emptyMap();
}
log.info("根据流程定义ID启动流程实例, 流程定义ID: {}, 变量数量: {}", processDefinitionId, variables.size());
return runtimeService.startProcessInstanceById(processDefinitionId, variables);
ProcessInstance instance = runtimeService.startProcessInstanceById(processDefinitionId, variables);
log.info("流程实例启动成功, 实例ID: {}, 流程定义ID: {}, 业务Key: {}",
instance.getId(), instance.getProcessDefinitionId(), instance.getBusinessKey());
return instance;
}
// 获取BPMN模型用于调试和验证
@@ -504,37 +519,53 @@ public class ProcessService {
}
public void continueServiceTask(@RequestBody CompleteTaskReq req) {
log.info("开始继续服务任务处理, 请求参数: {}", req);
String taskDefKey;
// 根据类型确定真正的 taskDefinitionKey
if (FlowElementTypeEnums.fromString(req.getFlowelementType()).equals(FlowElementTypeEnums.SERVICETASK)) {
// 如果是 ServiceTask 前置等待节点
taskDefKey = FlowNodeIdUtils.generateWaitUserTaskId(req.getTaskDefinitionKey());
log.info("识别为ServiceTask前置等待节点, 原始任务定义Key: {}, 转换后任务定义Key: {}", req.getTaskDefinitionKey(), taskDefKey);
} else {
// 普通 UserTask
taskDefKey = req.getTaskDefinitionKey();
log.info("识别为普通UserTask, 任务定义Key: {}", taskDefKey);
}
Task task = taskService.createTaskQuery()
.processInstanceId(req.getProcessInstanceId())
.taskDefinitionKey(taskDefKey)
.singleResult();
log.info("任务查询完成, 流程实例ID: {}, 任务定义Key: {}, 查询结果为空: {}", req.getProcessInstanceId(), taskDefKey, task == null);
if (task == null) {
log.error("找不到任务流程实例ID: {}, taskDefinitionKey={}", req.getProcessInstanceId(), taskDefKey);
throw new RuntimeException("找不到任务! taskDefinitionKey=" + taskDefKey);
}
log.info("准备完成任务, 任务ID: {}, 变量数量: {}", task.getId(),
req.getVariables() != null ? req.getVariables().size() : 0);
// 完成任务
if (req.getVariables() != null) {
taskService.complete(task.getId(), req.getVariables());
log.info("任务完成(带变量), 任务ID: {}", task.getId());
} else {
taskService.complete(task.getId());
log.info("任务完成(无变量), 任务ID: {}", task.getId());
}
log.info("服务任务处理完成, 任务ID: {}", task.getId());
}
public void asyncCallback(AsyncCallbackRequest request) {
log.info("接收到异步回调请求, 请求内容: {}", request);
// 发送信号唤醒流程实例中等待的节点
universalDelegate.signalByTaskId(request);
log.info("异步回调处理转发完成, 任务ID: {}", request.getAsyncTaskId());
}
/**
@@ -545,25 +576,37 @@ public class ProcessService {
* @param newVariables 新的变量参数
*/
public void retryToNode(String procInstId, String targetNodeId, Map<String, Object> newVariables) {
log.info("开始重试节点操作, 流程实例ID: {}, 目标节点ID: {}, 新变量数量: {}",
procInstId, targetNodeId, newVariables != null ? newVariables.size() : 0);
// 1. 更新流程变量(用户新输入的参数)
if (newVariables != null && !newVariables.isEmpty()) {
log.info("更新流程变量, 变量数量: {}", newVariables.size());
runtimeService.setVariables(procInstId, newVariables);
}
// 2. 将目标节点ID存为变量供 ExecutionListener 使用)
log.info("设置重试目标节点变量, 流程实例ID: {}, 目标节点ID: {}", procInstId, targetNodeId);
runtimeService.setVariable(procInstId, FlowableConfig.RETRY_TARGET_NODE_ID, targetNodeId);
// 3. 查询并完成 retryInputTask
log.info("查询重试输入任务, 流程实例ID: {}, 重试任务ID: {}", procInstId, FlowNodeIdUtils.getRetryTaskId());
Task retryTask = taskService.createTaskQuery()
.processInstanceId(procInstId)
.taskDefinitionKey(FlowNodeIdUtils.getRetryTaskId())
.singleResult();
log.info("重试任务查询结果, 任务对象为空: {}", retryTask == null);
if (retryTask == null) {
log.error("未找到重试任务,请确认流程处于重试状态, 流程实例ID: {}", procInstId);
throw new IllegalStateException("未找到重试任务,请确认流程处于重试状态");
}
log.info("准备完成重试中转任务, 触发ExecutionListener, 任务ID: {}", retryTask.getId());
// 完成重试中转任务,触发 ExecutionListener
taskService.complete(retryTask.getId());
log.info("重试中转任务完成, 任务ID: {}", retryTask.getId());
}
}

View File

@@ -19,7 +19,8 @@
"extensionElements": {
"executeConfig": {
"executeType": "HPC",
"asyncCallback": true
"asyncCallback": true,
"waitUser":true
}
}
},