fix:支持直接确认节点为已完成状态

This commit is contained in:
2026-03-26 16:06:22 +08:00
parent 196c1ce019
commit 42c8c81def
4 changed files with 1436 additions and 250 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -26,9 +26,9 @@ public enum OperationTypeEnum {
TERMINATE(ProcessInstanceStateEnum.RUNNING, ProcessInstanceStateEnum.SUSPENDED, ProcessInstanceStateEnum.ERROR),
/**
* 重试/跳转:必须是 FAILED 状态(有死信)
* 重试/跳转:必须是 FAILED 状态(有死信),RUNNING
*/
RETRY(ProcessInstanceStateEnum.ERROR),
RETRY(ProcessInstanceStateEnum.ERROR,ProcessInstanceStateEnum.RUNNING),
/**
* 启动:通常对应无实例状态,这里作为占位

View File

@@ -94,7 +94,7 @@ public class ProcessService implements Iprocess{
// 部署流程前端传入Flowable标准JSON
public SdmResponse<DeployFlowableResp> deploy(ProcessDefinitionDTO processDTO) throws Exception {
public SdmResponse<DeployFlowableResp> deploy(ProcessDefinitionDTO processDTO) throws Exception {
log.info("开始部署流程定义: {}",processDTO);
BpmnModel bpmnModel = dto2BpmnConverter.convert(processDTO);
log.info("BPMN模型转换完成");
@@ -558,11 +558,20 @@ public class ProcessService implements Iprocess{
}
// 1. 判断 ERROR (Check 节点死信)
if (ctx.errorMap.containsKey(checkId)) {
boolean hasError = ctx.errorMap.containsKey(registerId)
|| ctx.errorMap.containsKey(waitId)
|| ctx.errorMap.containsKey(checkId)
||ctx.errorMap.containsKey(origId);
if (hasError) {
// 从 registerId、waitId、checkId、origId 中获取第一个非空的错误信息
String errorMessage = ctx.errorMap.get(checkId);
if (errorMessage == null) errorMessage =ctx.errorMap.get(registerId);
if (errorMessage == null) errorMessage = ctx.errorMap.get(waitId);
if (errorMessage == null) errorMessage = ctx.errorMap.get(origId);
node.setStatus(NodeStateEnum.ERROR.getCode());
node.setErrorMessage(ctx.errorMap.get(checkId));
node.setErrorMessage(errorMessage);
// 计算时间:从 register 开始,到当前时刻
calculateLocalAppTime(node, ctx, registerId, origId);
calculateLocalAppTime(node, ctx, registerId,waitId, checkId, origId);
return;
}
@@ -570,7 +579,7 @@ public class ProcessService implements Iprocess{
// 这代表前置的自动化流程跑完了,且成功了,正在等用户点下一步
if (ctx.activeActivityIds.contains(origId)) {
node.setStatus(NodeStateEnum.WAITING_FOR_USER.getCode());
calculateLocalAppTime(node, ctx, registerId, origId);
calculateLocalAppTime(node, ctx, registerId,waitId, checkId, origId);
return;
}
@@ -583,15 +592,18 @@ public class ProcessService implements Iprocess{
if (isActive) {
// 如果流程挂起,优先显示挂起
node.setStatus(ctx.isSuspended() ? NodeStateEnum.SUSPENDED.getCode() : NodeStateEnum.ACTIVE.getCode());
calculateLocalAppTime(node, ctx, registerId, origId);
calculateLocalAppTime(node, ctx, registerId,waitId, checkId, origId);
return;
}
// 4. 判断 FINISHED (Original 节点已结束)
HistoricActivityInstance lastHist = ctx.historyMap.get(origId);
if (lastHist != null && lastHist.getEndTime() != null) {
// 正常情况下:以 origId 的历史 endTime 为准。
// 兜底情况下:如果由于“强制跳转”绕过了 origId 的创建/执行historyMap 可能不存在 origId。
// 此时只要链路_register/_wait/_check中任意节点产生过结束记录并且当前不再处于链路活动中
// 就将 origId 视为已完成,避免界面长期停留在 ERROR/PENDING。
if (isLocalAppOrigFinished(ctx, origId, registerId, waitId, checkId)) {
node.setStatus(NodeStateEnum.FINISHED.getCode());
calculateLocalAppTime(node, ctx, registerId, origId);
calculateLocalAppTime(node, ctx, registerId,waitId, checkId, origId);
return;
}
@@ -599,6 +611,54 @@ public class ProcessService implements Iprocess{
node.setStatus(NodeStateEnum.PENDING.getCode());
}
/**
* 判断 Local App 聚合节点Original/UserTask是否应当被标记为 FINISHED。
* <p>
* 正常判定:依赖 origId 的历史 endTime。
* <p>
* 跳转兜底:当强制跳转绕过 origId 的实际创建/执行时,可能缺失 origId 的历史记录;
* 这时若链路上_register/_wait/_check任意节点已出现结束时间记录且当前没有任何链路节点仍在活动
* 则认为 origId 的聚合状态应为 FINISHED。
*
* @param ctx 状态上下文
* @param origId Original 逻辑节点ID
* @param registerId _register 节点ID
* @param waitId _wait 节点ID
* @param checkId _check 节点ID
* @return true 表示应标记为 FINISHED
*/
private boolean isLocalAppOrigFinished(ProcessStateContext ctx, String origId, String registerId, String waitId, String checkId) {
HistoricActivityInstance lastHist = ctx.historyMap.get(origId);
if (lastHist != null && lastHist.getEndTime() != null) {
return true;
}
// 跳转兜底:链路没有活动 token且历史上至少有一个链路节点出现过结束时间
boolean chainHasActiveToken = ctx.activeActivityIds.contains(registerId)
|| ctx.activeActivityIds.contains(waitId)
|| ctx.activeActivityIds.contains(checkId)
|| ctx.activeActivityIds.contains(origId);
if (chainHasActiveToken) {
return false;
}
Date registerEnd = getEndTime(ctx.historyMap.get(registerId));
Date waitEnd = getEndTime(ctx.historyMap.get(waitId));
Date checkEnd = getEndTime(ctx.historyMap.get(checkId));
return registerEnd != null || waitEnd != null || checkEnd != null;
}
/**
* 安全获取 HistoricActivityInstance 的 endTime。
*
* @param hist 历史活动实例
* @return endTime如果 hist 为空或 endTime 为空则返回 null
*/
private Date getEndTime(HistoricActivityInstance hist) {
return hist != null ? hist.getEndTime() : null;
}
private void determineServiceTaskUnifiedState(NodeDetailInfo node, ProcessStateContext ctx) {
String origId = node.getId();
String waitUserId = FlowNodeIdUtils.generateWaitUserTaskId(origId);
@@ -606,15 +666,22 @@ public class ProcessService implements Iprocess{
String checkId = FlowNodeIdUtils.generateCheckTaskId(origId);
// 1. 判断 ERROR (优先级最高)
// 链条任何一环报错,原始节点都算错
String errorMsg = ctx.errorMap.get(checkId); // 最常见check挂了
if (errorMsg == null) errorMsg = ctx.errorMap.get(origId); // 自己挂了
// 也可以加上 waitId 的判断,虽然 ReceiveTask 很难挂
// 1. 判断 ERROR (Check 节点死信)
boolean hasError = ctx.errorMap.containsKey(waitUserId)
||ctx.errorMap.containsKey(origId)
|| ctx.errorMap.containsKey(waitId)
|| ctx.errorMap.containsKey(checkId);
if (hasError) {
// 从 registerId、waitId、checkId、origId 中获取第一个非空的错误信息
String errorMessage = ctx.errorMap.get(waitUserId);
if (errorMessage == null) errorMessage =ctx.errorMap.get(origId);
if (errorMessage == null) errorMessage = ctx.errorMap.get(waitId);
if (errorMessage == null) errorMessage = ctx.errorMap.get(checkId);
if (errorMsg != null) {
node.setStatus(NodeStateEnum.ERROR.getCode());
node.setErrorMessage(errorMsg);
calculateAggregatedTime(node, ctx, waitUserId, origId, checkId);
node.setErrorMessage(errorMessage);
calculateAggregatedTime(node, ctx, waitUserId, origId,waitId, checkId);
return;
}
@@ -640,21 +707,17 @@ public class ProcessService implements Iprocess{
// 停留在 origId 或 waitId 或 checkId-> 视为通用执行中
node.setStatus(NodeStateEnum.ACTIVE.getCode());
}
calculateAggregatedTime(node, ctx, waitUserId, origId, checkId);
calculateAggregatedTime(node, ctx, waitUserId, origId,waitId, checkId);
return;
}
// 3. 判断 FINISHED
// 必须是链条的"最后一个环节"结束了,才算整个节点结束
// 顺序WaitUser -> Original -> Wait -> Check
// 我们检查 Check 是否有历史;如果没有 Check (非HPC节点),检查 Original
HistoricActivityInstance lastHist = ctx.historyMap.get(checkId);
if (lastHist == null) lastHist = ctx.historyMap.get(waitId); // 兼容
if (lastHist == null) lastHist = ctx.historyMap.get(origId);
if (lastHist != null && lastHist.getEndTime() != null) {
// 兜底:当发生“强制跳转”绕过链路最后环节时,可能缺失 check/orig 的历史记录。
// 只要链路waitUser/orig/wait/check中任意节点产生过结束记录并且当前不再处于链路活动中
// 就将 origId 视为已完成,避免界面长期停留在 PENDING。
if (isServiceTaskOrigFinished(ctx, waitUserId, origId, waitId, checkId)) {
node.setStatus(NodeStateEnum.FINISHED.getCode());
calculateAggregatedTime(node, ctx, waitUserId, origId, checkId);
calculateAggregatedTime(node, ctx, waitUserId, origId, waitId,checkId);
return;
}
@@ -662,6 +725,39 @@ public class ProcessService implements Iprocess{
node.setStatus(NodeStateEnum.PENDING.getCode());
}
/**
* 判断 ServiceTask 聚合节点Original是否应当被标记为 FINISHED。
* <p>
* 正常判定:依赖链路最后环节(通常是 _check的历史 endTime。
* <p>
* 跳转兜底:当强制跳转绕过了链路最后环节或 origId 的实际创建/执行时,可能缺失关键节点历史记录;
* 这时若链路上waitUser/orig/wait/check任意节点已出现结束时间记录且当前没有任何链路节点仍在活动
* 则认为 origId 的聚合状态应为 FINISHED。
*
* @param ctx 状态上下文
* @param waitUserId _waitUser 节点ID
* @param origId Original 逻辑节点ID
* @param waitId _wait 节点ID
* @param checkId _check 节点ID
* @return true 表示应标记为 FINISHED
*/
private boolean isServiceTaskOrigFinished(ProcessStateContext ctx, String waitUserId, String origId, String waitId, String checkId) {
boolean chainHasActiveToken = ctx.activeActivityIds.contains(waitUserId)
|| ctx.activeActivityIds.contains(origId)
|| ctx.activeActivityIds.contains(waitId)
|| ctx.activeActivityIds.contains(checkId);
if (chainHasActiveToken) {
return false;
}
Date waitUserEnd = getEndTime(ctx.historyMap.get(waitUserId));
Date origEnd = getEndTime(ctx.historyMap.get(origId));
Date waitEnd = getEndTime(ctx.historyMap.get(waitId));
Date checkEnd = getEndTime(ctx.historyMap.get(checkId));
return waitUserEnd != null || origEnd != null || waitEnd != null || checkEnd != null;
}
// 判断是否为本地应用节点
private boolean isLocalApp(NodeDetailInfo node) {
@@ -718,7 +814,7 @@ public class ProcessService implements Iprocess{
* 工具:计算聚合时间 (Original Start -> Check End)
*/
private void calculateAggregatedTime(NodeDetailInfo node, ProcessStateContext ctx,
String waitUserId, String origId, String checkId) {
String waitUserId, String origId, String waitId,String checkId) {
// Start Time: 链条最早的开始时间
HistoricActivityInstance startNode = ctx.historyMap.get(waitUserId);
if (startNode == null) startNode = ctx.historyMap.get(origId);
@@ -726,12 +822,20 @@ public class ProcessService implements Iprocess{
Date startTime = (startNode != null) ? startNode.getStartTime() : null;
node.setStartTime(startTime);
// End Time: 只有状态是 FINISHED 才有结束时间,取 Check 的结束时间
// End Time: 只有状态是 FINISHED才有结束时间四个节点waitUser, orig, wait, check的结束时间的最大值
Date endTime = null;
if (NodeStateEnum.FINISHED.getCode().equals(node.getStatus())) {
HistoricActivityInstance endNode = ctx.historyMap.get(checkId);
if (endNode == null) endNode = ctx.historyMap.get(origId);
if (endNode != null) endTime = endNode.getEndTime();
List<Date> endTimes = Arrays.asList(
getEndTime(ctx.historyMap.get(waitUserId)),
getEndTime(ctx.historyMap.get(origId)),
getEndTime(ctx.historyMap.get(waitId)),
getEndTime(ctx.historyMap.get(checkId))
);
endTime = endTimes.stream()
.filter(Objects::nonNull)
.max(Date::compareTo)
.orElse(null);
}
node.setEndTime(endTime);
@@ -751,7 +855,7 @@ public class ProcessService implements Iprocess{
* @param registerId 链条起点的ID
* @param origId 链条终点的ID
*/
private void calculateLocalAppTime(NodeDetailInfo node, ProcessStateContext ctx, String registerId, String origId) {
private void calculateLocalAppTime(NodeDetailInfo node, ProcessStateContext ctx, String registerId,String waitId,String checkId, String origId) {
// 1. 获取开始时间 (取 _register 的历史记录)
HistoricActivityInstance startInst = ctx.historyMap.get(registerId);
@@ -763,14 +867,20 @@ public class ProcessService implements Iprocess{
Date startTime = (startInst != null) ? startInst.getStartTime() : null;
node.setStartTime(startTime);
// 2. 获取结束时间 (仅当整个节点 FINISHED 时才有值)
// 2. 只有状态是 FINISHED才有结束时间取四个节点waitUser, orig, wait, check的结束时间的最大值
Date endTime = null;
if (NodeStateEnum.FINISHED.getCode().equals(node.getStatus())) {
// 取终点 Original (UserTask) 的历史记录
HistoricActivityInstance endInst = ctx.historyMap.get(origId);
if (endInst != null) {
endTime = endInst.getEndTime();
}
List<Date> endTimes = Arrays.asList(
getEndTime(ctx.historyMap.get(registerId)),
getEndTime(ctx.historyMap.get(waitId)),
getEndTime(ctx.historyMap.get(checkId)),
getEndTime(ctx.historyMap.get(origId))
);
endTime = endTimes.stream()
.filter(Objects::nonNull)
.max(Date::compareTo)
.orElse(null);
}
node.setEndTime(endTime);
@@ -1188,6 +1298,9 @@ public class ProcessService implements Iprocess{
// 推测是否存在对应的注册节点 (Local App 特征)
String registerNodeId = FlowNodeIdUtils.generateRegisterTaskId(originalNodeId);
// 推测是否有对应的_waitUser 节点 HPC节点的特征
String waitUserNodeId = FlowNodeIdUtils.generateWaitUserTaskId(originalNodeId);
// 如果注册节点存在,说明这是 Local App 链路,且必须从注册节点重新开始
// 避免直接跳到中间状态 (如 check 或 wait),导致本地应用没拉起来
if (bpmnModel.getMainProcess().getFlowElement(registerNodeId) != null) {
@@ -1196,6 +1309,14 @@ public class ProcessService implements Iprocess{
targetNodeId = registerNodeId;
}
}
// 如果存在 _waitUser 节点,说明这是 HPC 节点,且必须从 _waitUser 节点重新开始
if (bpmnModel.getMainProcess().getFlowElement(waitUserNodeId) != null) {
if (!waitUserNodeId.equals(targetNodeId)) {
log.info("检测到 HPC 链路,自动修正重试目标: {} -> {}", targetNodeId, waitUserNodeId);
targetNodeId = waitUserNodeId;
}
}
}
// 1. 获取当前流程实例中 **所有** 活跃/停滞的节点

View File

@@ -181,6 +181,32 @@ public class Dto2BpmnConverter {
private void handleWaitUserTask(Process process, FlowElementDTO nodeDto, Map<String, String> waitUserTaskMap) {
// 用来设置节点是否人工执行: 只有当前节点是ServiceTask才需要判断是否等待用户输入需要才创建前置UserTask
// 而且必须出入,这是节点开启异步带来的问题:
/**
* 事务 A抵达节点成功提交
* 上一步的 /asyncCallback 调用了 runtimeService.trigger()。
* 引擎让令牌离开 _wait走向 _check。
* 因为 _check 是异步节点,引擎不会立刻执行它,而是做两件事:
* 将 ACT_RU_EXECUTION 表的指针指向 _check。
* 在 ACT_RU_JOB 表中插入一条待执行的任务记录。
* 事务 A 提交Commit
* 注意此时Flowable 的历史机制认为“节点还没真正开始执行 Java 代码”,所以此时 ACT_HI_ACTINST 中还没有 _check 的记录。
*
* Flowable 的后台异步线程AsyncExecutor拿到了这个 Job开启全新的事务 B。
* 线程开始真正进入 _check 节点,第一步:往 ACT_HI_ACTINST 插入一条开始记录。
* 第二步:执行您绑定的 Java 代码 AsyncResultCheckDelegate。
* 第三步:因为回调状态是 FAIL您的 Java 代码中抛出了 RuntimeException("异步任务执行失败...")。
* 致命结果:因为抛出了未捕获的运行时异常,整个事务 B 被数据库强行回滚Rollback
* 👉 刚才第一步写入 ACT_HI_ACTINST 的历史记录,随着事务回滚,消失了
*
* 引擎捕获到了事务 B 的回滚,开启事务 C 进行善后。
* 检查您的重试策略是 R00次重试于是直接将该任务从 ACT_RU_JOB 移入 ACT_RU_DEADLETTER_JOB死信表
* 将 ACT_RU_EXECUTION 中的 DEADLETTER_JOB_COUNT_ 更新为 1就是您截图里看到的那样
* 事务 C 提交。
*
* 解决 强制跳过后 ACT_HI_ACTINST 丢失就是在前面插入一个UserTask强制跳转的时候都会先跳到这个UserTask保证历史表里有这个聚合记录防止SERVICETASK挂了没有信息可以查询执行过
*/
if (FlowElementTypeEnums.SERVICETASK.getType().equals(nodeDto.getType())) {
String originalNodeId = nodeDto.getId();
String waitUserId = FlowNodeIdUtils.generateWaitUserTaskId(originalNodeId);