Files
spdm-backend/flowable/README.md

330 lines
16 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

这份 README 文档在原有的基础上进行了深度迭代,**完整融合了“HPC 异步任务”与“本地应用 UserTask”两套编排逻辑**,并详细阐述了最新的 ID 管理与状态聚合策略。
---
# SDM Flowable Workflow Module
## 1. 模块简介
本模块 (`com.sdm.flowable`) 是系统的工业仿真流程编排引擎核心。基于 **Flowable 7.0.1** 构建,摒弃了传统的静态 BPMN 文件模式,采用 **动态模型生成技术**
它不仅支持**长耗时 HPC 异步任务**的“提交-等待-校验”闭环,还创新性地支持了**本地应用 UserTask** 的“前置异步编排”模式,实现了复杂业务场景下的状态强一致性与容错能力。
---
## 2. 核心架构与模型转换 (`Dto2BpmnConverter`)
核心类 `Dto2BpmnConverter` 负责将前端定义的业务流程DTO转换为可执行的 BPMN 模型。根据节点类型不同,采用两种不同的裂变策略:
### 2.1 策略 AHPC 任务后置分裂 (Sentinel Pattern)
针对 `ServiceTask`,若配置为异步回调(如 HPC 提交),逻辑节点物理分裂为 **3 个连续节点**
1. **Original Node (ServiceTask)**: 执行提交逻辑(如提交 HPC
2. **Wait Node (ReceiveTask)**: 命名为 `_wait`,挂起流程等待回调。
3. **Check Node (ServiceTask)**: 命名为 `_check`,哨兵校验。
> **链路**: `Original` $\rightarrow$ `_wait` $\rightarrow$ `_check`
### 2.2 策略 B本地应用 UserTask 前置编排 (Pre-Orchestration)
针对 `UserTask`,若配置为 `localApp` 类型,需实现“先生成 ID 拉起应用,回调成功后,再让人工确认”的逻辑。系统在该节点**前面**插入 3 个辅助节点:
1. **Register Node (ServiceTask)**: 命名为 `_register`。绑定 `LocalAppRegisterDelegate`,负责生成全局唯一的 `asyncTaskId` 并注册到业务表,同时存入流程 Local 变量。
2. **Wait Node (ReceiveTask)**: 命名为 `_wait`。挂起流程,等待本地应用运行结束的回调。
3. **Check Node (ServiceTask)**: 命名为 `_check`。哨兵校验本地应用的回调结果(成功/失败)。
4. **Original Node (UserTask)**: 原始节点。只有当应用执行成功后,流程才会流转至此,等待用户点击“下一步”。
> **链路**: `_register` $\rightarrow$ `_wait` $\rightarrow$ `_check` $\rightarrow$ `Original`
### 2.3 人工介入控制 (Manual/Auto Switch)
针对普通 HPC 节点,`handleWaitUserTask` 方法默认在 `ServiceTask` 前插入 `_waitUser` (UserTask)。
* **Manual 模式**: 流程停在 `_waitUser`,等待用户确认。
* **Auto 模式**: 利用 SkipExpression 自动跳过 `_waitUser`
---
## 3. 异步任务、变量管理与状态聚合
### 3.1 变量管理与 ID 获取
在本地应用场景中,`asyncTaskId` 的流转至关重要:
1. **生成与存储**: `_register` 节点生成 ID通过 `asyncTaskRecordService.registerAsyncTask` 落库,同时 `execution.setVariableLocal` 存入流程。
2. **前端获取**: 前端轮询状态时,后端 **直接查询业务表 (`async_task_record`)** 获取最新的 `asyncTaskId`
* *查询条件*: `processInstanceId` + `receiveTaskId` + `status=RUNNING` + `OrderByTimeDesc`
* *优势*: 确保在“回退重试”场景下,永远获取到最新的业务 ID避免流程变量残留旧值的问题。
### 3.2 任务完成与强一致性 (`completeAsyncTask`)
复用同一个回调接口。为了防止“流程挂起回调却强推”,执行 **三层校验**
1. **实例校验**: 流程是否存在。
2. **挂起校验**: 若流程 **SUSPENDED**,仅更新数据库为 `SUCCESS`**不触发流转**(回调缓冲)。激活时通过 `triggerPendingTasks` 自动补偿。
3. **节点校验**: 确保流程停在 `_wait`
4. **触发**: 使用 `runtimeService.trigger` 唤醒流程。
### 3.3 状态聚合逻辑 (`determineUnifiedState`)
针对分裂后的多物理节点,后端计算出唯一的**业务聚合状态**返给前端:
| 场景 | 物理节点位置 | 聚合状态 | 前端展示/交互 |
| :--- | :--- | :--- | :--- |
| **HPC 运行中** | `Original` / `_wait` / `_check` | **ACTIVE** | 显示“正在计算” |
| **本地应用运行中** | `_register` / `_wait` / `_check` | **ACTIVE** | 显示“拉起应用/运行中” |
| **任务失败** | `_check` (死信) | **ERROR** | 显示红色错误,允许重试 |
| **本地应用成功** | `Original` (UserTask) | **WAITING_FOR_USER** | 显示绿色,按钮变为“下一步” |
| **流程挂起** | 任意节点 | **SUSPENDED** | 锁死操作 |
---
## 4. 核心时序交互图
### 4.1 场景一HPC 异步任务(含挂起与补偿)
展示 HPC 任务从提交到回调的全过程,包含中途挂起导致的回调缓冲与激活后的自动补偿。
```mermaid
sequenceDiagram
autonumber
actor User as 用户/前端
participant Ctrl as ProcessController
participant Service as ProcessService
participant Engine as Flowable Engine
participant Delegate as UniversalDelegate
participant DB as DB (AsyncRecord)
participant Sentinel as AsyncResultCheckDelegate
participant External as 外部系统(HPC)
%% ==========================================
%% 阶段一:异步提交
%% ==========================================
rect rgb(240, 255, 240)
note right of User: == 阶段一HPC 异步提交 ==
note right of Engine: 流转至 Original Node (ServiceTask)
Engine->>Delegate: execute()
Delegate->>External: 提交HPC任务
Delegate->>DB: 插入记录 (Status=RUNNING)
note right of Engine: 流转至 _wait Node (ReceiveTask)
Engine->>Engine: 流程挂起 (Wait State)
end
%% ==========================================
%% 阶段二:挂起与回调缓冲
%% ==========================================
rect rgb(255, 240, 240)
note right of User: == 阶段二:挂起与回调缓冲 ==
User->>Ctrl: suspendProcessInstance()
Ctrl->>Engine: runtimeService.suspend()
note right of Engine: 流程变为 SUSPENDED
External->>Ctrl: asyncCallback (任务完成)
Ctrl->>Service: asyncCallback()
Service->>DB: 查询状态
alt 流程已挂起
DB->>DB: 仅更新 Status=SUCCESS<br/>(不触发 trigger)
note right of DB: 回调被缓冲,流程停在 _wait
end
end
%% ==========================================
%% 阶段三:激活与补偿
%% ==========================================
rect rgb(240, 240, 255)
note right of User: == 阶段三:激活与补偿 ==
User->>Ctrl: activateProcessInstance()
Ctrl->>Engine: runtimeService.activate()
Ctrl->>Service: triggerPendingTasks() <br/>(激活后的自动动作)
Service->>DB: 查询缓冲的成功记录
Service->>Engine: runtimeService.trigger() <br/>(补偿触发)
end
%% ==========================================
%% 阶段四:哨兵校验
%% ==========================================
rect rgb(255, 255, 240)
note right of User: == 阶段四:哨兵校验 ==
note right of Engine: 流转至 _check Node
Engine->>Sentinel: execute()
alt STATUS == FAIL
Sentinel-->>Engine: 抛出异常 -> DeadLetterJob
note right of Engine: 节点变红 (ERROR)
end
end
```
### 4.2 场景二:本地应用 UserTask交互闭环
展示本地应用从注册 ID、拉起应用、回调校验到最终人工确认的完整闭环。
```mermaid
sequenceDiagram
autonumber
actor User as 用户
participant FE as 前端/插件
participant Ctrl as ProcessController
participant Engine as Flowable Engine
participant Reg as LocalAppRegisterDelegate
participant DB as DB (AsyncRecord)
participant Sentinel as AsyncResultCheckDelegate
%% ==========================================
%% 步骤 1: 自动注册与挂起
%% ==========================================
rect rgb(230, 245, 255)
note right of User: == 1. 自动注册与挂起 ==
Engine->>Reg: 进入 _register 节点
Reg->>DB: 生成 asyncTaskId, 存库 (RUNNING)
Reg->>Engine: setVariableLocal(asyncTaskId)
Engine->>Engine: 流转至 _wait, 流程挂起
end
%% ==========================================
%% 步骤 2: 前端获取 ID 并拉起
%% ==========================================
rect rgb(255, 250, 230)
note right of User: == 2. 拉起应用 ==
FE->>Ctrl: getProcessDetail()
Ctrl->>DB: 查询 _wait 节点最新的 RUNNING 记录
Ctrl-->>FE: 返回 asyncTaskId, 状态 ACTIVE
User->>FE: 点击"拉起应用"
FE->>User: 唤起本地 EXE (传入 asyncTaskId)
end
%% ==========================================
%% 步骤 3: 异步回调与校验
%% ==========================================
rect rgb(240, 255, 240)
note right of User: == 3. 回调与校验 ==
User->>FE: 应用运行结束
FE->>Ctrl: /asyncCallback (asyncTaskId, Code=0)
Ctrl->>DB: 更新 SUCCESS
Ctrl->>Engine: runtimeService.trigger()
Engine->>Sentinel: 进入 _check 校验
Sentinel->>Engine: 校验通过
end
%% ==========================================
%% 步骤 4: 人工确认
%% ==========================================
rect rgb(240, 240, 240)
note right of User: == 4. 人工确认 ==
Engine->>Engine: 流转至 Original UserTask
FE->>Ctrl: getProcessDetail()
Ctrl-->>FE: 状态 WAITING_FOR_USER
User->>FE: 点击"下一步"
FE->>Ctrl: /continueServiceTask
Ctrl->>Engine: taskService.complete()
end
```
---
## 5. 附加功能交互流程
### 5.1 原地重试 (Retry)
当哨兵校验失败(`_check` 变红)时,用户修复问题后可点击重试。主要用于 **HPC 任务****数据校验** 场景。
* **操作**: `/retryFailedNode`
* **原理**: 将 `DeadLetterJob` 移回 `ExecutableJob`,重新触发校验逻辑。
### 5.2 回退跳转 (Rewind/Jump)
适用于 **本地应用失败****参数填写错误** 场景。
* **操作**: `/retryToNode(targetNodeId)`
* **原理**:
1. 用户点击“重新执行”。
2. 后端将流程指针强行跳转回 `_register`(针对本地应用)或任意前置节点。
3. `_register` 重新执行,**生成全新的 `asyncTaskId`**。
4. 流程重新挂起,前端获取新 ID允许用户再次拉起应用。
---
## 6. ProcessController 接口能力清单
| 方法 | 描述 | 逻辑细节 |
| :--- | :--- | :--- |
| `deploy` | 流程部署 | DTO -> BPMN 转换并部署 |
| `saveParamsByDefinitionId` | 保存参数 | 保存用户输入的节点参数,作为运行模板 |
| `startByProcessDefinitionId` | 启动流程 | 根据定义ID启动实例 |
| `suspendProcessInstance` | 挂起实例 | 校验实例状态,挂起后阻止任务执行和回调触发 |
| `activateProcessInstance` | 激活实例 | 激活流程,并立即调用 `triggerPendingTasks` **补偿触发**积压的回调 |
| `cancelProcessInstance` | 取消实例 | 终止运行中的流程 |
| `getProcessAndNodeDetailByInstanceId` | 状态查询 | 获取聚合状态;**若为本地应用且 Active会查询业务表返回 asyncTaskId** |
| `previewNodeInputFiles` | 文件预览 | 扫描 MinIO 或 本地磁盘,返回文件列表 |
| `continueServiceTask` | 继续执行 | 完成 `_waitUser``Original UserTask`,推动流程 |
| `asyncCallback` | 异步回调 | 通用回调接口,支持 HPC 和 本地应用 |
| `retryFailedNode` | 原地重试 | 恢复死信作业 |
| `retryToNode` | 回退重试 | 携带新参数将流程跳转至任意指定节点支持生成新业务ID |
好的,我将上述四个核心 Q&A 进行提炼和简化,保持专业性并统一格式,作为文档的 **第 7 章节** 补充在最后。
---
## 7. 核心机制与开发 Q&A
### Q1: 链路 `_waitUser` -> `Original` -> `_wait` -> `_check` 中,任意节点都可以挂起吗?
**A: 是的,可以在任意节点挂起,但底层表现不同。**
所有节点在 Flowable 数据库中都有对应的持久化状态Task、Job 或 Execution挂起操作Suspension是针对流程实例级的会冻结所有当前活跃的节点。
| 节点 | 类型 | 驱动方 | 挂起后的表现 |
| :--- | :--- | :--- | :--- |
| **`_waitUser`** | UserTask | **人工** | 用户点击“完成”时报错(拒绝操作)。 |
| **`Original`** | ServiceTask (Async) | **引擎** | Job 停留在数据库AsyncExecutor 会跳过执行,直到流程激活。 |
| **`_wait`** | ReceiveTask | **外部** | 外部回调会被业务代码拦截并缓冲到 DB**不触发流程流转**。 |
| **`_check`** | ServiceTask (Async) | **引擎** | 同 `Original`,校验逻辑暂停执行。 |
---
### Q2: 为什么只有 `_wait` (ReceiveTask) 需要代码补偿 (`triggerPendingTasks`)
**A: 因为只有它是“事件驱动”且请求不可重放的。**
* **`_wait` (ReceiveTask)**: 依赖外部系统的**一次性回调**。如果挂起期间回调到达Flowable 引擎会拒绝触发。为了不丢失这次回调,业务层必须先存库。流程激活后,引擎**不知道**发生过回调,必须由代码主动查询数据库并补发 `trigger`
* **`Original` / `_check`**: 依赖引擎的**轮询扫描**。流程激活后AsyncExecutor 下次扫描时会自动发现并执行这些 Job无需人工干预。
* **`_waitUser`**: 依赖**人工重试**。流程挂起导致操作失败后,用户只需在界面上再次点击按钮即可。
---
### Q3: `runtimeService.trigger` 是专门用来触发 ReceiveTask 的吗?
**A: 在本系统的上下文中,是的。**
请严格区分以下 API 的用途:
1. **`runtimeService.trigger(executionId)`**: 专属用于 **`ReceiveTask`** (`_wait` 节点)。告诉引擎结束等待,继续流转。
2. **`taskService.complete(taskId)`**: 专属用于 **`UserTask`** (`_waitUser` 节点)。完成人工任务。
3. **`managementService.moveDeadLetterJob...`**: 专属用于 **死信 Job** (`_check` 失败后)。恢复异常任务。
> **注意**: 绝不能混用 `trigger` 和 `complete`。
---
### Q4: Flowable 变量有哪几种?如何正确设置和获取?
**A: 主要分为“全局变量”和“本地变量”,本系统强烈建议使用本地变量以保证隔离性。**
#### 1. 变量类型对比
| 类型 | 作用域 (Scope) | 特点 | 适用场景 |
| :--- | :--- | :--- | :--- |
| **Global** | Process Instance | 全流程可见,同名覆盖 | 业务主键 (runId)、发起人信息 |
| **Local** | Execution / Task | 仅当前节点可见,**数据隔离** | **异步任务 ID**、并行分支数据、临时状态 |
#### 2. 代码操作指南
**设置变量 (Write):**
```java
// [推荐] 使用 Local 防止并发分支数据污染
// 在 JavaDelegate 中
execution.setVariableLocal("asyncTaskId", id);
// 在 Service 中
runtimeService.setVariableLocal(executionId, "status", "SUCCESS");
```
**获取变量 (Read):**
```java
// [推荐] 使用 getVariable (自动冒泡查找)
// 优先找 Local找不到找 Global。这样兼容性最好。
String val = (String) execution.getVariable("asyncTaskId");
```
> **最佳实践**: 在 `_register` 节点存入 `asyncTaskId` 时使用 `setVariableLocal`。前端查询展示时,**建议直接查询业务表 (`async_task_record`)** 而非查询流程变量,以确保在回退/重试场景下获取到最新的 ID。