diff --git a/1-sql/2026-01-06/file_storage_quota.sql b/1-sql/2026-01-06/file_storage_quota.sql new file mode 100644 index 00000000..9fba0477 --- /dev/null +++ b/1-sql/2026-01-06/file_storage_quota.sql @@ -0,0 +1,6 @@ +ALTER TABLE file_storage_quota +DROP INDEX idx_userId, +DROP INDEX idx_tenantId; + +ALTER TABLE file_storage_quota +ADD UNIQUE KEY uk_tenant_user (tenantId, userId); \ No newline at end of file diff --git a/common/src/main/java/com/sdm/common/config/FlowableConfig.java b/common/src/main/java/com/sdm/common/config/FlowableConfig.java index d36f0d74..1fb04558 100644 --- a/common/src/main/java/com/sdm/common/config/FlowableConfig.java +++ b/common/src/main/java/com/sdm/common/config/FlowableConfig.java @@ -6,10 +6,6 @@ public interface FlowableConfig { */ String EXECUTECONFIG = "executeConfig"; - // 异步流程ReceiveTask节点回调返回结果变量名 - String RECEIVETASK_CALLBACKE_STATUS = "ReceivetaskCallbackeStatus"; - String RECEIVETASK_CALLBACKE_MSG = "ReceivetaskCallbackeMsg"; - /* * 重试相关变量名 */ @@ -33,7 +29,8 @@ public interface FlowableConfig { */ String ASYNC_TASK_SUFFIX = "_wait"; // 后置接收 String WAIT_USER_SUFFIX = "_waitUser"; //前置人工 - String CHECK_SUFFIX = "_check"; // 后置哨兵 + String CHECK_SUFFIX = "_check"; // 检查回调结果节点 + String REGISTER_SUFFIX = "_register"; // userTask的前置注册节点 /** * 流程的节点本地文件夹基础路径 @@ -48,10 +45,4 @@ public interface FlowableConfig { String EXECUTE_MODE_KEY = "executeMode"; String EXECUTE_MODE_AUTO = "AUTO"; String EXECUTE_MODE_MANUAL = "MANUAL"; - - /** - * 手动模式下,用户明确指定的文件列表 Key - * value: Map> - */ - String EXPLICIT_INPUT_FILES_KEY = "explicitInputFiles"; // 原: _explicitInputFiles } \ No newline at end of file diff --git a/common/src/main/java/com/sdm/common/config/FlowableVariables.java b/common/src/main/java/com/sdm/common/config/FlowableVariables.java new file mode 100644 index 00000000..8cfdc40a --- /dev/null +++ b/common/src/main/java/com/sdm/common/config/FlowableVariables.java @@ -0,0 +1,18 @@ +package com.sdm.common.config; + +/** + * 流程变量常量定义 + */ +public interface FlowableVariables { + // 异步流程ReceiveTask节点回调返回结果变量名 + String RECEIVETASK_CALLBACKE_STATUS = "ReceivetaskCallbackeStatus"; + String RECEIVETASK_CALLBACKE_MSG = "ReceivetaskCallbackeMsg"; + + + /** + * 手动模式下,用户明确指定的文件列表 Key + * value: Map> + */ + String EXPLICIT_INPUT_FILES_KEY = "explicitInputFiles"; // 原: _explicitInputFiles + +} diff --git a/common/src/main/java/com/sdm/common/entity/flowable/dto/NodeDetailInfo.java b/common/src/main/java/com/sdm/common/entity/flowable/dto/NodeDetailInfo.java index a9e2baf4..b0d25359 100644 --- a/common/src/main/java/com/sdm/common/entity/flowable/dto/NodeDetailInfo.java +++ b/common/src/main/java/com/sdm/common/entity/flowable/dto/NodeDetailInfo.java @@ -16,4 +16,7 @@ public class NodeDetailInfo extends NodeStructureInfo{ private Long durationInMillis; private String durationFormatted; private String errorMessage; + + // 异步任务id + private String asyncTaskId; } \ No newline at end of file diff --git a/data/src/main/java/com/sdm/data/service/impl/DataStorageAnalysisImpl.java b/data/src/main/java/com/sdm/data/service/impl/DataStorageAnalysisImpl.java index 07944b95..1a1ce371 100644 --- a/data/src/main/java/com/sdm/data/service/impl/DataStorageAnalysisImpl.java +++ b/data/src/main/java/com/sdm/data/service/impl/DataStorageAnalysisImpl.java @@ -449,7 +449,7 @@ public class DataStorageAnalysisImpl implements DataStorageAnalysis { } // 判断用户是否存在存储配额,如果不存在则新增 - FileStorageQuota existFileStorageQuota = fileStorageQuotaService.lambdaQuery().eq(FileStorageQuota::getUserId, addUserQuotaEntity.getUserId()).one(); + FileStorageQuota existFileStorageQuota = fileStorageQuotaService.lambdaQuery().eq(FileStorageQuota::getUserId, addUserQuotaEntity.getUserId()).eq(FileStorageQuota::getTenantId, tenantId).one(); if (ObjectUtils.isEmpty(existFileStorageQuota)) { FileStorageQuota fileStorageQuota = new FileStorageQuota(); fileStorageQuota.setUserId(addUserQuotaEntity.getUserId()); diff --git a/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java b/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java index facaccb7..df78bc66 100644 --- a/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java +++ b/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java @@ -652,6 +652,10 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { queryBigFileReq.setCurrent(minioFileSearchReq.getCurrent()); queryBigFileReq.setSize(minioFileSearchReq.getSize()); queryBigFileReq.setDirIds(dirIds); + if(ObjectUtils.isNotEmpty(queryBigFileReq.getFileSuffix())){ + queryBigFileReq.setFileSuffix(queryBigFileReq.getFileSuffix().toLowerCase()); + } + if (Objects.equals(DirTypeEnum.KNOWLEDGE_BASE_DIR.getValue(), dirType)) { // 知识库文件:排除新增在审批的文件 queryBigFileReq.setApproveTypeList(fileDatdList); diff --git a/flowable/README.md b/flowable/README.md new file mode 100644 index 00000000..f23ade43 --- /dev/null +++ b/flowable/README.md @@ -0,0 +1,330 @@ +这份 README 文档在原有的基础上进行了深度迭代,**完整融合了“HPC 异步任务”与“本地应用 UserTask”两套编排逻辑**,并详细阐述了最新的 ID 管理与状态聚合策略。 + +--- + +# SDM Flowable Workflow Module + +## 1. 模块简介 + +本模块 (`com.sdm.flowable`) 是系统的工业仿真流程编排引擎核心。基于 **Flowable 7.0.1** 构建,摒弃了传统的静态 BPMN 文件模式,采用 **动态模型生成技术**。 + +它不仅支持**长耗时 HPC 异步任务**的“提交-等待-校验”闭环,还创新性地支持了**本地应用 UserTask** 的“前置异步编排”模式,实现了复杂业务场景下的状态强一致性与容错能力。 + +--- + +## 2. 核心架构与模型转换 (`Dto2BpmnConverter`) + +核心类 `Dto2BpmnConverter` 负责将前端定义的业务流程(DTO)转换为可执行的 BPMN 模型。根据节点类型不同,采用两种不同的裂变策略: + +### 2.1 策略 A:HPC 任务后置分裂 (Sentinel Pattern) +针对 `ServiceTask`,若配置为异步回调(如 HPC 提交),逻辑节点物理分裂为 **3 个连续节点**: + +1. **Original Node (ServiceTask)**: 执行提交逻辑(如提交 HPC)。 +2. **Wait Node (ReceiveTask)**: 命名为 `_wait`,挂起流程等待回调。 +3. **Check Node (ServiceTask)**: 命名为 `_check`,哨兵校验。 + +> **链路**: `Original` $\rightarrow$ `_wait` $\rightarrow$ `_check` + +### 2.2 策略 B:本地应用 UserTask 前置编排 (Pre-Orchestration) +针对 `UserTask`,若配置为 `localApp` 类型,需实现“先生成 ID 拉起应用,回调成功后,再让人工确认”的逻辑。系统在该节点**前面**插入 3 个辅助节点: + +1. **Register Node (ServiceTask)**: 命名为 `_register`。绑定 `LocalAppRegisterDelegate`,负责生成全局唯一的 `asyncTaskId` 并注册到业务表,同时存入流程 Local 变量。 +2. **Wait Node (ReceiveTask)**: 命名为 `_wait`。挂起流程,等待本地应用运行结束的回调。 +3. **Check Node (ServiceTask)**: 命名为 `_check`。哨兵校验本地应用的回调结果(成功/失败)。 +4. **Original Node (UserTask)**: 原始节点。只有当应用执行成功后,流程才会流转至此,等待用户点击“下一步”。 + +> **链路**: `_register` $\rightarrow$ `_wait` $\rightarrow$ `_check` $\rightarrow$ `Original` + +### 2.3 人工介入控制 (Manual/Auto Switch) +针对普通 HPC 节点,`handleWaitUserTask` 方法默认在 `ServiceTask` 前插入 `_waitUser` (UserTask)。 +* **Manual 模式**: 流程停在 `_waitUser`,等待用户确认。 +* **Auto 模式**: 利用 SkipExpression 自动跳过 `_waitUser`。 + +--- + +## 3. 异步任务、变量管理与状态聚合 + +### 3.1 变量管理与 ID 获取 +在本地应用场景中,`asyncTaskId` 的流转至关重要: +1. **生成与存储**: `_register` 节点生成 ID,通过 `asyncTaskRecordService.registerAsyncTask` 落库,同时 `execution.setVariableLocal` 存入流程。 +2. **前端获取**: 前端轮询状态时,后端 **直接查询业务表 (`async_task_record`)** 获取最新的 `asyncTaskId`。 + * *查询条件*: `processInstanceId` + `receiveTaskId` + `status=RUNNING` + `OrderByTimeDesc`。 + * *优势*: 确保在“回退重试”场景下,永远获取到最新的业务 ID,避免流程变量残留旧值的问题。 + +### 3.2 任务完成与强一致性 (`completeAsyncTask`) +复用同一个回调接口。为了防止“流程挂起回调却强推”,执行 **三层校验**: +1. **实例校验**: 流程是否存在。 +2. **挂起校验**: 若流程 **SUSPENDED**,仅更新数据库为 `SUCCESS`,**不触发流转**(回调缓冲)。激活时通过 `triggerPendingTasks` 自动补偿。 +3. **节点校验**: 确保流程停在 `_wait`。 +4. **触发**: 使用 `runtimeService.trigger` 唤醒流程。 + +### 3.3 状态聚合逻辑 (`determineUnifiedState`) +针对分裂后的多物理节点,后端计算出唯一的**业务聚合状态**返给前端: + +| 场景 | 物理节点位置 | 聚合状态 | 前端展示/交互 | +| :--- | :--- | :--- | :--- | +| **HPC 运行中** | `Original` / `_wait` / `_check` | **ACTIVE** | 显示“正在计算” | +| **本地应用运行中** | `_register` / `_wait` / `_check` | **ACTIVE** | 显示“拉起应用/运行中” | +| **任务失败** | `_check` (死信) | **ERROR** | 显示红色错误,允许重试 | +| **本地应用成功** | `Original` (UserTask) | **WAITING_FOR_USER** | 显示绿色,按钮变为“下一步” | +| **流程挂起** | 任意节点 | **SUSPENDED** | 锁死操作 | + +--- + +## 4. 核心时序交互图 + +### 4.1 场景一:HPC 异步任务(含挂起与补偿) +展示 HPC 任务从提交到回调的全过程,包含中途挂起导致的回调缓冲与激活后的自动补偿。 + +```mermaid +sequenceDiagram + autonumber + actor User as 用户/前端 + participant Ctrl as ProcessController + participant Service as ProcessService + participant Engine as Flowable Engine + participant Delegate as UniversalDelegate + participant DB as DB (AsyncRecord) + participant Sentinel as AsyncResultCheckDelegate + participant External as 外部系统(HPC) + + %% ========================================== + %% 阶段一:异步提交 + %% ========================================== + rect rgb(240, 255, 240) + note right of User: == 阶段一:HPC 异步提交 == + note right of Engine: 流转至 Original Node (ServiceTask) + Engine->>Delegate: execute() + Delegate->>External: 提交HPC任务 + Delegate->>DB: 插入记录 (Status=RUNNING) + note right of Engine: 流转至 _wait Node (ReceiveTask) + Engine->>Engine: 流程挂起 (Wait State) + end + + %% ========================================== + %% 阶段二:挂起与回调缓冲 + %% ========================================== + rect rgb(255, 240, 240) + note right of User: == 阶段二:挂起与回调缓冲 == + User->>Ctrl: suspendProcessInstance() + Ctrl->>Engine: runtimeService.suspend() + note right of Engine: 流程变为 SUSPENDED + + External->>Ctrl: asyncCallback (任务完成) + Ctrl->>Service: asyncCallback() + Service->>DB: 查询状态 + + alt 流程已挂起 + DB->>DB: 仅更新 Status=SUCCESS
(不触发 trigger) + note right of DB: 回调被缓冲,流程停在 _wait + end + end + + %% ========================================== + %% 阶段三:激活与补偿 + %% ========================================== + rect rgb(240, 240, 255) + note right of User: == 阶段三:激活与补偿 == + User->>Ctrl: activateProcessInstance() + Ctrl->>Engine: runtimeService.activate() + + Ctrl->>Service: triggerPendingTasks()
(激活后的自动动作) + Service->>DB: 查询缓冲的成功记录 + Service->>Engine: runtimeService.trigger()
(补偿触发) + end + + %% ========================================== + %% 阶段四:哨兵校验 + %% ========================================== + rect rgb(255, 255, 240) + note right of User: == 阶段四:哨兵校验 == + note right of Engine: 流转至 _check Node + Engine->>Sentinel: execute() + + alt STATUS == FAIL + Sentinel-->>Engine: 抛出异常 -> DeadLetterJob + note right of Engine: 节点变红 (ERROR) + end + end +``` + +### 4.2 场景二:本地应用 UserTask(交互闭环) +展示本地应用从注册 ID、拉起应用、回调校验到最终人工确认的完整闭环。 + +```mermaid +sequenceDiagram + autonumber + actor User as 用户 + participant FE as 前端/插件 + participant Ctrl as ProcessController + participant Engine as Flowable Engine + participant Reg as LocalAppRegisterDelegate + participant DB as DB (AsyncRecord) + participant Sentinel as AsyncResultCheckDelegate + + %% ========================================== + %% 步骤 1: 自动注册与挂起 + %% ========================================== + rect rgb(230, 245, 255) + note right of User: == 1. 自动注册与挂起 == + Engine->>Reg: 进入 _register 节点 + Reg->>DB: 生成 asyncTaskId, 存库 (RUNNING) + Reg->>Engine: setVariableLocal(asyncTaskId) + Engine->>Engine: 流转至 _wait, 流程挂起 + end + + %% ========================================== + %% 步骤 2: 前端获取 ID 并拉起 + %% ========================================== + rect rgb(255, 250, 230) + note right of User: == 2. 拉起应用 == + FE->>Ctrl: getProcessDetail() + Ctrl->>DB: 查询 _wait 节点最新的 RUNNING 记录 + Ctrl-->>FE: 返回 asyncTaskId, 状态 ACTIVE + + User->>FE: 点击"拉起应用" + FE->>User: 唤起本地 EXE (传入 asyncTaskId) + end + + %% ========================================== + %% 步骤 3: 异步回调与校验 + %% ========================================== + rect rgb(240, 255, 240) + note right of User: == 3. 回调与校验 == + User->>FE: 应用运行结束 + FE->>Ctrl: /asyncCallback (asyncTaskId, Code=0) + Ctrl->>DB: 更新 SUCCESS + Ctrl->>Engine: runtimeService.trigger() + + Engine->>Sentinel: 进入 _check 校验 + Sentinel->>Engine: 校验通过 + end + + %% ========================================== + %% 步骤 4: 人工确认 + %% ========================================== + rect rgb(240, 240, 240) + note right of User: == 4. 人工确认 == + Engine->>Engine: 流转至 Original UserTask + + FE->>Ctrl: getProcessDetail() + Ctrl-->>FE: 状态 WAITING_FOR_USER + + User->>FE: 点击"下一步" + FE->>Ctrl: /continueServiceTask + Ctrl->>Engine: taskService.complete() + end +``` + +--- + +## 5. 附加功能交互流程 + +### 5.1 原地重试 (Retry) +当哨兵校验失败(`_check` 变红)时,用户修复问题后可点击重试。主要用于 **HPC 任务** 或 **数据校验** 场景。 +* **操作**: `/retryFailedNode` +* **原理**: 将 `DeadLetterJob` 移回 `ExecutableJob`,重新触发校验逻辑。 + +### 5.2 回退跳转 (Rewind/Jump) +适用于 **本地应用失败** 或 **参数填写错误** 场景。 +* **操作**: `/retryToNode(targetNodeId)` +* **原理**: + 1. 用户点击“重新执行”。 + 2. 后端将流程指针强行跳转回 `_register`(针对本地应用)或任意前置节点。 + 3. `_register` 重新执行,**生成全新的 `asyncTaskId`**。 + 4. 流程重新挂起,前端获取新 ID,允许用户再次拉起应用。 + +--- + +## 6. ProcessController 接口能力清单 + +| 方法 | 描述 | 逻辑细节 | +| :--- | :--- | :--- | +| `deploy` | 流程部署 | DTO -> BPMN 转换并部署 | +| `saveParamsByDefinitionId` | 保存参数 | 保存用户输入的节点参数,作为运行模板 | +| `startByProcessDefinitionId` | 启动流程 | 根据定义ID启动实例 | +| `suspendProcessInstance` | 挂起实例 | 校验实例状态,挂起后阻止任务执行和回调触发 | +| `activateProcessInstance` | 激活实例 | 激活流程,并立即调用 `triggerPendingTasks` **补偿触发**积压的回调 | +| `cancelProcessInstance` | 取消实例 | 终止运行中的流程 | +| `getProcessAndNodeDetailByInstanceId` | 状态查询 | 获取聚合状态;**若为本地应用且 Active,会查询业务表返回 asyncTaskId** | +| `previewNodeInputFiles` | 文件预览 | 扫描 MinIO 或 本地磁盘,返回文件列表 | +| `continueServiceTask` | 继续执行 | 完成 `_waitUser` 或 `Original UserTask`,推动流程 | +| `asyncCallback` | 异步回调 | 通用回调接口,支持 HPC 和 本地应用 | +| `retryFailedNode` | 原地重试 | 恢复死信作业 | +| `retryToNode` | 回退重试 | 携带新参数,将流程跳转至任意指定节点(支持生成新业务ID) | + +好的,我将上述四个核心 Q&A 进行提炼和简化,保持专业性并统一格式,作为文档的 **第 7 章节** 补充在最后。 + +--- + +## 7. 核心机制与开发 Q&A + +### Q1: 链路 `_waitUser` -> `Original` -> `_wait` -> `_check` 中,任意节点都可以挂起吗? + +**A: 是的,可以在任意节点挂起,但底层表现不同。** + +所有节点在 Flowable 数据库中都有对应的持久化状态(Task、Job 或 Execution),挂起操作(Suspension)是针对流程实例级的,会冻结所有当前活跃的节点。 + +| 节点 | 类型 | 驱动方 | 挂起后的表现 | +| :--- | :--- | :--- | :--- | +| **`_waitUser`** | UserTask | **人工** | 用户点击“完成”时报错(拒绝操作)。 | +| **`Original`** | ServiceTask (Async) | **引擎** | Job 停留在数据库,AsyncExecutor 会跳过执行,直到流程激活。 | +| **`_wait`** | ReceiveTask | **外部** | 外部回调会被业务代码拦截并缓冲到 DB,**不触发流程流转**。 | +| **`_check`** | ServiceTask (Async) | **引擎** | 同 `Original`,校验逻辑暂停执行。 | + +--- + +### Q2: 为什么只有 `_wait` (ReceiveTask) 需要代码补偿 (`triggerPendingTasks`)? + +**A: 因为只有它是“事件驱动”且请求不可重放的。** + +* **`_wait` (ReceiveTask)**: 依赖外部系统的**一次性回调**。如果挂起期间回调到达,Flowable 引擎会拒绝触发。为了不丢失这次回调,业务层必须先存库。流程激活后,引擎**不知道**发生过回调,必须由代码主动查询数据库并补发 `trigger`。 +* **`Original` / `_check`**: 依赖引擎的**轮询扫描**。流程激活后,AsyncExecutor 下次扫描时会自动发现并执行这些 Job,无需人工干预。 +* **`_waitUser`**: 依赖**人工重试**。流程挂起导致操作失败后,用户只需在界面上再次点击按钮即可。 + +--- + +### Q3: `runtimeService.trigger` 是专门用来触发 ReceiveTask 的吗? + +**A: 在本系统的上下文中,是的。** + +请严格区分以下 API 的用途: + +1. **`runtimeService.trigger(executionId)`**: 专属用于 **`ReceiveTask`** (`_wait` 节点)。告诉引擎结束等待,继续流转。 +2. **`taskService.complete(taskId)`**: 专属用于 **`UserTask`** (`_waitUser` 节点)。完成人工任务。 +3. **`managementService.moveDeadLetterJob...`**: 专属用于 **死信 Job** (`_check` 失败后)。恢复异常任务。 + +> **注意**: 绝不能混用 `trigger` 和 `complete`。 + +--- + +### Q4: Flowable 变量有哪几种?如何正确设置和获取? + +**A: 主要分为“全局变量”和“本地变量”,本系统强烈建议使用本地变量以保证隔离性。** + +#### 1. 变量类型对比 + +| 类型 | 作用域 (Scope) | 特点 | 适用场景 | +| :--- | :--- | :--- | :--- | +| **Global** | Process Instance | 全流程可见,同名覆盖 | 业务主键 (runId)、发起人信息 | +| **Local** | Execution / Task | 仅当前节点可见,**数据隔离** | **异步任务 ID**、并行分支数据、临时状态 | + +#### 2. 代码操作指南 + +**设置变量 (Write):** +```java +// [推荐] 使用 Local 防止并发分支数据污染 +// 在 JavaDelegate 中 +execution.setVariableLocal("asyncTaskId", id); +// 在 Service 中 +runtimeService.setVariableLocal(executionId, "status", "SUCCESS"); +``` + +**获取变量 (Read):** +```java +// [推荐] 使用 getVariable (自动冒泡查找) +// 优先找 Local,找不到找 Global。这样兼容性最好。 +String val = (String) execution.getVariable("asyncTaskId"); +``` + +> **最佳实践**: 在 `_register` 节点存入 `asyncTaskId` 时使用 `setVariableLocal`。前端查询展示时,**建议直接查询业务表 (`async_task_record`)** 而非查询流程变量,以确保在回退/重试场景下获取到最新的 ID。 \ No newline at end of file diff --git a/flowable/repomix-output.xml b/flowable/repomix-output.xml index ecb69cfa..0886f0b9 100644 --- a/flowable/repomix-output.xml +++ b/flowable/repomix-output.xml @@ -1075,7 +1075,7 @@ import static com.sdm.common.config.FlowableConfig.RECEIVETASK_CALLBACKE_MSG; import static com.sdm.common.config.FlowableConfig.RECEIVETASK_CALLBACKE_STATUS; /** - * 异步结果校验 Delegate + * 哨兵节点 ServiceTask (_check) 绑定的异步结果校验逻辑 */ @Component("asyncResultCheckDelegate") public class AsyncResultCheckDelegate implements JavaDelegate { @@ -1455,16 +1455,21 @@ public class ExportWordScriptHandler implements ExecutionHandler package com.sdm.flowable.delegate.handler; +import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.sdm.common.common.SdmResponse; +import com.sdm.common.common.ThreadLocalContext; +import com.sdm.common.config.FlowableConfig; +import com.sdm.common.entity.enums.MessageTemplateEnum; import com.sdm.common.entity.flowable.executeConfig.HPCExecuteConfig; import com.sdm.common.entity.req.data.GetFileBaseInfoReq; import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq; +import com.sdm.common.entity.req.system.SendMsgReq; import com.sdm.common.entity.resp.data.FileMetadataInfoResp; +import com.sdm.common.feign.impl.system.MessageFeignClientImpl; import com.sdm.common.feign.inter.data.IDataFeignClient; import com.sdm.common.feign.inter.pbs.ITaskFeignClient; import com.sdm.common.log.CoreLogger; -import com.sdm.common.config.FlowableConfig; import com.sdm.flowable.entity.ProcessNodeParam; import com.sdm.flowable.enums.AsyncTaskStatusEnum; import com.sdm.flowable.service.IAsyncTaskRecordService; @@ -1497,6 +1502,9 @@ public class HpcHandler implements ExecutionHandler,HPCExecu @Autowired private IDataFeignClient dataFeignClient; + @Autowired + private MessageFeignClientImpl messageFeignClient; + /* * params:业务参数 * config:框架属性 @@ -1510,7 +1518,8 @@ public class HpcHandler implements ExecutionHandler,HPCExecu String masterFileRegularStr = config.getMasterFileRegularStr(); String inputFilesRegularStr = config.getInputFilesRegularStr(); CoreLogger.info("beforeNodeId:{},currentNodeId:{},masterFileRegularStr:{},inputFilesRegularStr:{}",beforeNodeId,currentNodeId,masterFileRegularStr,inputFilesRegularStr); - + // 初始化用户/租户信息 + initUserInfo(execution); // params 取只是测试使用 String processDefinitionId = (execution==null||StringUtils.isBlank(execution.getProcessDefinitionId()))? params.get("processDefinitionId").toString():execution.getProcessDefinitionId(); @@ -1535,10 +1544,15 @@ public class HpcHandler implements ExecutionHandler,HPCExecu // 1. 调用 HPC 平台提交任务 SdmResponse submitResp = taskFeignClient.adapterSubmitHpcJob(submitHpcTaskRemoteReq); if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){ + // 推送失败消息 + sendMsg(ThreadLocalContext.getTenantId(),ThreadLocalContext.getUserId(),submitHpcTaskRemoteReq.getJobName(),"失败"); log.error("HpcHandler submit failed,jobName:{}",params); throw new RuntimeException("HpcHandler submit failed,"+submitResp.getMessage()); } + String hpcTaskId = submitResp.getData(); + // 推送成功消息 + sendMsg(ThreadLocalContext.getTenantId(),ThreadLocalContext.getUserId(),submitHpcTaskRemoteReq.getJobName(),"成功"); CoreLogger.info("hpc task submit succ jobId:{}",hpcTaskId); // 2. 存数据库(提交状态 + 外部任务ID) asyncTaskRecordService.registerAsyncTask( @@ -1553,6 +1567,27 @@ public class HpcHandler implements ExecutionHandler,HPCExecu log.info("HPC 任务 {} 已提交", "hpcTaskId"); } + private void sendMsg(Long tenanId,Long userId,String jobName,String result){ + SendMsgReq req = new SendMsgReq(); + req.setTitle(MessageTemplateEnum.HPC_START.getTitle()); + req.setContent(MessageTemplateEnum.HPC_START.getContent(jobName,result)); + req.setTenantId(String.valueOf(tenanId)); + req.setUserId(String.valueOf(userId)); + log.info("hpc start push msg:{}", JSON.toJSONString(req)); + messageFeignClient.sendMessage(req); + } + + private void initUserInfo(DelegateExecution execution) { + // 获取当前流程实例参数 + Long userId = (Long) execution.getVariable("userId"); + String userName = (String) execution.getVariable("userName"); + Long tenantId = (Long) execution.getVariable("tenantId"); + ThreadLocalContext.setUserId(userId); + ThreadLocalContext.setUserName(userName); + ThreadLocalContext.setTenantId(tenantId); + CoreLogger.info("hpcHander initUserInfo userId:{},tenantId:{},userName:{}",userId,tenantId,userName); + } + private void dealHpcFile(SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq, String beforeNodeId, String currentNodeId, String processDefinitionId, String processInstanceId, String executeMode,Map params) { String simulationBaseDir = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR; @@ -2309,7 +2344,7 @@ public enum FlowElementTypeEnums { public static FlowElementTypeEnums fromString(String type) { for (FlowElementTypeEnums flowElementType : FlowElementTypeEnums.values()) { - if (flowElementType.type.equals(type)) { + if (flowElementType.type.equalsIgnoreCase(type)) { return flowElementType; } } @@ -2339,6 +2374,13 @@ public enum NodeStateEnum { * 挂起 */ SUSPENDED("suspended"), + + /** + * [新增] 等待用户确认(对应 Manual 模式卡在 _waitUser 节点) + * 前端看到这个状态应显示"执行/继续"按钮 + */ + WAITING_FOR_USER("waiting_for_user"), + /** * 错误 @@ -3098,7 +3140,7 @@ public class ProcessService implements Iprocess{ // 4. 计算流程实例整体状态 // 只要 hasDeadLetterJobs 为 false,流程状态就会显示为 running (或 suspended) - processInfo = buildProcessInstanceInfo(processInstanceId, context.isRunning(), context.isSuspended(), context.isHasDeadLetterJobs()); + processInfo = buildProcessInstanceInfo(processInstanceId, context); // 5. 计算每个节点的状态 (核心逻辑) calculateNodeStates(allNodes, context); @@ -3115,6 +3157,10 @@ public class ProcessService implements Iprocess{ boolean isRunning; boolean isSuspended; boolean hasDeadLetterJobs; + + // 记录最早发生错误的时间(死信作业的创建时间) + Date earliestErrorTime; + // Map Map errorMap = new HashMap<>(); // List @@ -3143,6 +3189,7 @@ public class ProcessService implements Iprocess{ ProcessStateContext ctx = new ProcessStateContext(); // 1. 获取运行时流程实例对象(判断是否运行中、是否挂起) + // ACT_RU_EXECUTION (运行时执行实例表) ProcessInstance runtimeInstance = runtimeService.createProcessInstanceQuery() .processInstanceId(processInstanceId) .singleResult(); @@ -3150,18 +3197,22 @@ public class ProcessService implements Iprocess{ ctx.setSuspended(ctx.isRunning() && runtimeInstance.isSuspended()); // 2. 准备历史数据 + // 查询 ACT_HI_ACTINST 该流程实例下,所有的历史节点记录 List historicActivities = historyService.createHistoricActivityInstanceQuery() .processInstanceId(processInstanceId) .list(); for (HistoricActivityInstance hist : historicActivities) { if (!ctx.historyMap.containsKey(hist.getActivityId()) || hist.getStartTime().after(ctx.historyMap.get(hist.getActivityId()).getStartTime())) { + // 只保留节点最新的开始时间记录,防止重复覆盖(例如:重试作业) ctx.historyMap.put(hist.getActivityId(), hist); } } // 3. 准备运行时 Active ID 列表 if (ctx.isRunning()) { + // 查 ACT_RU_EXECUTION 运行时活跃节点ID列表 + // historicActivities 也一定会有活跃节点ID,当流程的流转入 serviceTask_id1 的那一瞬间,引擎会立即在 ACT_HI_ACTINST 表插入一条记录 ctx.activeActivityIds.addAll(runtimeService.getActiveActivityIds(processInstanceId)); // A. 查死信 (Error 来源) @@ -3174,6 +3225,9 @@ public class ProcessService implements Iprocess{ // C. 只有当有 作业(死信或普通) 时,才去查 Execution 映射 if (!deadJobs.isEmpty() || !activeJobs.isEmpty()) { + // 查 Execution 表,获取 ExecutionId 和 ActivityId 映射 + // Flowable 的 Job 表(ACT_RU_JOB 或 ACT_RU_DEADLETTER_JOB)存储的是“干活的任务单”,里面只有 executionId(执行指针 ID),没有 activityId(BPMN 里的节点 ID,如 ServiceTask_1) + // 就算已经进入 ACT_RU_DEADLETTER_JOB 死信队列了 , ACT_RU_EXECUTION 还是会有对应的 ExecutionId 和 ActivityId List executions = runtimeService.createExecutionQuery() .processInstanceId(processInstanceId).list(); Map executionToActivityMap = executions.stream() @@ -3183,6 +3237,16 @@ public class ProcessService implements Iprocess{ // 处理死信 -> 放入 errorMap if (!deadJobs.isEmpty()) { ctx.setHasDeadLetterJobs(true); + + // 找到最早的死信时间,作为流程“卡住”的时间点 + Date firstError = deadJobs.stream() + .map(Job::getCreateTime) + .filter(Objects::nonNull) + .min(Date::compareTo) + .orElse(null); + ctx.setEarliestErrorTime(firstError); + + for (Job job : deadJobs) { if (job.getExceptionMessage() != null) { String actId = executionToActivityMap.get(job.getExecutionId()); @@ -3205,7 +3269,11 @@ public class ProcessService implements Iprocess{ } // 构建流程实例信息 - private ProcessInstanceInfo buildProcessInstanceInfo(String processInstanceId, boolean isRunning, boolean isSuspended, boolean hasError) { + private ProcessInstanceInfo buildProcessInstanceInfo(String processInstanceId, ProcessStateContext context) { + boolean isRunning = context.isRunning(); + boolean isSuspended = context.isSuspended(); + boolean hasError = context.isHasDeadLetterJobs(); + HistoricProcessInstance historicInstance = historyService.createHistoricProcessInstanceQuery() .processInstanceId(processInstanceId) .singleResult(); @@ -3222,9 +3290,22 @@ public class ProcessService implements Iprocess{ // 计算耗时 Long duration = historicInstance.getDurationInMillis(); + // 如果 duration 为空(说明流程没结束),且流程已开始 if (duration == null && historicInstance.getStartTime() != null && isRunning) { - duration = System.currentTimeMillis() - historicInstance.getStartTime().getTime(); + + if (hasError && context.getEarliestErrorTime() != null) { + // 情况 A: 流程报错了,耗时 = 报错时刻 - 开始时刻 + // 这样时间就“定格”了,不会随着你查询时间而增加 + duration = context.getEarliestErrorTime().getTime() - historicInstance.getStartTime().getTime(); + } else { + // 情况 B: 正常运行或挂起,耗时 = 当前时刻 - 开始时刻 + duration = System.currentTimeMillis() - historicInstance.getStartTime().getTime(); + } } + + // 防止出现负数(极端并发情况下) + if (duration != null && duration < 0) duration = 0L; + info.setDurationInMillis(duration); info.setDurationFormatted(duration != null ? formatDuration(duration) : null); @@ -3312,7 +3393,19 @@ public class ProcessService implements Iprocess{ if (isActive) { // 只要是 Active,就根据流程整体状态来定颜色 - node.setStatus(ctx.isSuspended() ? NodeStateEnum.SUSPENDED.getCode() : NodeStateEnum.ACTIVE.getCode()); + // 如果流程整体被挂起,优先显示挂起 + if (ctx.isSuspended()) { + node.setStatus(NodeStateEnum.SUSPENDED.getCode()); + } else if (ctx.activeActivityIds.contains(waitUserId)) { + // 停留在 _waitUser -> 等待用户操作 (Manual模式) + node.setStatus(NodeStateEnum.WAITING_FOR_USER.getCode()); + } else if (FlowElementTypeEnums.USERTASK.getType().equalsIgnoreCase(node.getType()) && ctx.activeActivityIds.contains(origId)) { + // 这是一个普通的 UserTask,且当前正卡在这里 + node.setStatus(NodeStateEnum.WAITING_FOR_USER.getCode()); + }else { + // 停留在 origId 或 waitId 或 checkId-> 视为通用执行中 + node.setStatus(NodeStateEnum.ACTIVE.getCode()); + } calculateAggregatedTime(node, ctx, waitUserId, origId, checkId); return; } @@ -4475,7 +4568,7 @@ public class Dto2BpmnConverter { checkTask.setName(nodeDto.getName() + "结果校验"); checkTask.setAsynchronous(true); // 必须异步,产生 Job 用于报错 // 设置快速失败(1次重试,失败即死信),保证强一致性 - disableAsyncRetry(receiveTask); + disableAsyncRetry(checkTask); // 绑定校验 Delegate checkTask.setImplementation("${asyncResultCheckDelegate}"); checkTask.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION); diff --git a/flowable/src/main/java/com/sdm/flowable/delegate/AsyncResultCheckDelegate.java b/flowable/src/main/java/com/sdm/flowable/delegate/AsyncResultCheckDelegate.java index 24e9f49a..05b1133c 100644 --- a/flowable/src/main/java/com/sdm/flowable/delegate/AsyncResultCheckDelegate.java +++ b/flowable/src/main/java/com/sdm/flowable/delegate/AsyncResultCheckDelegate.java @@ -1,15 +1,19 @@ package com.sdm.flowable.delegate; +import com.sdm.flowable.enums.AsyncTaskStatusEnum; +import lombok.extern.slf4j.Slf4j; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.JavaDelegate; import org.springframework.stereotype.Component; -import static com.sdm.common.config.FlowableConfig.RECEIVETASK_CALLBACKE_MSG; -import static com.sdm.common.config.FlowableConfig.RECEIVETASK_CALLBACKE_STATUS; +import static com.sdm.common.config.FlowableVariables.RECEIVETASK_CALLBACKE_MSG; +import static com.sdm.common.config.FlowableVariables.RECEIVETASK_CALLBACKE_STATUS; /** - * 异步结果校验 Delegate + * 哨兵节点 ServiceTask (_check) 绑定的异步结果校验逻辑 + * 用于校验外部系统回调回来的结果,如果失败则抛出异常让流程卡死 */ +@Slf4j @Component("asyncResultCheckDelegate") public class AsyncResultCheckDelegate implements JavaDelegate { @@ -19,6 +23,7 @@ public class AsyncResultCheckDelegate implements JavaDelegate { // Flowable 的 getVariable 会自动查找 Local -> Parent,所以 setVariableLocal 后这里能取到 String status = (String) execution.getVariable(RECEIVETASK_CALLBACKE_STATUS); String msg = (String) execution.getVariable(RECEIVETASK_CALLBACKE_MSG); + log.info("异步任务回调结果校验:状态={},信息={}", status, msg); // 防御性检查:如果变量丢失(极低概率,除非手动操作了数据库),默认认为是 SUCCESS 防止卡死,或者抛错 if (status == null) { @@ -28,11 +33,11 @@ public class AsyncResultCheckDelegate implements JavaDelegate { } // 2. 核心校验逻辑 - if ("FAIL".equals(status)) { + if (AsyncTaskStatusEnum.FAIL.getCode().equalsIgnoreCase(status)) { // 【自爆】抛出 RuntimeException // Flowable 会捕获此异常 -> 减少重试次数 -> 最终移动到 DeadLetterJob 表 -> 节点变红 String errorInfo = (msg != null && !msg.isEmpty()) ? msg : "外部系统返回失败状态,未提供详细信息"; - throw new RuntimeException("HPC任务执行失败: " + errorInfo); + throw new RuntimeException("异步任务执行失败: " + errorInfo); } // 3. SUCCESS 的情况 diff --git a/flowable/src/main/java/com/sdm/flowable/delegate/LocalAppRegisterDelegate.java b/flowable/src/main/java/com/sdm/flowable/delegate/LocalAppRegisterDelegate.java new file mode 100644 index 00000000..a91beb1e --- /dev/null +++ b/flowable/src/main/java/com/sdm/flowable/delegate/LocalAppRegisterDelegate.java @@ -0,0 +1,57 @@ +package com.sdm.flowable.delegate; + +import com.sdm.flowable.enums.AsyncTaskStatusEnum; +import com.sdm.flowable.service.IAsyncTaskRecordService; +import com.sdm.flowable.util.FlowNodeIdUtils; +import lombok.extern.slf4j.Slf4j; +import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.delegate.JavaDelegate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.UUID; + + +/** + * 本地应用对应的注册节点(RegisterNode) 绑定的 生成异步任务ID 处理器 + * 用于拉起本地应用前,生成异步任务ID = asyncTaskId,并将异步任务ID放入流程变量中 + * 将异步任务ID=asyncTaskId 和 回调节点ID=waitNodeId 保存到 async_task_record 异步任务执行记录表 + * 本地应用执行完成,异步回到时,基于 asyncTaskId 查询异步任务,获取下一个回调节点 waitNodeId,能够恢复流程继续执行 + * + * @author: shimingdm + * @date: 2023/1/31 10:05 + */ +@Slf4j +@Component("localAppRegisterDelegate") +public class LocalAppRegisterDelegate implements JavaDelegate { + + @Autowired + private IAsyncTaskRecordService asyncTaskRecordService; + + @Override + public void execute(DelegateExecution execution) { + // 1. 获取原始 UserTask 的 ID (从 _register 节点ID倒推) + // 当前 activityId 是 "UserTask_A_register" -> 解析出 "UserTask_A" + String currentId = execution.getCurrentActivityId(); + String originalId = FlowNodeIdUtils.parseOriginalNodeId(currentId); + + // 2. 计算接下来的 Wait 节点 ID (用于回调定位) + String waitNodeId = FlowNodeIdUtils.generateAsyncTaskId(originalId); + + // 3. 生成全局唯一的异步任务ID + String asyncTaskId = UUID.randomUUID().toString(); + + log.info("本地应用绑定的执行策略处理:当前节点ID: {}, 原始节点ID: {}, 下一个等待节点ID: {}, 生成异步任务ID: {}", currentId, originalId, waitNodeId, asyncTaskId); + + // 4. 注册到业务表 (状态: RUNNING/INIT) + // handlerType 标记为 LOCAL_APP + asyncTaskRecordService.registerAsyncTask( + execution, + waitNodeId, + "LOCAL_APP", + execution.getVariables(), + AsyncTaskStatusEnum.RUNNING.getCode(), + asyncTaskId + ); + } +} \ No newline at end of file diff --git a/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java b/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java index 7ef55a1d..5da29731 100644 --- a/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java +++ b/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson2.JSONObject; import com.sdm.common.common.SdmResponse; import com.sdm.common.common.ThreadLocalContext; import com.sdm.common.config.FlowableConfig; +import com.sdm.common.config.FlowableVariables; import com.sdm.common.entity.enums.MessageTemplateEnum; import com.sdm.common.entity.flowable.executeConfig.HPCExecuteConfig; import com.sdm.common.entity.req.data.GetFileBaseInfoReq; @@ -203,7 +204,7 @@ public class HpcHandler implements ExecutionHandler,HPCExecu private static List getFileListFromMap(Map dataMap, String key) { return Optional.ofNullable(dataMap) // 提取 explicitInputFiles 子 Map - .map(map -> (Map) map.get("explicitInputFiles")) + .map(map -> (Map) map.get(FlowableVariables.EXPLICIT_INPUT_FILES_KEY)) // 提取指定 key 的列表 .map(explicitMap -> (List) explicitMap.get(key)) // 为空则返回空列表,避免后续遍历 NPE diff --git a/flowable/src/main/java/com/sdm/flowable/enums/ExecuteTypeEnum.java b/flowable/src/main/java/com/sdm/flowable/enums/ExecuteTypeEnum.java new file mode 100644 index 00000000..29661a9d --- /dev/null +++ b/flowable/src/main/java/com/sdm/flowable/enums/ExecuteTypeEnum.java @@ -0,0 +1,18 @@ +package com.sdm.flowable.enums; + +/** + * 执行类型枚举 + */ +public enum ExecuteTypeEnum { + CLOUD_APP("cloudApp"), + LOCAL_APP("localApp"), + HPC("HPC"); + + private String code; + ExecuteTypeEnum(String code) { + this.code = code; + } + public String getCode() { + return code; + } +} diff --git a/flowable/src/main/java/com/sdm/flowable/process/ProcessService.java b/flowable/src/main/java/com/sdm/flowable/process/ProcessService.java index 7efd4d54..e4834e87 100644 --- a/flowable/src/main/java/com/sdm/flowable/process/ProcessService.java +++ b/flowable/src/main/java/com/sdm/flowable/process/ProcessService.java @@ -1,10 +1,12 @@ package com.sdm.flowable.process; import com.alibaba.fastjson2.JSONObject; +import com.fasterxml.jackson.databind.ObjectMapper; import com.sdm.common.common.SdmResponse; import com.sdm.common.entity.flowable.dto.NodeDetailInfo; import com.sdm.common.entity.flowable.dto.ProcessDefinitionDTO; import com.sdm.common.entity.flowable.dto.ProcessInstanceInfo; +import com.sdm.common.entity.flowable.executeConfig.BaseExecuteConfig; import com.sdm.common.entity.req.data.GetFileBaseInfoReq; import com.sdm.common.entity.req.flowable.AsyncCallbackRequest; import com.sdm.common.entity.resp.data.FileMetadataInfoResp; @@ -16,6 +18,8 @@ import com.sdm.flowable.delegate.UniversalDelegate; import com.sdm.flowable.dto.req.CompleteTaskReq; import com.sdm.flowable.dto.req.PreviewNodeInputFilesReq; import com.sdm.flowable.dto.resp.NodeInputFilePreviewResp; +import com.sdm.flowable.entity.AsyncTaskRecord; +import com.sdm.flowable.enums.ExecuteTypeEnum; import com.sdm.flowable.enums.FlowElementTypeEnums; import com.sdm.flowable.enums.NodeStateEnum; import com.sdm.flowable.enums.ProcessInstanceStateEnum; @@ -84,6 +88,10 @@ public class ProcessService implements Iprocess{ @Autowired private IDataFeignClient dataFeignClient; + @Autowired + private ObjectMapper objectMapper; + + // 部署流程(前端传入Flowable标准JSON) public SdmResponse deploy(ProcessDefinitionDTO processDTO) throws Exception { log.info("开始部署流程定义: {}",processDTO); @@ -304,6 +312,8 @@ public class ProcessService implements Iprocess{ boolean isSuspended; boolean hasDeadLetterJobs; + String processInstanceId; + // 记录最早发生错误的时间(死信作业的创建时间) Date earliestErrorTime; @@ -333,6 +343,7 @@ public class ProcessService implements Iprocess{ */ private ProcessStateContext prepareStateContext(String processInstanceId) { ProcessStateContext ctx = new ProcessStateContext(); + ctx.setProcessInstanceId(processInstanceId); // 1. 获取运行时流程实例对象(判断是否运行中、是否挂起) // ACT_RU_EXECUTION (运行时执行实例表) @@ -498,7 +509,8 @@ public class ProcessService implements Iprocess{ if (isOriginal) { // === 原始节点:聚合逻辑 === - // 它需要根据 "waitUser -> self -> wait -> check" 整个链条来决定状态 + // serviceTask 它需要根据 "waitUser -> self -> wait -> check" 整个链条来决定状态 + // userTask 它需要根据 "Register Node(serviceTask) ->Wait Node(ReceiveTask) -> Check Node(ServiceTask)-> Original Node(UserTask)" 链条来决定状态 determineUnifiedState(node, ctx); } else { // === 辅助节点:物理逻辑 === @@ -512,6 +524,81 @@ public class ProcessService implements Iprocess{ * 逻辑 A:原始节点状态计算 (聚合所有关联节点) */ private void determineUnifiedState(NodeDetailInfo node, ProcessStateContext ctx) { + if(isLocalApp(node)){ + // 针对本地应用节点的特殊聚合逻辑 + determineUserTaskUnifiedState(node, ctx); + }else { + // 针对serviceTask的任务节点的聚合状态展示处理 + determineServiceTaskUnifiedState(node, ctx); + } + + } + + /** + * 根据 "Register Node(serviceTask) ->Wait Node(ReceiveTask) -> Check Node(ServiceTask)-> Original Node(UserTask)" 链条来决定状态 + * @param node + * @param ctx + */ + private void determineUserTaskUnifiedState(NodeDetailInfo node, ProcessStateContext ctx) { + String origId = node.getId(); + String registerId = FlowNodeIdUtils.generateRegisterTaskId(origId); + String waitId = FlowNodeIdUtils.generateAsyncTaskId(origId); + String checkId = FlowNodeIdUtils.generateCheckTaskId(origId); + String procInstId = ctx.getProcessInstanceId(); // 或者作为参数传入 + + AsyncTaskRecord record = asyncTaskRecordService.lambdaQuery() + .eq(AsyncTaskRecord::getProcessInstanceId, procInstId) + .eq(AsyncTaskRecord::getReceiveTaskId, waitId) + .last("LIMIT 1") + .one(); + + if (record != null) { + node.setAsyncTaskId(record.getAsyncTaskId()); + } + + // 1. 判断 ERROR (Check 节点死信) + if (ctx.errorMap.containsKey(checkId)) { + node.setStatus(NodeStateEnum.ERROR.getCode()); + node.setErrorMessage(ctx.errorMap.get(checkId)); + // 计算时间:从 register 开始,到当前时刻 + calculateLocalAppTime(node, ctx, registerId, origId); + return; + } + + // 2. 判断 WAITING_FOR_USER (停在 Original UserTask) + // 这代表前置的自动化流程跑完了,且成功了,正在等用户点下一步 + if (ctx.activeActivityIds.contains(origId)) { + node.setStatus(NodeStateEnum.WAITING_FOR_USER.getCode()); + calculateLocalAppTime(node, ctx, registerId, origId); + return; + } + + // 3. 判断 ACTIVE (停在 Register / Wait / Check) + // 代表自动化流程正在跑 (注册中、应用运行中、校验中) + boolean isActive = ctx.activeActivityIds.contains(registerId) || + ctx.activeActivityIds.contains(waitId) || + ctx.activeActivityIds.contains(checkId); + + if (isActive) { + // 如果流程挂起,优先显示挂起 + node.setStatus(ctx.isSuspended() ? NodeStateEnum.SUSPENDED.getCode() : NodeStateEnum.ACTIVE.getCode()); + calculateLocalAppTime(node, ctx, registerId, origId); + return; + } + + // 4. 判断 FINISHED (Original 节点已结束) + HistoricActivityInstance lastHist = ctx.historyMap.get(origId); + if (lastHist != null && lastHist.getEndTime() != null) { + node.setStatus(NodeStateEnum.FINISHED.getCode()); + calculateLocalAppTime(node, ctx, registerId, origId); + return; + } + + // 5. PENDING + node.setStatus(NodeStateEnum.PENDING.getCode()); + } + + private void determineServiceTaskUnifiedState(NodeDetailInfo node, ProcessStateContext ctx) { String origId = node.getId(); String waitUserId = FlowNodeIdUtils.generateWaitUserTaskId(origId); String waitId = FlowNodeIdUtils.generateAsyncTaskId(origId); @@ -574,6 +661,24 @@ public class ProcessService implements Iprocess{ node.setStatus(NodeStateEnum.PENDING.getCode()); } + + // 判断是否为本地应用节点 + private boolean isLocalApp(NodeDetailInfo node) { + String executeConfigJson = node.getExecuteConfig(); + if (executeConfigJson == null || executeConfigJson.isBlank()) { + return false; + } + + try { + BaseExecuteConfig config = + objectMapper.readValue(executeConfigJson, BaseExecuteConfig.class); + return ExecuteTypeEnum.LOCAL_APP.getCode().equalsIgnoreCase(config.getExecuteType()); + } catch (Exception e) { + log.warn("解析 executeConfig 失败,nodeId={}, json={}", + node.getId(), executeConfigJson, e); + return false; + } + } /** * 逻辑 B:辅助节点状态计算 (物理状态) */ @@ -633,6 +738,45 @@ public class ProcessService implements Iprocess{ calculateDuration(node, startTime, endTime); } + /** + * 计算本地应用聚合节点的时间 + * 逻辑: + * StartTime = _register 节点的开始时间 + * EndTime = 只有当状态是 FINISHED 时,取 Original 节点的结束时间,否则为 null + * Duration = EndTime (或 Now) - StartTime + * + * @param node 节点信息对象 + * @param ctx 状态上下文 + * @param registerId 链条起点的ID + * @param origId 链条终点的ID + */ + private void calculateLocalAppTime(NodeDetailInfo node, ProcessStateContext ctx, String registerId, String origId) { + // 1. 获取开始时间 (取 _register 的历史记录) + HistoricActivityInstance startInst = ctx.historyMap.get(registerId); + + // 防御性编程:如果 _register 没查到(极少见),尝试查 origId 补救 + if (startInst == null) { + startInst = ctx.historyMap.get(origId); + } + + Date startTime = (startInst != null) ? startInst.getStartTime() : null; + node.setStartTime(startTime); + + // 2. 获取结束时间 (仅当整个节点 FINISHED 时才有值) + Date endTime = null; + if (NodeStateEnum.FINISHED.getCode().equals(node.getStatus())) { + // 取终点 Original (UserTask) 的历史记录 + HistoricActivityInstance endInst = ctx.historyMap.get(origId); + if (endInst != null) { + endTime = endInst.getEndTime(); + } + } + node.setEndTime(endTime); + + // 3. 计算耗时 + calculateDuration(node, startTime, endTime); + } + /** * 工具:设置单个节点的物理时间 */ diff --git a/flowable/src/main/java/com/sdm/flowable/service/impl/AsyncTaskRecordServiceImpl.java b/flowable/src/main/java/com/sdm/flowable/service/impl/AsyncTaskRecordServiceImpl.java index 184e3f53..11a75f42 100644 --- a/flowable/src/main/java/com/sdm/flowable/service/impl/AsyncTaskRecordServiceImpl.java +++ b/flowable/src/main/java/com/sdm/flowable/service/impl/AsyncTaskRecordServiceImpl.java @@ -20,8 +20,8 @@ import org.springframework.transaction.annotation.Transactional; import java.util.List; import java.util.Map; -import static com.sdm.common.config.FlowableConfig.RECEIVETASK_CALLBACKE_MSG; -import static com.sdm.common.config.FlowableConfig.RECEIVETASK_CALLBACKE_STATUS; +import static com.sdm.common.config.FlowableVariables.RECEIVETASK_CALLBACKE_MSG; +import static com.sdm.common.config.FlowableVariables.RECEIVETASK_CALLBACKE_STATUS; /** *

diff --git a/flowable/src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java b/flowable/src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java index 2b0bcb97..4e5860f9 100644 --- a/flowable/src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java +++ b/flowable/src/main/java/com/sdm/flowable/util/Dto2BpmnConverter.java @@ -16,6 +16,8 @@ import org.springframework.stereotype.Component; import java.util.*; import java.util.stream.Collectors; +import static com.sdm.flowable.enums.ExecuteTypeEnum.LOCAL_APP; + /** * DTO → Flowable BpmnModel 映射工具类(核心) */ @@ -52,6 +54,9 @@ public class Dto2BpmnConverter { // 3.1、存储等待用户输入任务映射关系(原节点ID → waitUserTask节点ID) // 这里 Value 存的是 _waitUser 节点 ID,用于标识这个节点开启了等待用户操作 Map waitUserTaskMap = new HashMap<>(); // 原节点ID → waitUserTask节点ID + // 存储本地应用 UserTask 的映射关系 (OriginalId -> RegisterId) + // 用于将指向 UserTask 的连线重定向到 Register 节点 + Map localAppMap = new HashMap<>(); // 4. 存储并行网关映射关系(原节点ID → 网关ID) @@ -65,7 +70,7 @@ public class Dto2BpmnConverter { // 处理等待用户提交任务 handleWaitUserTask(process, nodeDto, waitUserTaskMap); // 创建实际节点 - createActualNode(process, nodeDto, asyncTaskMap); + createActualNode(process, nodeDto, asyncTaskMap,localAppMap); // 处理并行网关,创建拆分和汇聚节点 addRequiredGateways(process, nodeDto, flowDtos, joinGatewayMap, splitGatewayMap); } @@ -74,7 +79,7 @@ public class Dto2BpmnConverter { addRetryTask(process); // 6. 创建连线 - createConnections(process, flowDtos, asyncTaskMap,waitUserTaskMap, joinGatewayMap, splitGatewayMap); + createConnections(process, flowDtos, asyncTaskMap,waitUserTaskMap,localAppMap, joinGatewayMap, splitGatewayMap); validProcess(process); @@ -143,7 +148,7 @@ public class Dto2BpmnConverter { checkTask.setName(nodeDto.getName() + "结果校验"); checkTask.setAsynchronous(true); // 必须异步,产生 Job 用于报错 // 设置快速失败(1次重试,失败即死信),保证强一致性 - disableAsyncRetry(receiveTask); + disableAsyncRetry(checkTask); // 绑定校验 Delegate checkTask.setImplementation("${asyncResultCheckDelegate}"); checkTask.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION); @@ -161,8 +166,7 @@ public class Dto2BpmnConverter { } private boolean isAsyncCallbackEnabled(FlowElementDTO nodeDto) { - return (FlowElementTypeEnums.SERVICETASK.getType().equals(nodeDto.getType()) || - FlowElementTypeEnums.USERTASK.getType().equals(nodeDto.getType())) && + return FlowElementTypeEnums.SERVICETASK.getType().equals(nodeDto.getType()) && nodeDto.getExtensionElements() != null && nodeDto.getExtensionElements().getExecuteConfig() != null && nodeDto.getExtensionElements().getExecuteConfig().isAsyncCallback(); @@ -242,6 +246,7 @@ public class Dto2BpmnConverter { List flowDtos, Map asyncTaskMap, Map waitUserTaskMap, + Map localAppMap, Map joinGatewayMap, Map splitGatewayMap) { @@ -266,6 +271,11 @@ public class Dto2BpmnConverter { // (Prev -> WaitUser -> Original) // ==================================================================================== handleWaitUserTaskConnections(process, waitUserTaskMap); + + // ==================================================================================== + // 处理 LocalApp UserTask 的连线重定向 + // 逻辑:Prev -> Register Node(serviceTask) ->Wait Node(ReceiveTask) -> Check Node(ServiceTask)-> Original Node(UserTask) + handleLocalAppConnections(process, localAppMap); } /** @@ -353,7 +363,7 @@ public class Dto2BpmnConverter { /** * 第三阶段:处理异步任务连接 - * 针对已标记为异步回调的任务节点,将其连接重构为' Original -> Wait -> Check -> Targets(NextNodes)'的模式 + * 针对已标记为异步回调的任务节点,将其连接 Original -> Targets(NextNodes) 重构为' Original -> Wait -> Check -> Targets(NextNodes)'的模式 */ private void handleAsyncTaskConnections(Process process, Map asyncTaskMap) { for (String originalNodeId : asyncTaskMap.keySet()) { @@ -390,6 +400,12 @@ public class Dto2BpmnConverter { } } + /** + * 第四阶段:处理等待用户任务连接 + * 重构等待用户任务的连线,将 Prev -> Original 处理成 Prev -> WaitUser -> Original + * @param process + * @param waitUserTaskMap + */ private void handleWaitUserTaskConnections(Process process, Map waitUserTaskMap) { for (String originalNodeId : waitUserTaskMap.keySet()) { String waitUserId = waitUserTaskMap.get(originalNodeId); @@ -408,21 +424,65 @@ public class Dto2BpmnConverter { } removeLines.forEach(f -> process.removeFlowElement(f.getId())); - // Step 2: 添加原来的入线 → waitUserTask + // Step 2: 添加新的入线 Prev → waitUserTask for (String src : originalSources) { process.addFlowElement(createSequenceFlow(src, waitUserId, null)); } - // Step 3: waitUserTask → 原节点 + // Step 3: 添加新的出线 waitUserTask → Original 原节点 process.addFlowElement(createSequenceFlow(waitUserId, originalNodeId, null)); } } + /** + * 第五阶段:处理本地应用任务连接 + * 前置生成节点处理中已经生成了 Register Node(serviceTask) ->Wait Node(ReceiveTask) -> Check Node(ServiceTask)的连线 + * 针对已标记为本地应用的任务节点,将其连接 prevNode--->Original Node 重构为'prevNode ---> Register Node(serviceTask) ->Wait Node(ReceiveTask) -> Check Node(ServiceTask)-> Original Node(UserTask)'的模式 + * @param process + * @param localAppMap + */ + private void handleLocalAppConnections(Process process, Map localAppMap) { + for (String originalId : localAppMap.keySet()) { + String registerId = localAppMap.get(originalId); + // 通过工具类获取 Check 节点 ID + String checkId = FlowNodeIdUtils.generateCheckTaskId(originalId); + + // ============================================================== + // 步骤 A: 处理入线 (prevNode -> Register) + // ============================================================== + + // 1. 找出所有指向 Original 的线 (此时全是来自上游的线) + List incomingFlows = new ArrayList<>(); + for (FlowElement ele : process.getFlowElements()) { + if (ele instanceof SequenceFlow) { + SequenceFlow sf = (SequenceFlow) ele; + if (sf.getTargetRef().equals(originalId)) { + // 【优化】这里不再需要判断 sourceRef 是否为 checkId 了 + // 因为我们在 createLocalAppUserTaskChain 里没画那条线 + incomingFlows.add(sf); + } + } + } + + // 2. 将这些线的终点,从 Original 改为 Register + for (SequenceFlow sf : incomingFlows) { + sf.setTargetRef(registerId); + } + + // ============================================================== + // 步骤 B: 缝合内部与原始节点 (Check -> Original) + // ============================================================== + // 3. 手动补充最后这一跳 + SequenceFlow checkToOriginal = createSequenceFlow(checkId, originalId, null); + process.addFlowElement(checkToOriginal); + } + } + /** * 创建实际的流程节点 */ - private void createActualNode(Process process, FlowElementDTO nodeDto, Map asyncTaskMap) throws JsonProcessingException { + private void createActualNode(Process process, FlowElementDTO nodeDto, Map asyncTaskMap,Map localAppMap) throws JsonProcessingException { FlowElementTypeEnums elementType = FlowElementTypeEnums.fromString(nodeDto.getType()); switch (elementType) { @@ -456,7 +516,7 @@ public class Dto2BpmnConverter { break; case USERTASK: - createUserTask(process, nodeDto, asyncTaskMap); + createUserTask(process, nodeDto,localAppMap); break; case SERVICETASK: @@ -469,21 +529,84 @@ public class Dto2BpmnConverter { } } - private void createUserTask(Process process, FlowElementDTO nodeDto, Map asyncTaskMap) throws JsonProcessingException { - // 用户任务:映射为 Flowable UserTask + private void createUserTask(Process process, FlowElementDTO nodeDto,Map localAppMap) throws JsonProcessingException { + //判断是否为本地应用类型 + if (FlowNodeIdUtils.isLocalAppNode(nodeDto)) { + createLocalAppUserTaskChain(process, nodeDto,localAppMap); + } else { + // 原有的普通 UserTask 创建逻辑 + createNormalUserTask(process, nodeDto); + } + } + + private void createLocalAppUserTaskChain(Process process, FlowElementDTO nodeDto,Map localAppMap) throws JsonProcessingException { + String originalId = nodeDto.getId(); + String registerId = FlowNodeIdUtils.generateRegisterTaskId(originalId); // _register + String waitId = FlowNodeIdUtils.generateAsyncTaskId(originalId); // _wait + String checkId = FlowNodeIdUtils.generateCheckTaskId(originalId); // _check + + // 1. Register Node (ServiceTask) + ServiceTask registerTask = new ServiceTask(); + registerTask.setId(registerId); + registerTask.setName("注册本地任务"); + registerTask.setAsynchronous(true); + disableAsyncRetry(registerTask); + registerTask.setImplementation("${localAppRegisterDelegate}"); + registerTask.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION); + process.addFlowElement(registerTask); + + // 2. Wait Node (ReceiveTask) + ReceiveTask waitTask = new ReceiveTask(); + waitTask.setId(waitId); + waitTask.setName("等待应用回调"); + waitTask.setAsynchronous(true); + disableAsyncRetry(waitTask); + process.addFlowElement(waitTask); + + // 3. Check Node (ServiceTask) - 哨兵 + ServiceTask checkTask = new ServiceTask(); + checkTask.setId(checkId); + checkTask.setName("结果校验"); + checkTask.setAsynchronous(true); + disableAsyncRetry(checkTask); // 失败变红,进入死信 + checkTask.setImplementation("${asyncResultCheckDelegate}"); // 复用现有校验 + checkTask.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION); + process.addFlowElement(checkTask); + + // 4. Original Node (UserTask) UserTask userTask = new UserTask(); - // 【关键修改】设置异步:防止下方绑定的监听器(创建文件夹)报错导致前一个节点回滚 - // 这会创建一个 Job 来执行 UserTask 的初始化逻辑(包括执行监听器) + userTask.setId(originalId); + userTask.setName(nodeDto.getName()); userTask.setAsynchronous(true); disableAsyncRetry(userTask); + // 保留原有的 UserTask 配置 (如目录准备监听器等) + addUserTaskExtensions(userTask, nodeDto); + process.addFlowElement(userTask); + + // 5. 建立内部连线: Register -> Wait -> Check -> Original + process.addFlowElement(createSequenceFlow(registerId, waitId, null)); + process.addFlowElement(createSequenceFlow(waitId, checkId, null)); + + // 6. 记录映射,用于后续将外部入线重定向到 Register + localAppMap.put(originalId, registerId); + } + + private void createNormalUserTask(Process process, FlowElementDTO nodeDto) throws JsonProcessingException { + UserTask userTask = new UserTask(); userTask.setId(nodeDto.getId()); userTask.setName(nodeDto.getName()); - log.info("创建用户任务节点 nodeId:{}", nodeDto.getId()); + userTask.setAsynchronous(true); + disableAsyncRetry(userTask); + addUserTaskExtensions(userTask, nodeDto); + process.addFlowElement(userTask); + } + + // 抽取通用的 UserTask 扩展属性设置 + private void addUserTaskExtensions(UserTask userTask, FlowElementDTO nodeDto) throws JsonProcessingException { + // 绑定控制参数(和 ServiceTask 类似) if (nodeDto.getExtensionElements() != null && nodeDto.getExtensionElements().getExecuteConfig() != null) { BaseExecuteConfig userTaskExecuteConfig = nodeDto.getExtensionElements().getExecuteConfig(); - // 设置异步回调节点ID - userTaskExecuteConfig.setCallbackNodeId(asyncTaskMap.getOrDefault(nodeDto.getId(), null)); String configJson = objectMapper.writeValueAsString(userTaskExecuteConfig); log.info("用户任务userTask的executeConfig配置:{}", configJson); @@ -503,8 +626,6 @@ public class Dto2BpmnConverter { dirPrepareListener.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION); dirPrepareListener.setImplementation("${userTaskDirectoryPreparationListener}"); userTask.getExecutionListeners().add(dirPrepareListener); - - process.addFlowElement(userTask); } private void createServiceTask(Process process, FlowElementDTO nodeDto, Map asyncTaskMap) throws JsonProcessingException { diff --git a/flowable/src/main/java/com/sdm/flowable/util/FlowNodeIdUtils.java b/flowable/src/main/java/com/sdm/flowable/util/FlowNodeIdUtils.java index a94d7060..1d19639d 100644 --- a/flowable/src/main/java/com/sdm/flowable/util/FlowNodeIdUtils.java +++ b/flowable/src/main/java/com/sdm/flowable/util/FlowNodeIdUtils.java @@ -1,6 +1,7 @@ package com.sdm.flowable.util; import com.sdm.common.config.FlowableConfig; +import com.sdm.common.entity.flowable.dto.FlowElementDTO; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; @@ -8,6 +9,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import static com.sdm.flowable.enums.ExecuteTypeEnum.LOCAL_APP; + @Slf4j public class FlowNodeIdUtils { private static final String JOIN_GATEWAY_PREFIX = FlowableConfig.JOIN_GATEWAY_PREFIX; @@ -15,6 +18,7 @@ public class FlowNodeIdUtils { private static final String ASYNC_TASK_SUFFIX = FlowableConfig.ASYNC_TASK_SUFFIX; private static final String WAIT_USER_SUFFIX = FlowableConfig.WAIT_USER_SUFFIX; private static final String CHECK_SUFFIX = FlowableConfig.CHECK_SUFFIX; // 后置哨兵 + private static final String REGISTER_SUFFIX = FlowableConfig.REGISTER_SUFFIX; // ==================== 网关 ==================== @@ -83,6 +87,22 @@ public class FlowNodeIdUtils { return checkTaskId.substring(0, checkTaskId.length() - CHECK_SUFFIX.length()); } + // ==================== 本地应用注册节点 ==================== + public static String generateRegisterTaskId(String nodeId) { + return nodeId + REGISTER_SUFFIX; + } + + public static boolean isRegisterTask(String id) { + return id != null && id.endsWith(REGISTER_SUFFIX); + } + + public static String getOriginalNodeIdFromRegisterTask(String registerTaskId) { + if (!isRegisterTask(registerTaskId)) { + throw new IllegalArgumentException("不是注册节点: " + registerTaskId); + } + return registerTaskId.substring(0, registerTaskId.length() - REGISTER_SUFFIX.length()); + } + // --- 解析器 (反向查找原始ID) --- /** @@ -99,6 +119,9 @@ public class FlowNodeIdUtils { if (nodeId.endsWith(CHECK_SUFFIX)) { return nodeId.substring(0, nodeId.length() - CHECK_SUFFIX.length()); } + if (nodeId.endsWith(REGISTER_SUFFIX)) { + return nodeId.substring(0, nodeId.length() - REGISTER_SUFFIX.length()); + } return nodeId; } // ==================== 重试任务 ==================== @@ -148,7 +171,15 @@ public class FlowNodeIdUtils { return !nodeId.endsWith(WAIT_USER_SUFFIX) && !nodeId.endsWith(ASYNC_TASK_SUFFIX) && !nodeId.endsWith(CHECK_SUFFIX) && + !nodeId.endsWith(REGISTER_SUFFIX)&& !nodeId.startsWith(JOIN_GATEWAY_PREFIX) && // 过滤网关 !nodeId.startsWith(SPLIT_GATEWAY_PREFIX); } + + public static boolean isLocalAppNode(FlowElementDTO nodeDto) { + if (nodeDto.getExtensionElements() != null && nodeDto.getExtensionElements().getExecuteConfig() != null) { + return LOCAL_APP.getCode().equalsIgnoreCase(nodeDto.getExtensionElements().getExecuteConfig().getExecuteType()); + } + return false; + } } \ No newline at end of file