8.1 KiB
Flowable 模块梳理说明
基于当前代码实现与业务规则整理,覆盖:特殊节点生成、运行流程、异步与重试、挂起激活、文件预览、失败重试与报告导出。
1. 特殊节点生成规则
1.1 serviceTask + 长耗时异步回调场景(HPC 等)
触发条件:
FlowElementDTO.type == serviceTask- 前端传参:
BaseExecuteConfig.asyncCallback = true
设计目标:
serviceTask虽可自动执行,但要支持“等待人工确认后再执行”(手动/自动可切换)。- 因此默认在
serviceTask前置一个UserTask。
生成节点与职责:
-
前置
UserTask(_waitUser)- 绑定
nodeExecutionStrategy.shouldSkip - 作用:控制当前自动节点是直接跳过(AUTO)还是停留等待人工操作(MANUAL)
- 绑定
-
原始
ServiceTask(serviceTask)- 绑定执行器:
universalDelegate - 设置并保存
BaseExecuteConfig - 设置
callbackNodeId = waitTaskId
- 绑定执行器:
-
ReceiveTask(_wait)- 等待异步回调
-
ServiceTask(_check)- 绑定
asyncResultCheckDelegate - 校验回调执行结果并决定流程是否继续
- 绑定
内部连线:
_waitUser(UserTask) -> serviceTask(ServiceTask) -> _wait(ReceiveTask) -> _check(ServiceTask)
2. 流程途中的实际节点生成
2.1 userTask 且属于 localApp(需拉起本地应用)
生成节点与职责:
-
ServiceTask registerTask(_register)- 绑定 start 监听:
userTaskDirectoryPreparationListener - 绑定执行器:
localAppRegisterDelegate - 作用:
- 初始化本地输出目录(
outputDirId) - 生成并记录
asyncTaskId - 记录
processInstanceId/executionId/callbackNodeId/status
- 初始化本地输出目录(
- 绑定 start 监听:
-
ReceiveTask waitTask(_wait)- 等待本地应用异步回传状态
-
ServiceTask checkTask(_check)- 绑定
asyncResultCheckDelegate - 校验回传结果
- 绑定
-
原始
UserTask userTask- 保存
BaseExecuteConfig - 不再绑定
userTaskDirectoryPreparationListener
- 保存
为什么目录初始化前移到 registerTask?
- 本地应用是先被拉起,再回传状态。
- 不能等执行到末端
UserTask才初始化目录,否则回传/落盘链路会出现时序错误。 - 因此必须在前置
registerTask阶段完成目录准备。
内部连线:
_register(ServiceTask) -> _wait(ReceiveTask) -> _check(ServiceTask) -> userTask(UserTask)
2.2 非本地应用 userTask
- 直接生成
userTask(保存BaseExecuteConfig) - 绑定
userTaskDirectoryPreparationListener
3. 通用重试节点
- 额外添加统一重试节点:
UserTask - 绑定监听器:
retryRedirectListener - 用于失败后的统一重试跳转/重定向
4. 执行流程(运行时)
4.1 流程与参数初始化
-
调用
saveParamsByDefinitionId保存用户输入参数到process_node_param,包含:inputDirIdoutputDirIdrunIdprocessDefinitionIdnodeId
-
设置流程全局变量:
runIduserIduserNametenantId
-
调用
/startByProcessDefinitionId启动流程实例,获取processInstanceId -
同步调用
/updateNodeParamProcessInstanceId回写process_node_param.processInstanceId
4.2 拉起本地应用(localApp)执行链路
-
执行
_register前触发userTaskDirectoryPreparationListener:- 初始化输出目录
outputDirId - 若存在则删除后重建空目录
- 不准备输入目录
inputDirId - 输入目录通常在用户首次上传时创建
- 初始化输出目录
-
执行
_register的localAppRegisterDelegate:- 生成唯一
asyncTaskId - 获取并记录:
processInstanceId、executionId、callbackNodeId、status - 写入
async_task_record
- 生成唯一
-
本地应用通过
/asyncCallback回传:- 本地应用信息回写
simulation_local_job - 基于
asyncTaskId查async_task_record - 基于
executionId+waitTaskId查Execution - 更新本次状态到
async_task_record - 设置变量:
ReceivetaskCallbackeStatus、ReceivetaskCallbackeMsg - 调用
runtimeService.trigger(executionId)完成_wait
- 本地应用信息回写
-
执行
_check(asyncResultCheckDelegate):- 若
ReceivetaskCallbackeStatus失败:流程异常 - 若成功:清理
ReceivetaskCallbackeStatus、ReceivetaskCallbackeMsg - 继续流转
- 若
-
调用
/continueServiceTask:- 按
taskDefinitionKey查任务 - 调用
taskService.complete完成用户任务
- 按
4.3 长耗时异步 serviceTask(可等待人工确认)
-
前置
_waitUser+nodeExecutionStrategy:- 从
process_node_param的EXECUTE_MODE_KEY读取执行模式:AUTO | MANUAL MANUAL:停留当前UserTask,等待/continueServiceTaskAUTO:自动跳过UserTask,继续执行serviceTask
- 从
-
执行
serviceTask(universalDelegate):- 获取
outputDirId - 若输出目录已存在,则删除后重建空目录
- 不准备输入目录
inputDirId
- 获取
-
HPC 异步提交:
- 生成
asyncTaskId - 记录
processInstanceId、executionId、callbackNodeId、status - 更新
async_task_record
- 生成
-
/asyncCallback回调:- 按
asyncTaskId查async_task_record - 基于
executionId+waitTaskId查Execution - 更新本次状态
- 设置
ReceivetaskCallbackeStatus、ReceivetaskCallbackeMsg runtimeService.trigger(executionId)完成_wait
- 按
-
执行
_check(asyncResultCheckDelegate):- 失败则流程异常
- 成功则清理变量并继续执行
5. setAsynchronous(true) 的意义
- 若不设异步:节点同步执行,异常会直接在当前线程抛出,难以形成统一可观测失败态。
- 若设异步:Flowable 将执行转为 Job,异常持久化到 Job/DeadLetter 相关表,便于:
- 流程状态展示
- 节点错误定位
- 失败重试与补偿
当前模块的状态展示已依赖 Job 维度来判断节点错误与流程异常态。
6. 挂起与激活机制
6.1 挂起(suspend)
调用:runtimeService.suspendProcessInstanceById(processInstanceId)
引擎关键动作(原子性更新):
- 更新流程实例状态:
ACT_RU_EXECUTION根执行流SUSPENSION_STATE_置为1(suspended)
- 级联更新子执行流:
- 同实例下子 execution 同步挂起
- 冻结用户任务:
ACT_RU_TASK.SUSPENSION_STATE_置挂起- 禁止
complete()/claim()
- 冻结异步 Job:
ACT_RU_JOB / ACT_RU_TIMER_JOB等任务挂起JobExecutor跳过执行
- 操作校验拦截:
- 后续
complete/trigger/job执行前均检查SUSPENSION_STATE_ - 挂起状态直接抛
ProcessEngineException
- 后续
6.2 激活(activate)
调用:runtimeService.activateProcessInstanceById(processInstanceId)
反向恢复动作:
execution的SUSPENSION_STATE_改回0(active)- 解除
UserTask挂起标记 - 解除 Job 挂起标记,
JobExecutor恢复消费 - 挂起期间未执行 Job 恢复后按调度继续执行
7. /previewNodeInputFiles 功能
- 获取当前节点输入目录下的所有文件
- 根据正则配置过滤前一个节点输出目录中的文件
- 汇总返回给前端进行节点输入预览
8. /retryToNode 失败重试统一入口
- 使用:
moveActivityIdsToSingleActivityId - 语义:将当前分散活动节点(含并行分支)统一收束并跳转到指定节点
- 价值:
- 统一失败回退入口
- 解决并行分支下的“幽灵分支/部分回退”问题
9. 报告导出(ExportWordScriptHandler)
- 报告由 Python 在本地直接生成
- 当前不上传 MinIO
10. 总结
该模块的核心是:
- 通过“前置控制节点 + 异步等待节点 + 校验节点”把长耗时任务流程化、可观测化;
- 通过
AUTO/MANUAL策略兼容自动执行与人工确认; - 通过异步 Job 与死信机制保障错误可见、可重试、可恢复;
- 通过挂起/激活与统一重试跳转提升流程运行时可控性。