Files
spdm-backend/flowable/Flowable 模块梳理说明1.0.md

16 KiB
Raw Blame History

这份 README 文档在原有的基础上进行了深度迭代,完整融合了“HPC 异步任务”与“本地应用 UserTask”两套编排逻辑,并详细阐述了最新的 ID 管理与状态聚合策略。


SDM Flowable Workflow Module

1. 模块简介

本模块 (com.sdm.flowable) 是系统的工业仿真流程编排引擎核心。基于 Flowable 7.0.1 构建,摒弃了传统的静态 BPMN 文件模式,采用 动态模型生成技术

它不仅支持长耗时 HPC 异步任务的“提交-等待-校验”闭环,还创新性地支持了本地应用 UserTask 的“前置异步编排”模式,实现了复杂业务场景下的状态强一致性与容错能力。


2. 核心架构与模型转换 (Dto2BpmnConverter)

核心类 Dto2BpmnConverter 负责将前端定义的业务流程DTO转换为可执行的 BPMN 模型。根据节点类型不同,采用两种不同的裂变策略:

2.1 策略 AHPC 任务后置分裂 (Sentinel Pattern)

针对 ServiceTask,若配置为异步回调(如 HPC 提交),逻辑节点物理分裂为 3 个连续节点

  1. Original Node (ServiceTask): 执行提交逻辑(如提交 HPC
  2. Wait Node (ReceiveTask): 命名为 _wait,挂起流程等待回调。
  3. Check Node (ServiceTask): 命名为 _check,哨兵校验。

链路: Original \rightarrow _wait \rightarrow _check

2.2 策略 B本地应用 UserTask 前置编排 (Pre-Orchestration)

针对 UserTask,若配置为 localApp 类型,需实现“先生成 ID 拉起应用,回调成功后,再让人工确认”的逻辑。系统在该节点前面插入 3 个辅助节点:

  1. Register Node (ServiceTask): 命名为 _register。绑定 LocalAppRegisterDelegate,负责生成全局唯一的 asyncTaskId 并注册到业务表,同时存入流程 Local 变量。
  2. Wait Node (ReceiveTask): 命名为 _wait。挂起流程,等待本地应用运行结束的回调。
  3. Check Node (ServiceTask): 命名为 _check。哨兵校验本地应用的回调结果(成功/失败)。
  4. Original Node (UserTask): 原始节点。只有当应用执行成功后,流程才会流转至此,等待用户点击“下一步”。

链路: _register \rightarrow _wait \rightarrow _check \rightarrow Original

2.3 人工介入控制 (Manual/Auto Switch)

针对普通 HPC 节点,handleWaitUserTask 方法默认在 ServiceTask 前插入 _waitUser (UserTask)。

  • Manual 模式: 流程停在 _waitUser,等待用户确认。
  • Auto 模式: 利用 SkipExpression 自动跳过 _waitUser

3. 异步任务、变量管理与状态聚合

3.1 变量管理与 ID 获取

在本地应用场景中,asyncTaskId 的流转至关重要:

  1. 生成与存储: _register 节点生成 ID通过 asyncTaskRecordService.registerAsyncTask 落库,同时 execution.setVariableLocal 存入流程。
  2. 前端获取: 前端轮询状态时,后端 直接查询业务表 (async_task_record) 获取最新的 asyncTaskId
    • 查询条件: processInstanceId + receiveTaskId + status=RUNNING + OrderByTimeDesc
    • 优势: 确保在“回退重试”场景下,永远获取到最新的业务 ID避免流程变量残留旧值的问题。

3.2 任务完成与强一致性 (completeAsyncTask)

复用同一个回调接口。为了防止“流程挂起回调却强推”,执行 三层校验

  1. 实例校验: 流程是否存在。
  2. 挂起校验: 若流程 SUSPENDED,仅更新数据库为 SUCCESS不触发流转(回调缓冲)。激活时通过 triggerPendingTasks 自动补偿。
  3. 节点校验: 确保流程停在 _wait
  4. 触发: 使用 runtimeService.trigger 唤醒流程。

3.3 状态聚合逻辑 (determineUnifiedState)

针对分裂后的多物理节点,后端计算出唯一的业务聚合状态返给前端:

场景 物理节点位置 聚合状态 前端展示/交互
HPC 运行中 Original / _wait / _check ACTIVE 显示“正在计算”
本地应用运行中 _register / _wait / _check ACTIVE 显示“拉起应用/运行中”
任务失败 _check (死信) ERROR 显示红色错误,允许重试
本地应用成功 Original (UserTask) WAITING_FOR_USER 显示绿色,按钮变为“下一步”
流程挂起 任意节点 SUSPENDED 锁死操作

4. 核心时序交互图

4.1 场景一HPC 异步任务(含挂起与补偿)

展示 HPC 任务从提交到回调的全过程,包含中途挂起导致的回调缓冲与激活后的自动补偿。

sequenceDiagram
    autonumber
    actor User as 用户/前端
    participant Ctrl as ProcessController
    participant Service as ProcessService
    participant Engine as Flowable Engine
    participant Delegate as UniversalDelegate
    participant DB as DB (AsyncRecord)
    participant Sentinel as AsyncResultCheckDelegate
    participant External as 外部系统(HPC)

    %% ==========================================
    %% 阶段一:异步提交
    %% ==========================================
    rect rgb(240, 255, 240)
        note right of User: == 阶段一HPC 异步提交 ==
        note right of Engine: 流转至 Original Node (ServiceTask)
        Engine->>Delegate: execute()
        Delegate->>External: 提交HPC任务
        Delegate->>DB: 插入记录 (Status=RUNNING)
        note right of Engine: 流转至 _wait Node (ReceiveTask)
        Engine->>Engine: 流程挂起 (Wait State)
    end

    %% ==========================================
    %% 阶段二:挂起与回调缓冲
    %% ==========================================
    rect rgb(255, 240, 240)
        note right of User: == 阶段二:挂起与回调缓冲 ==
        User->>Ctrl: suspendProcessInstance()
        Ctrl->>Engine: runtimeService.suspend()
        note right of Engine: 流程变为 SUSPENDED
        
        External->>Ctrl: asyncCallback (任务完成)
        Ctrl->>Service: asyncCallback()
        Service->>DB: 查询状态
        
        alt 流程已挂起
            DB->>DB: 仅更新 Status=SUCCESS<br/>(不触发 trigger)
            note right of DB: 回调被缓冲,流程停在 _wait
        end
    end

    %% ==========================================
    %% 阶段三:激活与补偿
    %% ==========================================
    rect rgb(240, 240, 255)
        note right of User: == 阶段三:激活与补偿 ==
        User->>Ctrl: activateProcessInstance()
        Ctrl->>Engine: runtimeService.activate()
        
        Ctrl->>Service: triggerPendingTasks() <br/>(激活后的自动动作)
        Service->>DB: 查询缓冲的成功记录
        Service->>Engine: runtimeService.trigger() <br/>(补偿触发)
    end

    %% ==========================================
    %% 阶段四:哨兵校验
    %% ==========================================
    rect rgb(255, 255, 240)
        note right of User: == 阶段四:哨兵校验 ==
        note right of Engine: 流转至 _check Node
        Engine->>Sentinel: execute()
        
        alt STATUS == FAIL
            Sentinel-->>Engine: 抛出异常 -> DeadLetterJob
            note right of Engine: 节点变红 (ERROR)
        end
    end

4.2 场景二:本地应用 UserTask交互闭环

展示本地应用从注册 ID、拉起应用、回调校验到最终人工确认的完整闭环。

sequenceDiagram
    autonumber
    actor User as 用户
    participant FE as 前端/插件
    participant Ctrl as ProcessController
    participant Engine as Flowable Engine
    participant Reg as LocalAppRegisterDelegate
    participant DB as DB (AsyncRecord)
    participant Sentinel as AsyncResultCheckDelegate

    %% ==========================================
    %% 步骤 1: 自动注册与挂起
    %% ==========================================
    rect rgb(230, 245, 255)
        note right of User: == 1. 自动注册与挂起 ==
        Engine->>Reg: 进入 _register 节点
        Reg->>DB: 生成 asyncTaskId, 存库 (RUNNING)
        Reg->>Engine: setVariableLocal(asyncTaskId)
        Engine->>Engine: 流转至 _wait, 流程挂起
    end

    %% ==========================================
    %% 步骤 2: 前端获取 ID 并拉起
    %% ==========================================
    rect rgb(255, 250, 230)
        note right of User: == 2. 拉起应用 ==
        FE->>Ctrl: getProcessDetail()
        Ctrl->>DB: 查询 _wait 节点最新的 RUNNING 记录
        Ctrl-->>FE: 返回 asyncTaskId, 状态 ACTIVE
        
        User->>FE: 点击"拉起应用"
        FE->>User: 唤起本地 EXE (传入 asyncTaskId)
    end

    %% ==========================================
    %% 步骤 3: 异步回调与校验
    %% ==========================================
    rect rgb(240, 255, 240)
        note right of User: == 3. 回调与校验 ==
        User->>FE: 应用运行结束
        FE->>Ctrl: /asyncCallback (asyncTaskId, Code=0)
        Ctrl->>DB: 更新 SUCCESS
        Ctrl->>Engine: runtimeService.trigger()
        
        Engine->>Sentinel: 进入 _check 校验
        Sentinel->>Engine: 校验通过
    end

    %% ==========================================
    %% 步骤 4: 人工确认
    %% ==========================================
    rect rgb(240, 240, 240)
        note right of User: == 4. 人工确认 ==
        Engine->>Engine: 流转至 Original UserTask
        
        FE->>Ctrl: getProcessDetail()
        Ctrl-->>FE: 状态 WAITING_FOR_USER
        
        User->>FE: 点击"下一步"
        FE->>Ctrl: /continueServiceTask
        Ctrl->>Engine: taskService.complete()
    end

5. 附加功能交互流程

5.1 原地重试 (Retry)

当哨兵校验失败(_check 变红)时,用户修复问题后可点击重试。主要用于 HPC 任务数据校验 场景。

  • 操作: /retryFailedNode
  • 原理: 将 DeadLetterJob 移回 ExecutableJob,重新触发校验逻辑。

5.2 回退跳转 (Rewind/Jump)

适用于 本地应用失败参数填写错误 场景。

  • 操作: /retryToNode(targetNodeId)
  • 原理:
    1. 用户点击“重新执行”。
    2. 后端将流程指针强行跳转回 _register(针对本地应用)或任意前置节点。
    3. _register 重新执行,生成全新的 asyncTaskId
    4. 流程重新挂起,前端获取新 ID允许用户再次拉起应用。

6. ProcessController 接口能力清单

方法 描述 逻辑细节
deploy 流程部署 DTO -> BPMN 转换并部署
saveParamsByDefinitionId 保存参数 保存用户输入的节点参数,作为运行模板
startByProcessDefinitionId 启动流程 根据定义ID启动实例
suspendProcessInstance 挂起实例 校验实例状态,挂起后阻止任务执行和回调触发
activateProcessInstance 激活实例 激活流程,并立即调用 triggerPendingTasks 补偿触发积压的回调
cancelProcessInstance 取消实例 终止运行中的流程
getProcessAndNodeDetailByInstanceId 状态查询 获取聚合状态;若为本地应用且 Active会查询业务表返回 asyncTaskId
previewNodeInputFiles 文件预览 扫描 MinIO 或 本地磁盘,返回文件列表
continueServiceTask 继续执行 完成 _waitUserOriginal UserTask,推动流程
asyncCallback 异步回调 通用回调接口,支持 HPC 和 本地应用
retryFailedNode 原地重试 恢复死信作业
retryToNode 回退重试 携带新参数将流程跳转至任意指定节点支持生成新业务ID

好的,我将上述四个核心 Q&A 进行提炼和简化,保持专业性并统一格式,作为文档的 第 7 章节 补充在最后。


7. 核心机制与开发 Q&A

Q1: 链路 _waitUser -> Original -> _wait -> _check 中,任意节点都可以挂起吗?

A: 是的,可以在任意节点挂起,但底层表现不同。

所有节点在 Flowable 数据库中都有对应的持久化状态Task、Job 或 Execution挂起操作Suspension是针对流程实例级的会冻结所有当前活跃的节点。

节点 类型 驱动方 挂起后的表现
_waitUser UserTask 人工 用户点击“完成”时报错(拒绝操作)。
Original ServiceTask (Async) 引擎 Job 停留在数据库AsyncExecutor 会跳过执行,直到流程激活。
_wait ReceiveTask 外部 外部回调会被业务代码拦截并缓冲到 DB不触发流程流转
_check ServiceTask (Async) 引擎 Original,校验逻辑暂停执行。

Q2: 为什么只有 _wait (ReceiveTask) 需要代码补偿 (triggerPendingTasks)

A: 因为只有它是“事件驱动”且请求不可重放的。

  • _wait (ReceiveTask): 依赖外部系统的一次性回调。如果挂起期间回调到达Flowable 引擎会拒绝触发。为了不丢失这次回调,业务层必须先存库。流程激活后,引擎不知道发生过回调,必须由代码主动查询数据库并补发 trigger
  • Original / _check: 依赖引擎的轮询扫描。流程激活后AsyncExecutor 下次扫描时会自动发现并执行这些 Job无需人工干预。
  • _waitUser: 依赖人工重试。流程挂起导致操作失败后,用户只需在界面上再次点击按钮即可。

Q3: runtimeService.trigger 是专门用来触发 ReceiveTask 的吗?

A: 在本系统的上下文中,是的。

请严格区分以下 API 的用途:

  1. runtimeService.trigger(executionId): 专属用于 ReceiveTask (_wait 节点)。告诉引擎结束等待,继续流转。
  2. taskService.complete(taskId): 专属用于 UserTask (_waitUser 节点)。完成人工任务。
  3. managementService.moveDeadLetterJob...: 专属用于 死信 Job (_check 失败后)。恢复异常任务。

注意: 绝不能混用 triggercomplete


Q4: Flowable 变量有哪几种?如何正确设置和获取?

A: 主要分为“全局变量”和“本地变量”,本系统强烈建议使用本地变量以保证隔离性。

1. 变量类型对比

类型 作用域 (Scope) 特点 适用场景
Global Process Instance 全流程可见,同名覆盖 业务主键 (runId)、发起人信息
Local Execution / Task 仅当前节点可见,数据隔离 异步任务 ID、并行分支数据、临时状态

2. 代码操作指南

设置变量 (Write):

// [推荐] 使用 Local 防止并发分支数据污染
// 在 JavaDelegate 中
execution.setVariableLocal("asyncTaskId", id);
// 在 Service 中
runtimeService.setVariableLocal(executionId, "status", "SUCCESS");

获取变量 (Read):

// [推荐] 使用 getVariable (自动冒泡查找)
// 优先找 Local找不到找 Global。这样兼容性最好。
String val = (String) execution.getVariable("asyncTaskId");

最佳实践: 在 _register 节点存入 asyncTaskId 时使用 setVariableLocal。前端查询展示时,建议直接查询业务表 (async_task_record) 而非查询流程变量,以确保在回退/重试场景下获取到最新的 ID。