配置文件

This commit is contained in:
2025-12-11 15:07:58 +08:00
parent 3c218626c1
commit 56bb2bc50d
35 changed files with 1342 additions and 469 deletions

View File

@@ -128,6 +128,37 @@ public class ProcessController implements IFlowableFeignClient {
return SdmResponse.success(processInstanceResp);
}
/**
* 挂起流程实例
* @param processInstanceId 流程实例ID
* @return
*/
@GetMapping("/suspendProcessInstance")
SdmResponse suspendProcessInstance(@RequestParam String processInstanceId){
return processService.suspendProcessInstance(processInstanceId);
}
/**
* 激活流程实例
* @param processInstanceId 流程实例ID
* @return
*/
@GetMapping("/activateProcessInstance")
SdmResponse activateProcessInstance(@RequestParam String processInstanceId){
return processService.activateProcessInstance(processInstanceId);
}
/**
* 取消流程实例
* @param processInstanceId 流程实例ID
* @return
*/
@GetMapping("/cancelProcessInstance")
SdmResponse cancelProcessInstance(@RequestParam String processInstanceId){
return processService.cancelProcessInstance(processInstanceId);
}
/**
* 查询流程实例及所有节点的详细状态(返回结构化 DTO
* 如果只传了processDefinitionId根据流程定义返回流程基本信息和节点信息

View File

@@ -0,0 +1,46 @@
package com.sdm.flowable.process;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.flowable.dto.ProcessDefinitionDTO;
import com.sdm.common.entity.req.flowable.AsyncCallbackRequest;
import com.sdm.common.entity.resp.flowable.DeployFlowableResp;
import com.sdm.common.entity.resp.flowable.ProcessInstanceDetailResponse;
import com.sdm.flowable.dto.req.CompleteTaskReq;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.validation.ValidationError;
import java.util.*;
public interface Iprocess {
// 验证并返回模型验证错误信息
List<ValidationError> validateModel(ProcessDefinitionDTO processDTO) throws Exception;
// 部署流程前端传入Flowable标准JSON
SdmResponse<DeployFlowableResp> deploy(ProcessDefinitionDTO processDTO) throws Exception;
void deleteAllDeployments();
ProcessInstance startByProcessDefinitionId(String processDefinitionId, Map<String, Object> variables);
SdmResponse suspendProcessInstance(String processInstanceId);
SdmResponse activateProcessInstance(String processInstanceId);
SdmResponse cancelProcessInstance(String processInstanceId);
/**
* 查询流程实例及所有节点的详细状态(返回结构化 DTO
* 如果只传了processDefinitionId根据流程定义返回流程基本信息和节点信息
* 如果还传了processInstanceId再封装流程状态和节点状态
*/
SdmResponse<ProcessInstanceDetailResponse> getProcessAndNodeDetailByInstanceId(String processDefinitionId, String processInstanceId, String runId);
SdmResponse continueServiceTask( CompleteTaskReq req);
SdmResponse asyncCallback(AsyncCallbackRequest request);
SdmResponse retryFailedNode( String processInstanceId, String failNodeId);
SdmResponse retryToNode(String procInstId, String targetNodeId, Map<String, Object> newVariables);
}

View File

@@ -44,7 +44,7 @@ import java.util.stream.Collectors;
@Slf4j
@Service
public class ProcessService {
public class ProcessService implements Iprocess{
@Autowired
private RepositoryService repositoryService;
@@ -136,6 +136,99 @@ public class ProcessService {
return instance;
}
/**
* 挂起流程实例
*/
public SdmResponse suspendProcessInstance(String processInstanceId) {
log.info("请求挂起流程实例: {}", processInstanceId);
try {
// 检查流程是否存在且处于运行状态
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
if (processInstance == null) {
return SdmResponse.failed("流程实例不存在或已结束,无法挂起");
}
if (processInstance.isSuspended()) {
return SdmResponse.failed("流程实例已经是挂起状态");
}
runtimeService.suspendProcessInstanceById(processInstanceId);
log.info("流程实例挂起成功: {}", processInstanceId);
return SdmResponse.success("流程实例挂起成功");
} catch (Exception e) {
log.error("挂起流程实例失败: {}", e.getMessage(), e);
return SdmResponse.failed("挂起流程实例失败");
}
}
/**
* 激活流程实例
*/
public SdmResponse activateProcessInstance(String processInstanceId) {
log.info("请求激活流程实例: {}", processInstanceId);
try {
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
if (processInstance == null) {
return SdmResponse.failed("流程实例不存在或已结束,无法激活");
}
if (!processInstance.isSuspended()) {
return SdmResponse.failed("流程实例已经是激活状态");
}
runtimeService.activateProcessInstanceById(processInstanceId);
log.info("流程实例激活成功: {}", processInstanceId);
return SdmResponse.success();
} catch (Exception e) {
log.error("激活流程实例失败: {}", e.getMessage(), e);
return SdmResponse.failed("激活流程实例失败 ");
}
}
/**
* 取消/终止流程实例
* @param processInstanceId 流程实例ID
*/
public SdmResponse cancelProcessInstance(String processInstanceId) {
log.info("请求取消/终止流程实例: {}", processInstanceId);
try {
// 1. 检查流程是否正在运行
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
if (processInstance == null) {
// 如果运行时查不到,可能已经结束了,或者不存在
// 进一步去历史查一下,明确告知用户
HistoricProcessInstance history = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
if (history != null) {
return SdmResponse.failed("该流程实例已经结束,无法取消");
} else {
return SdmResponse.failed("流程实例不存在");
}
}
// 2. 执行删除在Flowable中终止运行中的流程就是删除运行时数据
// 参数1: ID, 参数2: 删除原因 (会被写入历史表的 DELETE_REASON_ 字段)
runtimeService.deleteProcessInstance(processInstanceId, "");
log.info("流程实例已取消: {}", processInstanceId);
return SdmResponse.success("流程实例已取消");
} catch (Exception e) {
log.error("取消流程实例失败: {}", e.getMessage(), e);
return SdmResponse.failed("取消流程实例失败" );
}
}
// 验证并返回模型验证错误信息
public List<ValidationError> validateModel(ProcessDefinitionDTO processDTO) throws Exception {
BpmnModel bpmnModel = dto2BpmnConverter.convert(processDTO);
@@ -150,66 +243,73 @@ public class ProcessService {
*/
public SdmResponse<ProcessInstanceDetailResponse> getProcessAndNodeDetailByInstanceId(String processDefinitionId, String processInstanceId, String runId) {
ProcessInstanceDetailResponse response = new ProcessInstanceDetailResponse();
// 构建基础流程信息
// 基础流程信息
ProcessInstanceInfo processInfo = new ProcessInstanceInfo();
processInfo.setProcessDefinitionId(processDefinitionId);
// 获取流程定义中的节点结构信息
// 获取节点结构
List<FlowNode> orderedNodes = getOrderedFlowNodes(processDefinitionId);
List<NodeDetailInfo> nodes = orderedNodes.stream()
.map(this::buildNodeDetailInfoFromFlowNode) // 直接构建NodeDetailInfo
.map(this::buildNodeDetailInfoFromFlowNode)
.peek(detail -> {
JSONObject params = processNodeParamService.getParam(processDefinitionId, detail.getId(), runId);
detail.setUserParam(params);
})
.collect(Collectors.toList());
if (processInstanceId != null && !processInstanceId.isEmpty()) {
// 如果提供了流程实例ID则补充完整状态信息
processInfo = buildProcessInstanceInfo(processInstanceId);
// 补充节点状态信息
// --- 状态判断逻辑开始 ---
// 1. 获取运行时流程实例对象(判断是否运行中、是否挂起)
ProcessInstance runtimeInstance = runtimeService.createProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
boolean isRunning = (runtimeInstance != null);
boolean isProcessSuspended = isRunning && runtimeInstance.isSuspended();
// 2. 准备历史数据
List<HistoricActivityInstance> historicActivities = historyService.createHistoricActivityInstanceQuery()
.processInstanceId(processInstanceId)
.list();
// 为了处理多实例或循环,最好按结束时间倒序排,取最新的
Map<String, HistoricActivityInstance> activityMap = new HashMap<>();
for (HistoricActivityInstance hist : historicActivities) {
// 如果Map里已经有了且当前的开始时间比Map里的晚则覆盖取最新的
if (!activityMap.containsKey(hist.getActivityId()) ||
hist.getStartTime().after(activityMap.get(hist.getActivityId()).getStartTime())) {
activityMap.put(hist.getActivityId(), hist);
}
}
List<String> activeActivityIds = isProcessRunning(processInstanceId)
// 3. 准备运行时 Active ID 列表
List<String> activeActivityIds = isRunning
? runtimeService.getActiveActivityIds(processInstanceId)
: Collections.emptyList();
// 获取异常/失败的任务信息
// Map<ActivityId, ErrorMessage>
// 4. 准备异常信息 (DeadLetterJob 代表严重错误/失败)
Map<String, String> errorMap = new HashMap<>();
if (isProcessRunning(processInstanceId)) {
// 1. 查询死信作业 (DeadLetterJob) - 重试次数用尽,彻底失败
boolean hasDeadLetterJobs = false; // 标记流程是否有死信
if (isRunning) {
// 死信作业 (DeadLetterJob) - 彻底失败
List<Job> deadJobs = managementService.createDeadLetterJobQuery()
.processInstanceId(processInstanceId).list();
if (!deadJobs.isEmpty()) {
hasDeadLetterJobs = true;
}
// 2. 查询普通作业 (Job) - 包含正在重试中的异常 (exceptionMessage不为空)
// 普通作业 (Job) - 包含重试中的异常
List<Job> activeJobs = managementService.createJobQuery()
.processInstanceId(processInstanceId).list();
// 3. 辅助查询:获取 Execution 以便将 Job 映射到 ActivityId
// Job 对象里通常只有 executionId我们需要知道这个 execution 当前在哪个 activityId
// 映射 ExecutionId -> ActivityId
List<Execution> executions = runtimeService.createExecutionQuery()
.processInstanceId(processInstanceId).list();
Map<String, String> executionToActivityMap = executions.stream()
// 【关键】过滤掉 activityId 为 null 的记录根执行实例Root Execution代表流程实例本身的那条记录它的 activityId 通常是 null
.filter(e -> e.getActivityId() != null)
.collect(Collectors.toMap(Execution::getId, Execution::getActivityId, (v1, v2) -> v1));
// 填充错误 Map
Consumer<Job> mapJobToError = job -> {
if (job.getExceptionMessage() != null) {
String activityId = executionToActivityMap.get(job.getExecutionId());
@@ -218,32 +318,38 @@ public class ProcessService {
}
}
};
deadJobs.forEach(mapJobToError);
activeJobs.forEach(mapJobToError);
}
// --- 5. 构建流程实例级状态 (ProcessInfo) ---
// 这里传入新的状态标志hasDeadLetterJobs, isProcessSuspended
processInfo = buildProcessInstanceInfo(processInstanceId, isRunning, isProcessSuspended, hasDeadLetterJobs);
// 更新节点状态信息,遍历节点设置状态,并处理 ServiceTask + ReceiveTask 的合并逻辑
// --- 6. 构建节点状态 ---
for (NodeDetailInfo node : nodes) {
String originalNodeId = node.getId();
// 4.1 获取当前节点的物理状态
HistoricActivityInstance myHist = activityMap.get(originalNodeId);
boolean isActive = activeActivityIds.contains(originalNodeId);
boolean isFinished = myHist != null && myHist.getEndTime() != null;
// 判断该节点是否有报错
String errorMessage = errorMap.get(originalNodeId);
boolean isError = errorMessage != null;
// 初始状态判定 (优先级Error > Finished > Active > Pending)
// 优先级判断状态
String displayStatus;
if (isError) {
displayStatus = "error";
displayStatus = "error"; // 节点报错优先
} else if (isActive) {
// 【优化点】:如果节点是 Active但流程是 Suspended则节点状态为 suspended
if (isProcessSuspended) {
displayStatus = "suspended";
} else {
displayStatus = "active";
}
} else if (isFinished) {
displayStatus = "finished";
} else if (isActive) {
displayStatus = "active";
} else {
displayStatus = "pending";
}
@@ -252,22 +358,19 @@ public class ProcessService {
Date endTime = myHist != null ? myHist.getEndTime() : null;
Long duration = myHist != null ? myHist.getDurationInMillis() : null;
// 4.2 特殊处理:检查是否为异步回调的 ServiceTask
// ServiceTask 异步回调逻辑特殊处理 (保持原有逻辑,叠加挂起判断)
if (isAsyncCallbackNode(node)) {
// 如果原始节点本身就报错了ServiceTask 执行 JavaDelegate 时抛出异常)
// 此时还没走到 ReceiveTask所以直接显示 Error
if (isError) {
node.setErrorMessage(errorMessage); // 假设 NodeDetailInfo 有这个字段
// 保持 displayStatus = "error"
node.setErrorMessage(errorMessage);
} else {
// 原有逻辑:推算 ReceiveTask
String waitNodeId = FlowNodeIdUtils.generateAsyncTaskId(originalNodeId);
HistoricActivityInstance waitHist = activityMap.get(waitNodeId);
boolean waitIsActive = activeActivityIds.contains(waitNodeId);
boolean waitIsFinished = waitHist != null && waitHist.getEndTime() != null;
if (waitIsActive) {
displayStatus = "active";
// 同样:如果 ReceiveTask 等待中,但流程挂起,显示挂起
displayStatus = isProcessSuspended ? "suspended" : "active";
endTime = null;
if (startTime != null) {
duration = System.currentTimeMillis() - startTime.getTime();
@@ -279,19 +382,16 @@ public class ProcessService {
duration = endTime.getTime() - startTime.getTime();
}
} else if (isFinished && !waitIsFinished && !waitIsActive) {
// 极短时间差
displayStatus = "active";
displayStatus = isProcessSuspended ? "suspended" : "active";
endTime = null;
}
}
} else {
// 非特殊节点,如果是 Error 状态,填充错误信息
if (isError) {
node.setErrorMessage(errorMessage);
}
}
// 4.3 赋值给 DTO
node.setStatus(displayStatus);
node.setStartTime(startTime);
node.setEndTime(endTime);
@@ -301,14 +401,14 @@ public class ProcessService {
}
}
}
response.setProcessInfo(processInfo);
response.setNodes(nodes);
return SdmResponse.success(response);
}
// 构建流程实例信息
private ProcessInstanceInfo buildProcessInstanceInfo(String processInstanceId) {
private ProcessInstanceInfo buildProcessInstanceInfo(String processInstanceId, boolean isRunning, boolean isSuspended, boolean hasError) {
HistoricProcessInstance historicInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
@@ -317,23 +417,46 @@ public class ProcessService {
throw new RuntimeException("流程实例不存在: " + processInstanceId);
}
boolean isRunning = runtimeService.createProcessInstanceQuery()
.processInstanceId(processInstanceId)
.count() > 0;
ProcessInstanceInfo info = new ProcessInstanceInfo();
info.setProcessInstanceId(historicInstance.getId());
info.setProcessDefinitionId(historicInstance.getProcessDefinitionId());
info.setStartTime(historicInstance.getStartTime());
info.setEndTime(historicInstance.getEndTime()); // 可能为 null
info.setEndTime(historicInstance.getEndTime());
// 计算耗时
Long duration = historicInstance.getDurationInMillis();
if (duration == null && historicInstance.getStartTime() != null && isRunning) {
duration = System.currentTimeMillis() - historicInstance.getStartTime().getTime();
}
info.setDurationInMillis(duration);
info.setDurationFormatted(duration != null ? formatDuration(duration) : null);
info.setStatus(isRunning ? "running" : "completed");
// 【优化点】:设置流程状态
// 优先级Error > Suspended > Running > Completed
String status;
if (isRunning) {
// --- 运行中 ---
if (hasError) {
status = "error"; // 有死信作业,视为异常
} else if (isSuspended) {
status = "suspended"; // 被挂起
} else {
status = "running"; // 正常运行
}
} else {
// --- 已结束 (运行时查不到,历史表里有) ---
String deleteReason = historicInstance.getDeleteReason();
if (deleteReason == null) {
// 1. 正常走完结束节点deleteReason 为空
status = "completed";
} else {
// 2. 有删除原因,说明是被取消或强制终止的
// 你可以根据 reason 的内容做更细的区分,或者统称为 cancelled
status = "cancelled";
}
}
info.setStatus(status);
return info;
}
@@ -507,11 +630,12 @@ public class ProcessService {
return SdmResponse.success();
}
public void asyncCallback(AsyncCallbackRequest request) {
public SdmResponse asyncCallback(AsyncCallbackRequest request) {
log.info("接收到异步回调请求, 请求内容: {}", request);
// 发送信号唤醒流程实例中等待的节点
universalDelegate.signalByTaskId(request);
log.info("异步回调处理转发完成, 任务ID: {}", request.getAsyncTaskId());
return SdmResponse.success();
}
/**
@@ -552,13 +676,14 @@ public class ProcessService {
.orElseThrow(() -> new RuntimeException("在死信队列中未找到节点 [" + activityId + "] 的作业"));
}
public void retryFailedNode( String processInstanceId,String failNodeId) {
public SdmResponse retryFailedNode( String processInstanceId,String failNodeId) {
try {
// 2. 查找 Job ID (参考上面的代码)
String jobId = findDeadJobId(processInstanceId, failNodeId);
// 3. 执行重试
managementService.moveDeadLetterJobToExecutableJob(jobId, 3);
managementService.moveDeadLetterJobToExecutableJob(jobId, 1);
log.info("作业已恢复,等待异步执行器拾取执行...");
return SdmResponse.success("重试任务已提交");
} catch (Exception e) {
log.error("重试节点失败, 流程ID: {}, 节点ID: {}, 异常信息: {}", processInstanceId, failNodeId, e.getMessage(), e);
throw new RuntimeException("重试失败");
@@ -572,7 +697,7 @@ public class ProcessService {
* @param targetNodeId 目标节点ID
* @param newVariables 新的变量参数
*/
public void retryToNode(String procInstId, String targetNodeId, Map<String, Object> newVariables) {
public SdmResponse retryToNode(String procInstId, String targetNodeId, Map<String, Object> newVariables) {
log.info("开始重试节点操作, 流程实例ID: {}, 目标节点ID: {}, 新变量数量: {}",
procInstId, targetNodeId, newVariables != null ? newVariables.size() : 0);
@@ -605,5 +730,6 @@ public class ProcessService {
// 完成重试中转任务,触发 ExecutionListener
taskService.complete(retryTask.getId());
log.info("重试中转任务完成, 任务ID: {}", retryTask.getId());
return SdmResponse.success("重试任务已提交");
}
}

View File

@@ -0,0 +1,40 @@
server:
port: 7106
spring:
application:
name: flowable
datasource:
url: jdbc:mysql://192.168.30.146:3306/flowable?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
username: root
password: mysql
driver-class-name: com.mysql.cj.jdbc.Driver
flowable:
# ?????????
database-schema-update: true
# ??????JOB
async-executor-activate: true
cloud:
nacos:
discovery:
server-addr: 192.168.30.146:8848
group: PROD_GROUP
enabled: true
logging:
level:
org:
flowable: INFO
mybatis-plus:
mapper-locations: classpath*:/mapper/**/*.xml
type-aliases-package: com.sdm.flowable.model.entity
configuration:
map-underscore-to-camel-case: true
global-config:
db-config:
id-type: auto
security:
whitelist:
paths:
- /process/testHpc
- /process/asyncCallback

View File

@@ -42,4 +42,4 @@ fi
# 启动项目
echo "正在启动项目..."
nohup java ${JVM_OPTS} -Dspring.profiles.active=dev -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=0.0.0.0:5006 -jar "${FULL_JAR_PATH}" > "${LOG_FILE}" 2>&1 &
nohup java ${JVM_OPTS} -Dspring.profiles.active=dev -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=0.0.0.0:5003 -jar "${FULL_JAR_PATH}" > "${LOG_FILE}" 2>&1 &

View File

@@ -0,0 +1,45 @@
#!/bin/bash
# Spring Boot 项目启动脚本
JAR_PATH="/home/app/flowable"
JAR_NAME="flowable-0.0.1-SNAPSHOT.jar"
FULL_JAR_PATH="${JAR_PATH}/${JAR_NAME}"
# 与logback.xml保持一致的日志路径
LOG_HOME="/home/app/flowable/logs"
LOG_FILE="${LOG_HOME}/running.log"
# JVM参数
JVM_OPTS="-Xms512m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${LOG_HOME}/heapdump.hprof"
# 函数定义
check_jar_exists() {
if [ ! -f "${FULL_JAR_PATH}" ]; then
echo "ERROR: Jar包不存在路径${FULL_JAR_PATH}"
exit 1
fi
}
get_running_pid() {
ps -ef | grep "${JAR_NAME}" | grep -v "grep" | awk '{print $2}'
}
# 检查是否已运行
PID=$(get_running_pid)
if [ -n "${PID}" ]; then
echo "项目已在运行中PID: ${PID}"
exit 0
fi
# 检查Jar包是否存在
check_jar_exists
# 确保日志目录存在
if [ ! -d "${LOG_HOME}" ]; then
mkdir -p "${LOG_HOME}"
echo "日志目录不存在,已自动创建:${LOG_HOME}"
fi
# 启动项目
echo "正在启动项目..."
nohup java ${JVM_OPTS} -Dspring.profiles.active=prod -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=0.0.0.0:5003 -jar "${FULL_JAR_PATH}" > "${LOG_FILE}" 2>&1 &