330 lines
16 KiB
Markdown
330 lines
16 KiB
Markdown
这份 README 文档在原有的基础上进行了深度迭代,**完整融合了“HPC 异步任务”与“本地应用 UserTask”两套编排逻辑**,并详细阐述了最新的 ID 管理与状态聚合策略。
|
||
|
||
---
|
||
|
||
# SDM Flowable Workflow Module
|
||
|
||
## 1. 模块简介
|
||
|
||
本模块 (`com.sdm.flowable`) 是系统的工业仿真流程编排引擎核心。基于 **Flowable 7.0.1** 构建,摒弃了传统的静态 BPMN 文件模式,采用 **动态模型生成技术**。
|
||
|
||
它不仅支持**长耗时 HPC 异步任务**的“提交-等待-校验”闭环,还创新性地支持了**本地应用 UserTask** 的“前置异步编排”模式,实现了复杂业务场景下的状态强一致性与容错能力。
|
||
|
||
---
|
||
|
||
## 2. 核心架构与模型转换 (`Dto2BpmnConverter`)
|
||
|
||
核心类 `Dto2BpmnConverter` 负责将前端定义的业务流程(DTO)转换为可执行的 BPMN 模型。根据节点类型不同,采用两种不同的裂变策略:
|
||
|
||
### 2.1 策略 A:HPC 任务后置分裂 (Sentinel Pattern)
|
||
针对 `ServiceTask`,若配置为异步回调(如 HPC 提交),逻辑节点物理分裂为 **3 个连续节点**:
|
||
|
||
1. **Original Node (ServiceTask)**: 执行提交逻辑(如提交 HPC)。
|
||
2. **Wait Node (ReceiveTask)**: 命名为 `_wait`,挂起流程等待回调。
|
||
3. **Check Node (ServiceTask)**: 命名为 `_check`,哨兵校验。
|
||
|
||
> **链路**: `Original` $\rightarrow$ `_wait` $\rightarrow$ `_check`
|
||
|
||
### 2.2 策略 B:本地应用 UserTask 前置编排 (Pre-Orchestration)
|
||
针对 `UserTask`,若配置为 `localApp` 类型,需实现“先生成 ID 拉起应用,回调成功后,再让人工确认”的逻辑。系统在该节点**前面**插入 3 个辅助节点:
|
||
|
||
1. **Register Node (ServiceTask)**: 命名为 `_register`。绑定 `LocalAppRegisterDelegate`,负责生成全局唯一的 `asyncTaskId` 并注册到业务表,同时存入流程 Local 变量。
|
||
2. **Wait Node (ReceiveTask)**: 命名为 `_wait`。挂起流程,等待本地应用运行结束的回调。
|
||
3. **Check Node (ServiceTask)**: 命名为 `_check`。哨兵校验本地应用的回调结果(成功/失败)。
|
||
4. **Original Node (UserTask)**: 原始节点。只有当应用执行成功后,流程才会流转至此,等待用户点击“下一步”。
|
||
|
||
> **链路**: `_register` $\rightarrow$ `_wait` $\rightarrow$ `_check` $\rightarrow$ `Original`
|
||
|
||
### 2.3 人工介入控制 (Manual/Auto Switch)
|
||
针对普通 HPC 节点,`handleWaitUserTask` 方法默认在 `ServiceTask` 前插入 `_waitUser` (UserTask)。
|
||
* **Manual 模式**: 流程停在 `_waitUser`,等待用户确认。
|
||
* **Auto 模式**: 利用 SkipExpression 自动跳过 `_waitUser`。
|
||
|
||
---
|
||
|
||
## 3. 异步任务、变量管理与状态聚合
|
||
|
||
### 3.1 变量管理与 ID 获取
|
||
在本地应用场景中,`asyncTaskId` 的流转至关重要:
|
||
1. **生成与存储**: `_register` 节点生成 ID,通过 `asyncTaskRecordService.registerAsyncTask` 落库,同时 `execution.setVariableLocal` 存入流程。
|
||
2. **前端获取**: 前端轮询状态时,后端 **直接查询业务表 (`async_task_record`)** 获取最新的 `asyncTaskId`。
|
||
* *查询条件*: `processInstanceId` + `receiveTaskId` + `status=RUNNING` + `OrderByTimeDesc`。
|
||
* *优势*: 确保在“回退重试”场景下,永远获取到最新的业务 ID,避免流程变量残留旧值的问题。
|
||
|
||
### 3.2 任务完成与强一致性 (`completeAsyncTask`)
|
||
复用同一个回调接口。为了防止“流程挂起回调却强推”,执行 **三层校验**:
|
||
1. **实例校验**: 流程是否存在。
|
||
2. **挂起校验**: 若流程 **SUSPENDED**,仅更新数据库为 `SUCCESS`,**不触发流转**(回调缓冲)。激活时通过 `triggerPendingTasks` 自动补偿。
|
||
3. **节点校验**: 确保流程停在 `_wait`。
|
||
4. **触发**: 使用 `runtimeService.trigger` 唤醒流程。
|
||
|
||
### 3.3 状态聚合逻辑 (`determineUnifiedState`)
|
||
针对分裂后的多物理节点,后端计算出唯一的**业务聚合状态**返给前端:
|
||
|
||
| 场景 | 物理节点位置 | 聚合状态 | 前端展示/交互 |
|
||
| :--- | :--- | :--- | :--- |
|
||
| **HPC 运行中** | `Original` / `_wait` / `_check` | **ACTIVE** | 显示“正在计算” |
|
||
| **本地应用运行中** | `_register` / `_wait` / `_check` | **ACTIVE** | 显示“拉起应用/运行中” |
|
||
| **任务失败** | `_check` (死信) | **ERROR** | 显示红色错误,允许重试 |
|
||
| **本地应用成功** | `Original` (UserTask) | **WAITING_FOR_USER** | 显示绿色,按钮变为“下一步” |
|
||
| **流程挂起** | 任意节点 | **SUSPENDED** | 锁死操作 |
|
||
|
||
---
|
||
|
||
## 4. 核心时序交互图
|
||
|
||
### 4.1 场景一:HPC 异步任务(含挂起与补偿)
|
||
展示 HPC 任务从提交到回调的全过程,包含中途挂起导致的回调缓冲与激活后的自动补偿。
|
||
|
||
```mermaid
|
||
sequenceDiagram
|
||
autonumber
|
||
actor User as 用户/前端
|
||
participant Ctrl as ProcessController
|
||
participant Service as ProcessService
|
||
participant Engine as Flowable Engine
|
||
participant Delegate as UniversalDelegate
|
||
participant DB as DB (AsyncRecord)
|
||
participant Sentinel as AsyncResultCheckDelegate
|
||
participant External as 外部系统(HPC)
|
||
|
||
%% ==========================================
|
||
%% 阶段一:异步提交
|
||
%% ==========================================
|
||
rect rgb(240, 255, 240)
|
||
note right of User: == 阶段一:HPC 异步提交 ==
|
||
note right of Engine: 流转至 Original Node (ServiceTask)
|
||
Engine->>Delegate: execute()
|
||
Delegate->>External: 提交HPC任务
|
||
Delegate->>DB: 插入记录 (Status=RUNNING)
|
||
note right of Engine: 流转至 _wait Node (ReceiveTask)
|
||
Engine->>Engine: 流程挂起 (Wait State)
|
||
end
|
||
|
||
%% ==========================================
|
||
%% 阶段二:挂起与回调缓冲
|
||
%% ==========================================
|
||
rect rgb(255, 240, 240)
|
||
note right of User: == 阶段二:挂起与回调缓冲 ==
|
||
User->>Ctrl: suspendProcessInstance()
|
||
Ctrl->>Engine: runtimeService.suspend()
|
||
note right of Engine: 流程变为 SUSPENDED
|
||
|
||
External->>Ctrl: asyncCallback (任务完成)
|
||
Ctrl->>Service: asyncCallback()
|
||
Service->>DB: 查询状态
|
||
|
||
alt 流程已挂起
|
||
DB->>DB: 仅更新 Status=SUCCESS<br/>(不触发 trigger)
|
||
note right of DB: 回调被缓冲,流程停在 _wait
|
||
end
|
||
end
|
||
|
||
%% ==========================================
|
||
%% 阶段三:激活与补偿
|
||
%% ==========================================
|
||
rect rgb(240, 240, 255)
|
||
note right of User: == 阶段三:激活与补偿 ==
|
||
User->>Ctrl: activateProcessInstance()
|
||
Ctrl->>Engine: runtimeService.activate()
|
||
|
||
Ctrl->>Service: triggerPendingTasks() <br/>(激活后的自动动作)
|
||
Service->>DB: 查询缓冲的成功记录
|
||
Service->>Engine: runtimeService.trigger() <br/>(补偿触发)
|
||
end
|
||
|
||
%% ==========================================
|
||
%% 阶段四:哨兵校验
|
||
%% ==========================================
|
||
rect rgb(255, 255, 240)
|
||
note right of User: == 阶段四:哨兵校验 ==
|
||
note right of Engine: 流转至 _check Node
|
||
Engine->>Sentinel: execute()
|
||
|
||
alt STATUS == FAIL
|
||
Sentinel-->>Engine: 抛出异常 -> DeadLetterJob
|
||
note right of Engine: 节点变红 (ERROR)
|
||
end
|
||
end
|
||
```
|
||
|
||
### 4.2 场景二:本地应用 UserTask(交互闭环)
|
||
展示本地应用从注册 ID、拉起应用、回调校验到最终人工确认的完整闭环。
|
||
|
||
```mermaid
|
||
sequenceDiagram
|
||
autonumber
|
||
actor User as 用户
|
||
participant FE as 前端/插件
|
||
participant Ctrl as ProcessController
|
||
participant Engine as Flowable Engine
|
||
participant Reg as LocalAppRegisterDelegate
|
||
participant DB as DB (AsyncRecord)
|
||
participant Sentinel as AsyncResultCheckDelegate
|
||
|
||
%% ==========================================
|
||
%% 步骤 1: 自动注册与挂起
|
||
%% ==========================================
|
||
rect rgb(230, 245, 255)
|
||
note right of User: == 1. 自动注册与挂起 ==
|
||
Engine->>Reg: 进入 _register 节点
|
||
Reg->>DB: 生成 asyncTaskId, 存库 (RUNNING)
|
||
Reg->>Engine: setVariableLocal(asyncTaskId)
|
||
Engine->>Engine: 流转至 _wait, 流程挂起
|
||
end
|
||
|
||
%% ==========================================
|
||
%% 步骤 2: 前端获取 ID 并拉起
|
||
%% ==========================================
|
||
rect rgb(255, 250, 230)
|
||
note right of User: == 2. 拉起应用 ==
|
||
FE->>Ctrl: getProcessDetail()
|
||
Ctrl->>DB: 查询 _wait 节点最新的 RUNNING 记录
|
||
Ctrl-->>FE: 返回 asyncTaskId, 状态 ACTIVE
|
||
|
||
User->>FE: 点击"拉起应用"
|
||
FE->>User: 唤起本地 EXE (传入 asyncTaskId)
|
||
end
|
||
|
||
%% ==========================================
|
||
%% 步骤 3: 异步回调与校验
|
||
%% ==========================================
|
||
rect rgb(240, 255, 240)
|
||
note right of User: == 3. 回调与校验 ==
|
||
User->>FE: 应用运行结束
|
||
FE->>Ctrl: /asyncCallback (asyncTaskId, Code=0)
|
||
Ctrl->>DB: 更新 SUCCESS
|
||
Ctrl->>Engine: runtimeService.trigger()
|
||
|
||
Engine->>Sentinel: 进入 _check 校验
|
||
Sentinel->>Engine: 校验通过
|
||
end
|
||
|
||
%% ==========================================
|
||
%% 步骤 4: 人工确认
|
||
%% ==========================================
|
||
rect rgb(240, 240, 240)
|
||
note right of User: == 4. 人工确认 ==
|
||
Engine->>Engine: 流转至 Original UserTask
|
||
|
||
FE->>Ctrl: getProcessDetail()
|
||
Ctrl-->>FE: 状态 WAITING_FOR_USER
|
||
|
||
User->>FE: 点击"下一步"
|
||
FE->>Ctrl: /continueServiceTask
|
||
Ctrl->>Engine: taskService.complete()
|
||
end
|
||
```
|
||
|
||
---
|
||
|
||
## 5. 附加功能交互流程
|
||
|
||
### 5.1 原地重试 (Retry)
|
||
当哨兵校验失败(`_check` 变红)时,用户修复问题后可点击重试。主要用于 **HPC 任务** 或 **数据校验** 场景。
|
||
* **操作**: `/retryFailedNode`
|
||
* **原理**: 将 `DeadLetterJob` 移回 `ExecutableJob`,重新触发校验逻辑。
|
||
|
||
### 5.2 回退跳转 (Rewind/Jump)
|
||
适用于 **本地应用失败** 或 **参数填写错误** 场景。
|
||
* **操作**: `/retryToNode(targetNodeId)`
|
||
* **原理**:
|
||
1. 用户点击“重新执行”。
|
||
2. 后端将流程指针强行跳转回 `_register`(针对本地应用)或任意前置节点。
|
||
3. `_register` 重新执行,**生成全新的 `asyncTaskId`**。
|
||
4. 流程重新挂起,前端获取新 ID,允许用户再次拉起应用。
|
||
|
||
---
|
||
|
||
## 6. ProcessController 接口能力清单
|
||
|
||
| 方法 | 描述 | 逻辑细节 |
|
||
| :--- | :--- | :--- |
|
||
| `deploy` | 流程部署 | DTO -> BPMN 转换并部署 |
|
||
| `saveParamsByDefinitionId` | 保存参数 | 保存用户输入的节点参数,作为运行模板 |
|
||
| `startByProcessDefinitionId` | 启动流程 | 根据定义ID启动实例 |
|
||
| `suspendProcessInstance` | 挂起实例 | 校验实例状态,挂起后阻止任务执行和回调触发 |
|
||
| `activateProcessInstance` | 激活实例 | 激活流程,并立即调用 `triggerPendingTasks` **补偿触发**积压的回调 |
|
||
| `cancelProcessInstance` | 取消实例 | 终止运行中的流程 |
|
||
| `getProcessAndNodeDetailByInstanceId` | 状态查询 | 获取聚合状态;**若为本地应用且 Active,会查询业务表返回 asyncTaskId** |
|
||
| `previewNodeInputFiles` | 文件预览 | 扫描 MinIO 或 本地磁盘,返回文件列表 |
|
||
| `continueServiceTask` | 继续执行 | 完成 `_waitUser` 或 `Original UserTask`,推动流程 |
|
||
| `asyncCallback` | 异步回调 | 通用回调接口,支持 HPC 和 本地应用 |
|
||
| `retryFailedNode` | 原地重试 | 恢复死信作业 |
|
||
| `retryToNode` | 回退重试 | 携带新参数,将流程跳转至任意指定节点(支持生成新业务ID) |
|
||
|
||
好的,我将上述四个核心 Q&A 进行提炼和简化,保持专业性并统一格式,作为文档的 **第 7 章节** 补充在最后。
|
||
|
||
---
|
||
|
||
## 7. 核心机制与开发 Q&A
|
||
|
||
### Q1: 链路 `_waitUser` -> `Original` -> `_wait` -> `_check` 中,任意节点都可以挂起吗?
|
||
|
||
**A: 是的,可以在任意节点挂起,但底层表现不同。**
|
||
|
||
所有节点在 Flowable 数据库中都有对应的持久化状态(Task、Job 或 Execution),挂起操作(Suspension)是针对流程实例级的,会冻结所有当前活跃的节点。
|
||
|
||
| 节点 | 类型 | 驱动方 | 挂起后的表现 |
|
||
| :--- | :--- | :--- | :--- |
|
||
| **`_waitUser`** | UserTask | **人工** | 用户点击“完成”时报错(拒绝操作)。 |
|
||
| **`Original`** | ServiceTask (Async) | **引擎** | Job 停留在数据库,AsyncExecutor 会跳过执行,直到流程激活。 |
|
||
| **`_wait`** | ReceiveTask | **外部** | 外部回调会被业务代码拦截并缓冲到 DB,**不触发流程流转**。 |
|
||
| **`_check`** | ServiceTask (Async) | **引擎** | 同 `Original`,校验逻辑暂停执行。 |
|
||
|
||
---
|
||
|
||
### Q2: 为什么只有 `_wait` (ReceiveTask) 需要代码补偿 (`triggerPendingTasks`)?
|
||
|
||
**A: 因为只有它是“事件驱动”且请求不可重放的。**
|
||
|
||
* **`_wait` (ReceiveTask)**: 依赖外部系统的**一次性回调**。如果挂起期间回调到达,Flowable 引擎会拒绝触发。为了不丢失这次回调,业务层必须先存库。流程激活后,引擎**不知道**发生过回调,必须由代码主动查询数据库并补发 `trigger`。
|
||
* **`Original` / `_check`**: 依赖引擎的**轮询扫描**。流程激活后,AsyncExecutor 下次扫描时会自动发现并执行这些 Job,无需人工干预。
|
||
* **`_waitUser`**: 依赖**人工重试**。流程挂起导致操作失败后,用户只需在界面上再次点击按钮即可。
|
||
|
||
---
|
||
|
||
### Q3: `runtimeService.trigger` 是专门用来触发 ReceiveTask 的吗?
|
||
|
||
**A: 在本系统的上下文中,是的。**
|
||
|
||
请严格区分以下 API 的用途:
|
||
|
||
1. **`runtimeService.trigger(executionId)`**: 专属用于 **`ReceiveTask`** (`_wait` 节点)。告诉引擎结束等待,继续流转。
|
||
2. **`taskService.complete(taskId)`**: 专属用于 **`UserTask`** (`_waitUser` 节点)。完成人工任务。
|
||
3. **`managementService.moveDeadLetterJob...`**: 专属用于 **死信 Job** (`_check` 失败后)。恢复异常任务。
|
||
|
||
> **注意**: 绝不能混用 `trigger` 和 `complete`。
|
||
|
||
---
|
||
|
||
### Q4: Flowable 变量有哪几种?如何正确设置和获取?
|
||
|
||
**A: 主要分为“全局变量”和“本地变量”,本系统强烈建议使用本地变量以保证隔离性。**
|
||
|
||
#### 1. 变量类型对比
|
||
|
||
| 类型 | 作用域 (Scope) | 特点 | 适用场景 |
|
||
| :--- | :--- | :--- | :--- |
|
||
| **Global** | Process Instance | 全流程可见,同名覆盖 | 业务主键 (runId)、发起人信息 |
|
||
| **Local** | Execution / Task | 仅当前节点可见,**数据隔离** | **异步任务 ID**、并行分支数据、临时状态 |
|
||
|
||
#### 2. 代码操作指南
|
||
|
||
**设置变量 (Write):**
|
||
```java
|
||
// [推荐] 使用 Local 防止并发分支数据污染
|
||
// 在 JavaDelegate 中
|
||
execution.setVariableLocal("asyncTaskId", id);
|
||
// 在 Service 中
|
||
runtimeService.setVariableLocal(executionId, "status", "SUCCESS");
|
||
```
|
||
|
||
**获取变量 (Read):**
|
||
```java
|
||
// [推荐] 使用 getVariable (自动冒泡查找)
|
||
// 优先找 Local,找不到找 Global。这样兼容性最好。
|
||
String val = (String) execution.getVariable("asyncTaskId");
|
||
```
|
||
|
||
> **最佳实践**: 在 `_register` 节点存入 `asyncTaskId` 时使用 `setVariableLocal`。前端查询展示时,**建议直接查询业务表 (`async_task_record`)** 而非查询流程变量,以确保在回退/重试场景下获取到最新的 ID。 |