这份 README 文档在原有的基础上进行了深度迭代,**完整融合了“HPC 异步任务”与“本地应用 UserTask”两套编排逻辑**,并详细阐述了最新的 ID 管理与状态聚合策略。 --- # SDM Flowable Workflow Module ## 1. 模块简介 本模块 (`com.sdm.flowable`) 是系统的工业仿真流程编排引擎核心。基于 **Flowable 7.0.1** 构建,摒弃了传统的静态 BPMN 文件模式,采用 **动态模型生成技术**。 它不仅支持**长耗时 HPC 异步任务**的“提交-等待-校验”闭环,还创新性地支持了**本地应用 UserTask** 的“前置异步编排”模式,实现了复杂业务场景下的状态强一致性与容错能力。 --- ## 2. 核心架构与模型转换 (`Dto2BpmnConverter`) 核心类 `Dto2BpmnConverter` 负责将前端定义的业务流程(DTO)转换为可执行的 BPMN 模型。根据节点类型不同,采用两种不同的裂变策略: ### 2.1 策略 A:HPC 任务后置分裂 (Sentinel Pattern) 针对 `ServiceTask`,若配置为异步回调(如 HPC 提交),逻辑节点物理分裂为 **3 个连续节点**: 1. **Original Node (ServiceTask)**: 执行提交逻辑(如提交 HPC)。 2. **Wait Node (ReceiveTask)**: 命名为 `_wait`,挂起流程等待回调。 3. **Check Node (ServiceTask)**: 命名为 `_check`,哨兵校验。 > **链路**: `Original` $\rightarrow$ `_wait` $\rightarrow$ `_check` ### 2.2 策略 B:本地应用 UserTask 前置编排 (Pre-Orchestration) 针对 `UserTask`,若配置为 `localApp` 类型,需实现“先生成 ID 拉起应用,回调成功后,再让人工确认”的逻辑。系统在该节点**前面**插入 3 个辅助节点: 1. **Register Node (ServiceTask)**: 命名为 `_register`。绑定 `LocalAppRegisterDelegate`,负责生成全局唯一的 `asyncTaskId` 并注册到业务表,同时存入流程 Local 变量。 2. **Wait Node (ReceiveTask)**: 命名为 `_wait`。挂起流程,等待本地应用运行结束的回调。 3. **Check Node (ServiceTask)**: 命名为 `_check`。哨兵校验本地应用的回调结果(成功/失败)。 4. **Original Node (UserTask)**: 原始节点。只有当应用执行成功后,流程才会流转至此,等待用户点击“下一步”。 > **链路**: `_register` $\rightarrow$ `_wait` $\rightarrow$ `_check` $\rightarrow$ `Original` ### 2.3 人工介入控制 (Manual/Auto Switch) 针对普通 HPC 节点,`handleWaitUserTask` 方法默认在 `ServiceTask` 前插入 `_waitUser` (UserTask)。 * **Manual 模式**: 流程停在 `_waitUser`,等待用户确认。 * **Auto 模式**: 利用 SkipExpression 自动跳过 `_waitUser`。 --- ## 3. 异步任务、变量管理与状态聚合 ### 3.1 变量管理与 ID 获取 在本地应用场景中,`asyncTaskId` 的流转至关重要: 1. **生成与存储**: `_register` 节点生成 ID,通过 `asyncTaskRecordService.registerAsyncTask` 落库,同时 `execution.setVariableLocal` 存入流程。 2. **前端获取**: 前端轮询状态时,后端 **直接查询业务表 (`async_task_record`)** 获取最新的 `asyncTaskId`。 * *查询条件*: `processInstanceId` + `receiveTaskId` + `status=RUNNING` + `OrderByTimeDesc`。 * *优势*: 确保在“回退重试”场景下,永远获取到最新的业务 ID,避免流程变量残留旧值的问题。 ### 3.2 任务完成与强一致性 (`completeAsyncTask`) 复用同一个回调接口。为了防止“流程挂起回调却强推”,执行 **三层校验**: 1. **实例校验**: 流程是否存在。 2. **挂起校验**: 若流程 **SUSPENDED**,仅更新数据库为 `SUCCESS`,**不触发流转**(回调缓冲)。激活时通过 `triggerPendingTasks` 自动补偿。 3. **节点校验**: 确保流程停在 `_wait`。 4. **触发**: 使用 `runtimeService.trigger` 唤醒流程。 ### 3.3 状态聚合逻辑 (`determineUnifiedState`) 针对分裂后的多物理节点,后端计算出唯一的**业务聚合状态**返给前端: | 场景 | 物理节点位置 | 聚合状态 | 前端展示/交互 | | :--- | :--- | :--- | :--- | | **HPC 运行中** | `Original` / `_wait` / `_check` | **ACTIVE** | 显示“正在计算” | | **本地应用运行中** | `_register` / `_wait` / `_check` | **ACTIVE** | 显示“拉起应用/运行中” | | **任务失败** | `_check` (死信) | **ERROR** | 显示红色错误,允许重试 | | **本地应用成功** | `Original` (UserTask) | **WAITING_FOR_USER** | 显示绿色,按钮变为“下一步” | | **流程挂起** | 任意节点 | **SUSPENDED** | 锁死操作 | --- ## 4. 核心时序交互图 ### 4.1 场景一:HPC 异步任务(含挂起与补偿) 展示 HPC 任务从提交到回调的全过程,包含中途挂起导致的回调缓冲与激活后的自动补偿。 ```mermaid sequenceDiagram autonumber actor User as 用户/前端 participant Ctrl as ProcessController participant Service as ProcessService participant Engine as Flowable Engine participant Delegate as UniversalDelegate participant DB as DB (AsyncRecord) participant Sentinel as AsyncResultCheckDelegate participant External as 外部系统(HPC) %% ========================================== %% 阶段一:异步提交 %% ========================================== rect rgb(240, 255, 240) note right of User: == 阶段一:HPC 异步提交 == note right of Engine: 流转至 Original Node (ServiceTask) Engine->>Delegate: execute() Delegate->>External: 提交HPC任务 Delegate->>DB: 插入记录 (Status=RUNNING) note right of Engine: 流转至 _wait Node (ReceiveTask) Engine->>Engine: 流程挂起 (Wait State) end %% ========================================== %% 阶段二:挂起与回调缓冲 %% ========================================== rect rgb(255, 240, 240) note right of User: == 阶段二:挂起与回调缓冲 == User->>Ctrl: suspendProcessInstance() Ctrl->>Engine: runtimeService.suspend() note right of Engine: 流程变为 SUSPENDED External->>Ctrl: asyncCallback (任务完成) Ctrl->>Service: asyncCallback() Service->>DB: 查询状态 alt 流程已挂起 DB->>DB: 仅更新 Status=SUCCESS
(不触发 trigger) note right of DB: 回调被缓冲,流程停在 _wait end end %% ========================================== %% 阶段三:激活与补偿 %% ========================================== rect rgb(240, 240, 255) note right of User: == 阶段三:激活与补偿 == User->>Ctrl: activateProcessInstance() Ctrl->>Engine: runtimeService.activate() Ctrl->>Service: triggerPendingTasks()
(激活后的自动动作) Service->>DB: 查询缓冲的成功记录 Service->>Engine: runtimeService.trigger()
(补偿触发) end %% ========================================== %% 阶段四:哨兵校验 %% ========================================== rect rgb(255, 255, 240) note right of User: == 阶段四:哨兵校验 == note right of Engine: 流转至 _check Node Engine->>Sentinel: execute() alt STATUS == FAIL Sentinel-->>Engine: 抛出异常 -> DeadLetterJob note right of Engine: 节点变红 (ERROR) end end ``` ### 4.2 场景二:本地应用 UserTask(交互闭环) 展示本地应用从注册 ID、拉起应用、回调校验到最终人工确认的完整闭环。 ```mermaid sequenceDiagram autonumber actor User as 用户 participant FE as 前端/插件 participant Ctrl as ProcessController participant Engine as Flowable Engine participant Reg as LocalAppRegisterDelegate participant DB as DB (AsyncRecord) participant Sentinel as AsyncResultCheckDelegate %% ========================================== %% 步骤 1: 自动注册与挂起 %% ========================================== rect rgb(230, 245, 255) note right of User: == 1. 自动注册与挂起 == Engine->>Reg: 进入 _register 节点 Reg->>DB: 生成 asyncTaskId, 存库 (RUNNING) Reg->>Engine: setVariableLocal(asyncTaskId) Engine->>Engine: 流转至 _wait, 流程挂起 end %% ========================================== %% 步骤 2: 前端获取 ID 并拉起 %% ========================================== rect rgb(255, 250, 230) note right of User: == 2. 拉起应用 == FE->>Ctrl: getProcessDetail() Ctrl->>DB: 查询 _wait 节点最新的 RUNNING 记录 Ctrl-->>FE: 返回 asyncTaskId, 状态 ACTIVE User->>FE: 点击"拉起应用" FE->>User: 唤起本地 EXE (传入 asyncTaskId) end %% ========================================== %% 步骤 3: 异步回调与校验 %% ========================================== rect rgb(240, 255, 240) note right of User: == 3. 回调与校验 == User->>FE: 应用运行结束 FE->>Ctrl: /asyncCallback (asyncTaskId, Code=0) Ctrl->>DB: 更新 SUCCESS Ctrl->>Engine: runtimeService.trigger() Engine->>Sentinel: 进入 _check 校验 Sentinel->>Engine: 校验通过 end %% ========================================== %% 步骤 4: 人工确认 %% ========================================== rect rgb(240, 240, 240) note right of User: == 4. 人工确认 == Engine->>Engine: 流转至 Original UserTask FE->>Ctrl: getProcessDetail() Ctrl-->>FE: 状态 WAITING_FOR_USER User->>FE: 点击"下一步" FE->>Ctrl: /continueServiceTask Ctrl->>Engine: taskService.complete() end ``` --- ## 5. 附加功能交互流程 ### 5.1 原地重试 (Retry) 当哨兵校验失败(`_check` 变红)时,用户修复问题后可点击重试。主要用于 **HPC 任务** 或 **数据校验** 场景。 * **操作**: `/retryFailedNode` * **原理**: 将 `DeadLetterJob` 移回 `ExecutableJob`,重新触发校验逻辑。 ### 5.2 回退跳转 (Rewind/Jump) 适用于 **本地应用失败** 或 **参数填写错误** 场景。 * **操作**: `/retryToNode(targetNodeId)` * **原理**: 1. 用户点击“重新执行”。 2. 后端将流程指针强行跳转回 `_register`(针对本地应用)或任意前置节点。 3. `_register` 重新执行,**生成全新的 `asyncTaskId`**。 4. 流程重新挂起,前端获取新 ID,允许用户再次拉起应用。 --- ## 6. ProcessController 接口能力清单 | 方法 | 描述 | 逻辑细节 | | :--- | :--- | :--- | | `deploy` | 流程部署 | DTO -> BPMN 转换并部署 | | `saveParamsByDefinitionId` | 保存参数 | 保存用户输入的节点参数,作为运行模板 | | `startByProcessDefinitionId` | 启动流程 | 根据定义ID启动实例 | | `suspendProcessInstance` | 挂起实例 | 校验实例状态,挂起后阻止任务执行和回调触发 | | `activateProcessInstance` | 激活实例 | 激活流程,并立即调用 `triggerPendingTasks` **补偿触发**积压的回调 | | `cancelProcessInstance` | 取消实例 | 终止运行中的流程 | | `getProcessAndNodeDetailByInstanceId` | 状态查询 | 获取聚合状态;**若为本地应用且 Active,会查询业务表返回 asyncTaskId** | | `previewNodeInputFiles` | 文件预览 | 扫描 MinIO 或 本地磁盘,返回文件列表 | | `continueServiceTask` | 继续执行 | 完成 `_waitUser` 或 `Original UserTask`,推动流程 | | `asyncCallback` | 异步回调 | 通用回调接口,支持 HPC 和 本地应用 | | `retryFailedNode` | 原地重试 | 恢复死信作业 | | `retryToNode` | 回退重试 | 携带新参数,将流程跳转至任意指定节点(支持生成新业务ID) | 好的,我将上述四个核心 Q&A 进行提炼和简化,保持专业性并统一格式,作为文档的 **第 7 章节** 补充在最后。 --- ## 7. 核心机制与开发 Q&A ### Q1: 链路 `_waitUser` -> `Original` -> `_wait` -> `_check` 中,任意节点都可以挂起吗? **A: 是的,可以在任意节点挂起,但底层表现不同。** 所有节点在 Flowable 数据库中都有对应的持久化状态(Task、Job 或 Execution),挂起操作(Suspension)是针对流程实例级的,会冻结所有当前活跃的节点。 | 节点 | 类型 | 驱动方 | 挂起后的表现 | | :--- | :--- | :--- | :--- | | **`_waitUser`** | UserTask | **人工** | 用户点击“完成”时报错(拒绝操作)。 | | **`Original`** | ServiceTask (Async) | **引擎** | Job 停留在数据库,AsyncExecutor 会跳过执行,直到流程激活。 | | **`_wait`** | ReceiveTask | **外部** | 外部回调会被业务代码拦截并缓冲到 DB,**不触发流程流转**。 | | **`_check`** | ServiceTask (Async) | **引擎** | 同 `Original`,校验逻辑暂停执行。 | --- ### Q2: 为什么只有 `_wait` (ReceiveTask) 需要代码补偿 (`triggerPendingTasks`)? **A: 因为只有它是“事件驱动”且请求不可重放的。** * **`_wait` (ReceiveTask)**: 依赖外部系统的**一次性回调**。如果挂起期间回调到达,Flowable 引擎会拒绝触发。为了不丢失这次回调,业务层必须先存库。流程激活后,引擎**不知道**发生过回调,必须由代码主动查询数据库并补发 `trigger`。 * **`Original` / `_check`**: 依赖引擎的**轮询扫描**。流程激活后,AsyncExecutor 下次扫描时会自动发现并执行这些 Job,无需人工干预。 * **`_waitUser`**: 依赖**人工重试**。流程挂起导致操作失败后,用户只需在界面上再次点击按钮即可。 --- ### Q3: `runtimeService.trigger` 是专门用来触发 ReceiveTask 的吗? **A: 在本系统的上下文中,是的。** 请严格区分以下 API 的用途: 1. **`runtimeService.trigger(executionId)`**: 专属用于 **`ReceiveTask`** (`_wait` 节点)。告诉引擎结束等待,继续流转。 2. **`taskService.complete(taskId)`**: 专属用于 **`UserTask`** (`_waitUser` 节点)。完成人工任务。 3. **`managementService.moveDeadLetterJob...`**: 专属用于 **死信 Job** (`_check` 失败后)。恢复异常任务。 > **注意**: 绝不能混用 `trigger` 和 `complete`。 --- ### Q4: Flowable 变量有哪几种?如何正确设置和获取? **A: 主要分为“全局变量”和“本地变量”,本系统强烈建议使用本地变量以保证隔离性。** #### 1. 变量类型对比 | 类型 | 作用域 (Scope) | 特点 | 适用场景 | | :--- | :--- | :--- | :--- | | **Global** | Process Instance | 全流程可见,同名覆盖 | 业务主键 (runId)、发起人信息 | | **Local** | Execution / Task | 仅当前节点可见,**数据隔离** | **异步任务 ID**、并行分支数据、临时状态 | #### 2. 代码操作指南 **设置变量 (Write):** ```java // [推荐] 使用 Local 防止并发分支数据污染 // 在 JavaDelegate 中 execution.setVariableLocal("asyncTaskId", id); // 在 Service 中 runtimeService.setVariableLocal(executionId, "status", "SUCCESS"); ``` **获取变量 (Read):** ```java // [推荐] 使用 getVariable (自动冒泡查找) // 优先找 Local,找不到找 Global。这样兼容性最好。 String val = (String) execution.getVariable("asyncTaskId"); ``` > **最佳实践**: 在 `_register` 节点存入 `asyncTaskId` 时使用 `setVariableLocal`。前端查询展示时,**建议直接查询业务表 (`async_task_record`)** 而非查询流程变量,以确保在回退/重试场景下获取到最新的 ID。