这份 README 文档在原有的基础上进行了深度迭代,完整融合了“HPC 异步任务”与“本地应用 UserTask”两套编排逻辑,并详细阐述了最新的 ID 管理与状态聚合策略。
SDM Flowable Workflow Module
1. 模块简介
本模块 (com.sdm.flowable) 是系统的工业仿真流程编排引擎核心。基于 Flowable 7.0.1 构建,摒弃了传统的静态 BPMN 文件模式,采用 动态模型生成技术。
它不仅支持长耗时 HPC 异步任务的“提交-等待-校验”闭环,还创新性地支持了本地应用 UserTask 的“前置异步编排”模式,实现了复杂业务场景下的状态强一致性与容错能力。
2. 核心架构与模型转换 (Dto2BpmnConverter)
核心类 Dto2BpmnConverter 负责将前端定义的业务流程(DTO)转换为可执行的 BPMN 模型。根据节点类型不同,采用两种不同的裂变策略:
2.1 策略 A:HPC 任务后置分裂 (Sentinel Pattern)
针对 ServiceTask,若配置为异步回调(如 HPC 提交),逻辑节点物理分裂为 3 个连续节点:
- Original Node (ServiceTask): 执行提交逻辑(如提交 HPC)。
- Wait Node (ReceiveTask): 命名为
_wait,挂起流程等待回调。 - Check Node (ServiceTask): 命名为
_check,哨兵校验。
链路:
Original\rightarrow_wait\rightarrow_check
2.2 策略 B:本地应用 UserTask 前置编排 (Pre-Orchestration)
针对 UserTask,若配置为 localApp 类型,需实现“先生成 ID 拉起应用,回调成功后,再让人工确认”的逻辑。系统在该节点前面插入 3 个辅助节点:
- Register Node (ServiceTask): 命名为
_register。绑定LocalAppRegisterDelegate,负责生成全局唯一的asyncTaskId并注册到业务表,同时存入流程 Local 变量。 - Wait Node (ReceiveTask): 命名为
_wait。挂起流程,等待本地应用运行结束的回调。 - Check Node (ServiceTask): 命名为
_check。哨兵校验本地应用的回调结果(成功/失败)。 - Original Node (UserTask): 原始节点。只有当应用执行成功后,流程才会流转至此,等待用户点击“下一步”。
链路:
_register\rightarrow_wait\rightarrow_check\rightarrowOriginal
2.3 人工介入控制 (Manual/Auto Switch)
针对普通 HPC 节点,handleWaitUserTask 方法默认在 ServiceTask 前插入 _waitUser (UserTask)。
- Manual 模式: 流程停在
_waitUser,等待用户确认。 - Auto 模式: 利用 SkipExpression 自动跳过
_waitUser。
3. 异步任务、变量管理与状态聚合
3.1 变量管理与 ID 获取
在本地应用场景中,asyncTaskId 的流转至关重要:
- 生成与存储:
_register节点生成 ID,通过asyncTaskRecordService.registerAsyncTask落库,同时execution.setVariableLocal存入流程。 - 前端获取: 前端轮询状态时,后端 直接查询业务表 (
async_task_record) 获取最新的asyncTaskId。- 查询条件:
processInstanceId+receiveTaskId+status=RUNNING+OrderByTimeDesc。 - 优势: 确保在“回退重试”场景下,永远获取到最新的业务 ID,避免流程变量残留旧值的问题。
- 查询条件:
3.2 任务完成与强一致性 (completeAsyncTask)
复用同一个回调接口。为了防止“流程挂起回调却强推”,执行 三层校验:
- 实例校验: 流程是否存在。
- 挂起校验: 若流程 SUSPENDED,仅更新数据库为
SUCCESS,不触发流转(回调缓冲)。激活时通过triggerPendingTasks自动补偿。 - 节点校验: 确保流程停在
_wait。 - 触发: 使用
runtimeService.trigger唤醒流程。
3.3 状态聚合逻辑 (determineUnifiedState)
针对分裂后的多物理节点,后端计算出唯一的业务聚合状态返给前端:
| 场景 | 物理节点位置 | 聚合状态 | 前端展示/交互 |
|---|---|---|---|
| HPC 运行中 | Original / _wait / _check |
ACTIVE | 显示“正在计算” |
| 本地应用运行中 | _register / _wait / _check |
ACTIVE | 显示“拉起应用/运行中” |
| 任务失败 | _check (死信) |
ERROR | 显示红色错误,允许重试 |
| 本地应用成功 | Original (UserTask) |
WAITING_FOR_USER | 显示绿色,按钮变为“下一步” |
| 流程挂起 | 任意节点 | SUSPENDED | 锁死操作 |
4. 核心时序交互图
4.1 场景一:HPC 异步任务(含挂起与补偿)
展示 HPC 任务从提交到回调的全过程,包含中途挂起导致的回调缓冲与激活后的自动补偿。
sequenceDiagram
autonumber
actor User as 用户/前端
participant Ctrl as ProcessController
participant Service as ProcessService
participant Engine as Flowable Engine
participant Delegate as UniversalDelegate
participant DB as DB (AsyncRecord)
participant Sentinel as AsyncResultCheckDelegate
participant External as 外部系统(HPC)
%% ==========================================
%% 阶段一:异步提交
%% ==========================================
rect rgb(240, 255, 240)
note right of User: == 阶段一:HPC 异步提交 ==
note right of Engine: 流转至 Original Node (ServiceTask)
Engine->>Delegate: execute()
Delegate->>External: 提交HPC任务
Delegate->>DB: 插入记录 (Status=RUNNING)
note right of Engine: 流转至 _wait Node (ReceiveTask)
Engine->>Engine: 流程挂起 (Wait State)
end
%% ==========================================
%% 阶段二:挂起与回调缓冲
%% ==========================================
rect rgb(255, 240, 240)
note right of User: == 阶段二:挂起与回调缓冲 ==
User->>Ctrl: suspendProcessInstance()
Ctrl->>Engine: runtimeService.suspend()
note right of Engine: 流程变为 SUSPENDED
External->>Ctrl: asyncCallback (任务完成)
Ctrl->>Service: asyncCallback()
Service->>DB: 查询状态
alt 流程已挂起
DB->>DB: 仅更新 Status=SUCCESS<br/>(不触发 trigger)
note right of DB: 回调被缓冲,流程停在 _wait
end
end
%% ==========================================
%% 阶段三:激活与补偿
%% ==========================================
rect rgb(240, 240, 255)
note right of User: == 阶段三:激活与补偿 ==
User->>Ctrl: activateProcessInstance()
Ctrl->>Engine: runtimeService.activate()
Ctrl->>Service: triggerPendingTasks() <br/>(激活后的自动动作)
Service->>DB: 查询缓冲的成功记录
Service->>Engine: runtimeService.trigger() <br/>(补偿触发)
end
%% ==========================================
%% 阶段四:哨兵校验
%% ==========================================
rect rgb(255, 255, 240)
note right of User: == 阶段四:哨兵校验 ==
note right of Engine: 流转至 _check Node
Engine->>Sentinel: execute()
alt STATUS == FAIL
Sentinel-->>Engine: 抛出异常 -> DeadLetterJob
note right of Engine: 节点变红 (ERROR)
end
end
4.2 场景二:本地应用 UserTask(交互闭环)
展示本地应用从注册 ID、拉起应用、回调校验到最终人工确认的完整闭环。
sequenceDiagram
autonumber
actor User as 用户
participant FE as 前端/插件
participant Ctrl as ProcessController
participant Engine as Flowable Engine
participant Reg as LocalAppRegisterDelegate
participant DB as DB (AsyncRecord)
participant Sentinel as AsyncResultCheckDelegate
%% ==========================================
%% 步骤 1: 自动注册与挂起
%% ==========================================
rect rgb(230, 245, 255)
note right of User: == 1. 自动注册与挂起 ==
Engine->>Reg: 进入 _register 节点
Reg->>DB: 生成 asyncTaskId, 存库 (RUNNING)
Reg->>Engine: setVariableLocal(asyncTaskId)
Engine->>Engine: 流转至 _wait, 流程挂起
end
%% ==========================================
%% 步骤 2: 前端获取 ID 并拉起
%% ==========================================
rect rgb(255, 250, 230)
note right of User: == 2. 拉起应用 ==
FE->>Ctrl: getProcessDetail()
Ctrl->>DB: 查询 _wait 节点最新的 RUNNING 记录
Ctrl-->>FE: 返回 asyncTaskId, 状态 ACTIVE
User->>FE: 点击"拉起应用"
FE->>User: 唤起本地 EXE (传入 asyncTaskId)
end
%% ==========================================
%% 步骤 3: 异步回调与校验
%% ==========================================
rect rgb(240, 255, 240)
note right of User: == 3. 回调与校验 ==
User->>FE: 应用运行结束
FE->>Ctrl: /asyncCallback (asyncTaskId, Code=0)
Ctrl->>DB: 更新 SUCCESS
Ctrl->>Engine: runtimeService.trigger()
Engine->>Sentinel: 进入 _check 校验
Sentinel->>Engine: 校验通过
end
%% ==========================================
%% 步骤 4: 人工确认
%% ==========================================
rect rgb(240, 240, 240)
note right of User: == 4. 人工确认 ==
Engine->>Engine: 流转至 Original UserTask
FE->>Ctrl: getProcessDetail()
Ctrl-->>FE: 状态 WAITING_FOR_USER
User->>FE: 点击"下一步"
FE->>Ctrl: /continueServiceTask
Ctrl->>Engine: taskService.complete()
end
5. 附加功能交互流程
5.1 原地重试 (Retry)
当哨兵校验失败(_check 变红)时,用户修复问题后可点击重试。主要用于 HPC 任务 或 数据校验 场景。
- 操作:
/retryFailedNode - 原理: 将
DeadLetterJob移回ExecutableJob,重新触发校验逻辑。
5.2 回退跳转 (Rewind/Jump)
适用于 本地应用失败 或 参数填写错误 场景。
- 操作:
/retryToNode(targetNodeId) - 原理:
- 用户点击“重新执行”。
- 后端将流程指针强行跳转回
_register(针对本地应用)或任意前置节点。 _register重新执行,生成全新的asyncTaskId。- 流程重新挂起,前端获取新 ID,允许用户再次拉起应用。
6. ProcessController 接口能力清单
| 方法 | 描述 | 逻辑细节 |
|---|---|---|
deploy |
流程部署 | DTO -> BPMN 转换并部署 |
saveParamsByDefinitionId |
保存参数 | 保存用户输入的节点参数,作为运行模板 |
startByProcessDefinitionId |
启动流程 | 根据定义ID启动实例 |
suspendProcessInstance |
挂起实例 | 校验实例状态,挂起后阻止任务执行和回调触发 |
activateProcessInstance |
激活实例 | 激活流程,并立即调用 triggerPendingTasks 补偿触发积压的回调 |
cancelProcessInstance |
取消实例 | 终止运行中的流程 |
getProcessAndNodeDetailByInstanceId |
状态查询 | 获取聚合状态;若为本地应用且 Active,会查询业务表返回 asyncTaskId |
previewNodeInputFiles |
文件预览 | 扫描 MinIO 或 本地磁盘,返回文件列表 |
continueServiceTask |
继续执行 | 完成 _waitUser 或 Original UserTask,推动流程 |
asyncCallback |
异步回调 | 通用回调接口,支持 HPC 和 本地应用 |
retryFailedNode |
原地重试 | 恢复死信作业 |
retryToNode |
回退重试 | 携带新参数,将流程跳转至任意指定节点(支持生成新业务ID) |
好的,我将上述四个核心 Q&A 进行提炼和简化,保持专业性并统一格式,作为文档的 第 7 章节 补充在最后。
7. 核心机制与开发 Q&A
Q1: 链路 _waitUser -> Original -> _wait -> _check 中,任意节点都可以挂起吗?
A: 是的,可以在任意节点挂起,但底层表现不同。
所有节点在 Flowable 数据库中都有对应的持久化状态(Task、Job 或 Execution),挂起操作(Suspension)是针对流程实例级的,会冻结所有当前活跃的节点。
| 节点 | 类型 | 驱动方 | 挂起后的表现 |
|---|---|---|---|
_waitUser |
UserTask | 人工 | 用户点击“完成”时报错(拒绝操作)。 |
Original |
ServiceTask (Async) | 引擎 | Job 停留在数据库,AsyncExecutor 会跳过执行,直到流程激活。 |
_wait |
ReceiveTask | 外部 | 外部回调会被业务代码拦截并缓冲到 DB,不触发流程流转。 |
_check |
ServiceTask (Async) | 引擎 | 同 Original,校验逻辑暂停执行。 |
Q2: 为什么只有 _wait (ReceiveTask) 需要代码补偿 (triggerPendingTasks)?
A: 因为只有它是“事件驱动”且请求不可重放的。
_wait(ReceiveTask): 依赖外部系统的一次性回调。如果挂起期间回调到达,Flowable 引擎会拒绝触发。为了不丢失这次回调,业务层必须先存库。流程激活后,引擎不知道发生过回调,必须由代码主动查询数据库并补发trigger。Original/_check: 依赖引擎的轮询扫描。流程激活后,AsyncExecutor 下次扫描时会自动发现并执行这些 Job,无需人工干预。_waitUser: 依赖人工重试。流程挂起导致操作失败后,用户只需在界面上再次点击按钮即可。
Q3: runtimeService.trigger 是专门用来触发 ReceiveTask 的吗?
A: 在本系统的上下文中,是的。
请严格区分以下 API 的用途:
runtimeService.trigger(executionId): 专属用于ReceiveTask(_wait节点)。告诉引擎结束等待,继续流转。taskService.complete(taskId): 专属用于UserTask(_waitUser节点)。完成人工任务。managementService.moveDeadLetterJob...: 专属用于 死信 Job (_check失败后)。恢复异常任务。
注意: 绝不能混用
trigger和complete。
Q4: Flowable 变量有哪几种?如何正确设置和获取?
A: 主要分为“全局变量”和“本地变量”,本系统强烈建议使用本地变量以保证隔离性。
1. 变量类型对比
| 类型 | 作用域 (Scope) | 特点 | 适用场景 |
|---|---|---|---|
| Global | Process Instance | 全流程可见,同名覆盖 | 业务主键 (runId)、发起人信息 |
| Local | Execution / Task | 仅当前节点可见,数据隔离 | 异步任务 ID、并行分支数据、临时状态 |
2. 代码操作指南
设置变量 (Write):
// [推荐] 使用 Local 防止并发分支数据污染
// 在 JavaDelegate 中
execution.setVariableLocal("asyncTaskId", id);
// 在 Service 中
runtimeService.setVariableLocal(executionId, "status", "SUCCESS");
获取变量 (Read):
// [推荐] 使用 getVariable (自动冒泡查找)
// 优先找 Local,找不到找 Global。这样兼容性最好。
String val = (String) execution.getVariable("asyncTaskId");
最佳实践: 在
_register节点存入asyncTaskId时使用setVariableLocal。前端查询展示时,建议直接查询业务表 (async_task_record) 而非查询流程变量,以确保在回退/重试场景下获取到最新的 ID。