Merge remote-tracking branch 'origin/main'
This commit is contained in:
@@ -33,4 +33,9 @@ public abstract class BaseExecuteConfig {
|
||||
private boolean asyncCallback = false;
|
||||
// 用于标记回调节点ID,当asyncCallback为true时,表示当前执行节点是异步执行,执行完成后需要回调的节点ID,一般就是receiveTaskId
|
||||
private String callbackNodeId;
|
||||
|
||||
// 是否需要等待用户手动提交执行,
|
||||
// 默认是false,表示不需要等待用户手动提交执行
|
||||
// true: 流程到此停止,创建人工输入任务,等待用户点击执行
|
||||
private boolean waitUser = false;
|
||||
}
|
||||
@@ -2,7 +2,6 @@ package com.sdm.flowable.controller;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.sdm.common.common.SdmResponse;
|
||||
import com.sdm.flowable.delegate.UniversalDelegate;
|
||||
import com.sdm.flowable.delegate.handler.HpcHandler;
|
||||
import com.sdm.flowable.dto.ProcessDefinitionDTO;
|
||||
import com.sdm.flowable.dto.req.AsyncCallbackRequest;
|
||||
@@ -30,9 +29,6 @@ public class ProcessController {
|
||||
@Autowired
|
||||
private IProcessNodeParamService processNodeParamService;
|
||||
|
||||
@Autowired
|
||||
private UniversalDelegate universalDelegate;
|
||||
|
||||
@Autowired
|
||||
private HpcHandler hpcHandler;
|
||||
|
||||
@@ -137,8 +133,8 @@ public class ProcessController {
|
||||
* 根据流程 key 和指定版本获取版本流程定义的节点信息
|
||||
*/
|
||||
@GetMapping("/listNodesByProcessDefinitionKey")
|
||||
public List<Map<String, Object>> listNodesByProcessDefinitionKey(@RequestParam String processDefinitionKey,@RequestParam(required = false)Integer processDefinitionVersion) {
|
||||
return processService.listNodesByProcessDefinitionKey(processDefinitionKey,processDefinitionVersion);
|
||||
public List<Map<String, Object>> listNodesByProcessDefinitionKey(@RequestParam String processDefinitionKey, @RequestParam(required = false) Integer processDefinitionVersion) {
|
||||
return processService.listNodesByProcessDefinitionKey(processDefinitionKey, processDefinitionVersion);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -152,8 +148,6 @@ public class ProcessController {
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 删除所有流程部署
|
||||
*/
|
||||
@@ -239,14 +233,14 @@ public class ProcessController {
|
||||
}
|
||||
|
||||
/**
|
||||
* 完成人工节点任务
|
||||
* 流程节点继续执行(完成人工节点/或者等待用户输入后继续手动执行的节点)
|
||||
*
|
||||
* @param req
|
||||
* @return
|
||||
*/
|
||||
@PostMapping("/completeManualTasks")
|
||||
public void completeManualTasks(@RequestBody CompleteTaskReq req) {
|
||||
processService.completeManualTasks(req);
|
||||
@PostMapping("/continueServiceTask")
|
||||
public void continueServiceTask(@RequestBody CompleteTaskReq req) {
|
||||
processService.continueServiceTask(req);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -257,6 +251,6 @@ public class ProcessController {
|
||||
@PostMapping("/asyncCallback")
|
||||
public void asyncCallback(@RequestBody AsyncCallbackRequest request) {
|
||||
// 发送信号唤醒流程实例中等待的节点
|
||||
universalDelegate.signalByTaskId(request);
|
||||
processService.asyncCallback(request);
|
||||
}
|
||||
}
|
||||
@@ -11,7 +11,7 @@ import com.sdm.flowable.service.IProcessNodeParamService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.engine.delegate.DelegateExecution;
|
||||
import org.flowable.engine.delegate.JavaDelegate;
|
||||
import org.flowable.engine.RuntimeService;
|
||||
import org.flowable.engine.runtime.Execution;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@@ -27,7 +27,7 @@ public class UniversalDelegate implements JavaDelegate {
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
@Autowired
|
||||
private IProcessNodeParamService paramService;
|
||||
private IProcessNodeParamService processNodeParamService;
|
||||
|
||||
@Autowired
|
||||
private IAsyncTaskRecordService asyncTaskRecordService;
|
||||
@@ -44,7 +44,7 @@ public class UniversalDelegate implements JavaDelegate {
|
||||
String nodeName = execution.getCurrentFlowElement().getName();
|
||||
|
||||
// 2. 读取输入参数
|
||||
Map<String, Object> params = paramService.getParam(procInstId, nodeId);
|
||||
Map<String, Object> params = processNodeParamService.getParam(procInstId, nodeId);
|
||||
|
||||
log.info("==== 节点执行日志 ====\n流程实例ID:{}\n节点ID:{}\n节点名称:{}\n输入参数:{}\n====================",
|
||||
procInstId, nodeId, nodeName, params);
|
||||
|
||||
@@ -10,4 +10,12 @@ public class CompleteTaskReq {
|
||||
private String processInstanceId;
|
||||
private String taskDefinitionKey;
|
||||
private Map<String, Object> variables = new HashMap<>();
|
||||
/**
|
||||
* 任务类型:FlowElementTypeEnums
|
||||
* userTask - 普通用户任务
|
||||
* serviceTask - ServiceTask前置隐藏等待任务
|
||||
*/
|
||||
private String flowelementType;
|
||||
|
||||
|
||||
}
|
||||
@@ -1,10 +1,15 @@
|
||||
package com.sdm.flowable.process;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.sdm.flowable.delegate.UniversalDelegate;
|
||||
import com.sdm.flowable.dto.ProcessDefinitionDTO;
|
||||
import com.sdm.flowable.dto.req.AsyncCallbackRequest;
|
||||
import com.sdm.flowable.enums.FlowElementTypeEnums;
|
||||
import com.sdm.flowable.util.Dto2BpmnConverter;
|
||||
import com.sdm.flowable.constants.FlowableConfig;
|
||||
import com.sdm.flowable.dto.req.CompleteTaskReq;
|
||||
import com.sdm.flowable.util.FlowNodeIdUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.bpmn.model.*;
|
||||
import org.flowable.bpmn.model.Process;
|
||||
import org.flowable.engine.HistoryService;
|
||||
@@ -24,12 +29,12 @@ import org.flowable.validation.ValidationError;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
public class ProcessService {
|
||||
@Autowired
|
||||
@@ -47,6 +52,9 @@ public class ProcessService {
|
||||
@Autowired
|
||||
private Dto2BpmnConverter dto2BpmnConverter;
|
||||
|
||||
@Autowired
|
||||
private UniversalDelegate universalDelegate;
|
||||
|
||||
// 部署流程(前端传入Flowable标准JSON)
|
||||
public Deployment deploy(ProcessDefinitionDTO processDTO) throws Exception {
|
||||
BpmnModel bpmnModel = dto2BpmnConverter.convert(processDTO);
|
||||
@@ -493,17 +501,36 @@ public class ProcessService {
|
||||
return result;
|
||||
}
|
||||
|
||||
public void completeManualTasks(@RequestBody CompleteTaskReq req) {
|
||||
Task task = taskService.createTaskQuery()
|
||||
.processInstanceId(req.getProcessInstanceId())
|
||||
.taskDefinitionKey(req.getTaskDefinitionKey())
|
||||
.singleResult();
|
||||
public void continueServiceTask(@RequestBody CompleteTaskReq req) {
|
||||
String taskDefKey;
|
||||
|
||||
if (task != null) {
|
||||
taskService.complete(task.getId(), req.getVariables());
|
||||
// 根据类型确定真正的 taskDefinitionKey
|
||||
if (FlowElementTypeEnums.fromString(req.getFlowelementType()).equals(FlowElementTypeEnums.SERVICETASK)) {
|
||||
// 如果是 ServiceTask 前置等待节点
|
||||
taskDefKey = FlowNodeIdUtils.generateWaitUserTaskId(req.getTaskDefinitionKey());
|
||||
} else {
|
||||
throw new RuntimeException("找不到任务!");
|
||||
// 普通 UserTask
|
||||
taskDefKey = req.getTaskDefinitionKey();
|
||||
}
|
||||
|
||||
Task task = taskService.createTaskQuery()
|
||||
.processInstanceId(req.getProcessInstanceId())
|
||||
.taskDefinitionKey(taskDefKey)
|
||||
.singleResult();
|
||||
|
||||
if (task == null) {
|
||||
throw new RuntimeException("找不到任务! taskDefinitionKey=" + taskDefKey);
|
||||
}
|
||||
|
||||
// 完成任务
|
||||
if (req.getVariables() != null) {
|
||||
taskService.complete(task.getId(), req.getVariables());
|
||||
} else {
|
||||
taskService.complete(task.getId());
|
||||
}
|
||||
}
|
||||
public void asyncCallback(AsyncCallbackRequest request) {
|
||||
// 发送信号唤醒流程实例中等待的节点
|
||||
universalDelegate.signalByTaskId(request);
|
||||
}
|
||||
}
|
||||
@@ -46,9 +46,12 @@ public class Dto2BpmnConverter {
|
||||
.filter(e -> FlowElementTypeEnums.SEQUENCEFLOW.getType().equals(e.getType()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// 3. 存储异步任务映射关系(原节点ID → wait节点ID)
|
||||
Map<String, String> asyncTaskMap = new HashMap<>(); // 异步任务映射(原节点→wait节点)
|
||||
|
||||
// 3. 存储异步任务ReceiveTask 映射关系(原节点ID → ReceiveTask节点ID)
|
||||
Map<String, String> asyncTaskMap = new HashMap<>(); // 异步任务映射(原节点→ReceiveTask节点)
|
||||
// 3.1、存储等待用户输入任务映射关系(原节点ID → waitUserTask节点ID)
|
||||
Map<String, String> waitUserTaskMap = new HashMap<>(); // 原节点ID → waitUserTask节点ID
|
||||
|
||||
|
||||
// 4. 存储并行网关映射关系(原节点ID → 网关ID)
|
||||
Map<String, String> splitGatewayMap = new HashMap<>(); // 拆分网关(原拆分节点→拆分网关)
|
||||
Map<String, String> joinGatewayMap = new HashMap<>(); // 汇总网关(原汇总节点→汇总网关)
|
||||
@@ -57,14 +60,16 @@ public class Dto2BpmnConverter {
|
||||
for (FlowElementDTO nodeDto : nodeDtos) {
|
||||
// 处理异步任务,创建等待节点,放在穿件实际节点之前是为了构造asyncTaskMap,后面createActualNode的时候才能设置回调等待节点
|
||||
handleAsyncTasks(process, nodeDto, asyncTaskMap);
|
||||
// 处理等待用户提交任务
|
||||
handleWaitUserTask(process, nodeDto, waitUserTaskMap);
|
||||
// 创建实际节点
|
||||
createActualNode(process, nodeDto,asyncTaskMap);
|
||||
createActualNode(process, nodeDto, asyncTaskMap);
|
||||
// 处理并行网关,创建拆分和汇聚节点
|
||||
addRequiredGateways(process, nodeDto, flowDtos, joinGatewayMap, splitGatewayMap);
|
||||
}
|
||||
|
||||
// 6. 创建连线
|
||||
createConnections(process, flowDtos, asyncTaskMap, joinGatewayMap, splitGatewayMap);
|
||||
createConnections(process, flowDtos, asyncTaskMap,waitUserTaskMap, joinGatewayMap, splitGatewayMap);
|
||||
|
||||
return bpmnModel;
|
||||
}
|
||||
@@ -75,24 +80,44 @@ public class Dto2BpmnConverter {
|
||||
private void handleAsyncTasks(Process process, FlowElementDTO nodeDto, Map<String, String> asyncTaskMap) {
|
||||
// 检查节点是否为服务任务或用户任务且标记为异步回调
|
||||
if ((FlowElementTypeEnums.SERVICETASK.getType().equals(nodeDto.getType()) ||
|
||||
FlowElementTypeEnums.USERTASK.getType().equals(nodeDto.getType())) &&
|
||||
nodeDto.getExtensionElements() != null &&
|
||||
nodeDto.getExtensionElements().getExecuteConfig() != null &&
|
||||
nodeDto.getExtensionElements().getExecuteConfig().isAsyncCallback()) {
|
||||
|
||||
FlowElementTypeEnums.USERTASK.getType().equals(nodeDto.getType())) &&
|
||||
nodeDto.getExtensionElements() != null &&
|
||||
nodeDto.getExtensionElements().getExecuteConfig() != null &&
|
||||
nodeDto.getExtensionElements().getExecuteConfig().isAsyncCallback()) {
|
||||
|
||||
// 创建接收任务节点
|
||||
String originalNodeId = nodeDto.getId();
|
||||
String waitNodeId = originalNodeId + "_wait";
|
||||
String waitNodeId = FlowNodeIdUtils.generateAsyncTaskId(originalNodeId);
|
||||
ReceiveTask receiveTask = new ReceiveTask();
|
||||
receiveTask.setId(waitNodeId);
|
||||
receiveTask.setName(nodeDto.getName() + "等待结果");
|
||||
process.addFlowElement(receiveTask);
|
||||
|
||||
|
||||
// 记录映射关系
|
||||
asyncTaskMap.put(originalNodeId, waitNodeId);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleWaitUserTask(Process process, FlowElementDTO nodeDto, Map<String, String> waitUserTaskMap) {
|
||||
// 只有当前节点是ServiceTask才需要判断是否等待用户输入,需要才创建前置UserTask
|
||||
if (FlowElementTypeEnums.SERVICETASK.getType().equals(nodeDto.getType()) &&
|
||||
nodeDto.getExtensionElements() != null &&
|
||||
nodeDto.getExtensionElements().getExecuteConfig() != null &&
|
||||
nodeDto.getExtensionElements().getExecuteConfig().isWaitUser()) {
|
||||
String originalNodeId = nodeDto.getId();
|
||||
String waitUserId = FlowNodeIdUtils.generateWaitUserTaskId(originalNodeId);
|
||||
|
||||
UserTask waitUserTask = new UserTask();
|
||||
waitUserTask.setId(waitUserId);
|
||||
waitUserTask.setName(nodeDto.getName() + "等待用户提交");
|
||||
// 不设置assignee,让任何人可以处理
|
||||
process.addFlowElement(waitUserTask);
|
||||
|
||||
// 记录映射
|
||||
waitUserTaskMap.put(originalNodeId, waitUserId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加必要的网关(并行拆分网关和并行汇聚网关)
|
||||
*/
|
||||
@@ -107,7 +132,7 @@ public class Dto2BpmnConverter {
|
||||
// 检查是否需要添加汇聚网关(入度>1)
|
||||
if (incomingCount > 1) {
|
||||
// 如果入度>1,则在节点前插入汇聚网关
|
||||
String joinGatewayId = "join_gw_" + nodeId;
|
||||
String joinGatewayId = FlowNodeIdUtils.generateJoinGatewayId(nodeId);
|
||||
ParallelGateway joinGateway = new ParallelGateway();
|
||||
joinGateway.setId(joinGatewayId);
|
||||
joinGateway.setName("并行汇聚-" + nodeDto.getName());
|
||||
@@ -118,7 +143,7 @@ public class Dto2BpmnConverter {
|
||||
// 检查是否需要添加拆分网关(出度>1)
|
||||
if (outgoingCount > 1) {
|
||||
// 如果出度>1,则在节点后插入拆分网关
|
||||
String splitGatewayId = "split_gw_" + nodeId;
|
||||
String splitGatewayId = FlowNodeIdUtils.generateSplitGatewayId(nodeId);
|
||||
ParallelGateway splitGateway = new ParallelGateway();
|
||||
splitGateway.setId(splitGatewayId);
|
||||
splitGateway.setName("并行拆分-" + nodeDto.getName());
|
||||
@@ -133,6 +158,7 @@ public class Dto2BpmnConverter {
|
||||
private void createConnections(Process process,
|
||||
List<FlowElementDTO> flowDtos,
|
||||
Map<String, String> asyncTaskMap,
|
||||
Map<String, String> waitUserTaskMap,
|
||||
Map<String, String> joinGatewayMap,
|
||||
Map<String, String> splitGatewayMap) {
|
||||
|
||||
@@ -148,9 +174,15 @@ public class Dto2BpmnConverter {
|
||||
|
||||
// ====================================================================================
|
||||
// ③ 第三阶段:处理异步任务(等待节点)
|
||||
// 原逻辑:原节点 → wait → 原本下游
|
||||
// 原节点 → wait → 原本下游
|
||||
// ====================================================================================
|
||||
handleAsyncTaskConnections(process, asyncTaskMap);
|
||||
|
||||
// ====================================================================================
|
||||
// ④ 第三阶段:处理等待用户提交任务
|
||||
// 原节点 → waitUserTask → 原节点
|
||||
// ====================================================================================
|
||||
handleWaitUserTaskConnections(process, waitUserTaskMap);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -273,6 +305,35 @@ public class Dto2BpmnConverter {
|
||||
}
|
||||
}
|
||||
|
||||
private void handleWaitUserTaskConnections(Process process, Map<String, String> waitUserTaskMap) {
|
||||
for (String originalNodeId : waitUserTaskMap.keySet()) {
|
||||
String waitUserId = waitUserTaskMap.get(originalNodeId);
|
||||
|
||||
// Step 1: 找出原节点的所有入线,改为指向 waitUserTask
|
||||
List<SequenceFlow> removeLines = new ArrayList<>();
|
||||
List<String> originalSources = new ArrayList<>();
|
||||
|
||||
for (FlowElement ele : process.getFlowElements()) {
|
||||
if (ele instanceof SequenceFlow sf) {
|
||||
if (sf.getTargetRef().equals(originalNodeId)) {
|
||||
originalSources.add(sf.getSourceRef());
|
||||
removeLines.add(sf);
|
||||
}
|
||||
}
|
||||
}
|
||||
removeLines.forEach(f -> process.removeFlowElement(f.getId()));
|
||||
|
||||
// Step 2: 添加原来的入线 → waitUserTask
|
||||
for (String src : originalSources) {
|
||||
process.addFlowElement(createSequenceFlow(src, waitUserId, null));
|
||||
}
|
||||
|
||||
// Step 3: waitUserTask → 原节点
|
||||
process.addFlowElement(createSequenceFlow(waitUserId, originalNodeId, null));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 创建实际的流程节点
|
||||
*/
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
package com.sdm.flowable.util;
|
||||
|
||||
public class FlowNodeIdUtils {
|
||||
private static final String JOIN_GATEWAY_PREFIX = "join_gw_";
|
||||
private static final String SPLIT_GATEWAY_PREFIX = "split_gw_";
|
||||
private static final String ASYNC_TASK_SUFFIX = "_wait";
|
||||
private static final String WAIT_USER_SUFFIX = "_waitUser";
|
||||
|
||||
// ==================== 网关 ====================
|
||||
|
||||
public static String generateJoinGatewayId(String nodeId) {
|
||||
return JOIN_GATEWAY_PREFIX + nodeId;
|
||||
}
|
||||
|
||||
public static String generateSplitGatewayId(String nodeId) {
|
||||
return SPLIT_GATEWAY_PREFIX + nodeId;
|
||||
}
|
||||
|
||||
public static boolean isJoinGateway(String id) {
|
||||
return id != null && id.startsWith(JOIN_GATEWAY_PREFIX);
|
||||
}
|
||||
|
||||
public static boolean isSplitGateway(String id) {
|
||||
return id != null && id.startsWith(SPLIT_GATEWAY_PREFIX);
|
||||
}
|
||||
|
||||
// ==================== 异步接收任务 ====================
|
||||
|
||||
public static String generateAsyncTaskId(String nodeId) {
|
||||
return nodeId + ASYNC_TASK_SUFFIX;
|
||||
}
|
||||
|
||||
public static boolean isAsyncTask(String id) {
|
||||
return id != null && id.endsWith(ASYNC_TASK_SUFFIX);
|
||||
}
|
||||
|
||||
public static String getOriginalNodeIdFromAsyncTask(String asyncTaskId) {
|
||||
if (!isAsyncTask(asyncTaskId)) {
|
||||
throw new IllegalArgumentException("不是异步等待节点: " + asyncTaskId);
|
||||
}
|
||||
return asyncTaskId.substring(0, asyncTaskId.length() - ASYNC_TASK_SUFFIX.length());
|
||||
}
|
||||
|
||||
// ==================== 用户等待任务 ====================
|
||||
|
||||
public static String generateWaitUserTaskId(String nodeId) {
|
||||
return nodeId + WAIT_USER_SUFFIX;
|
||||
}
|
||||
|
||||
public static boolean isWaitUserTask(String id) {
|
||||
return id != null && id.endsWith(WAIT_USER_SUFFIX);
|
||||
}
|
||||
|
||||
public static String getOriginalNodeIdFromWaitUserTask(String waitUserTaskId) {
|
||||
if (!isWaitUserTask(waitUserTaskId)) {
|
||||
throw new IllegalArgumentException("不是隐藏等待节点: " + waitUserTaskId);
|
||||
}
|
||||
return waitUserTaskId.substring(0, waitUserTaskId.length() - WAIT_USER_SUFFIX.length());
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import org.springframework.cloud.gateway.filter.GlobalFilter;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
|
||||
@@ -16,196 +17,177 @@ import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* 网关请求响应日志记录过滤器
|
||||
* 集成了TraceId生成与传递功能
|
||||
* 使用 DataBufferUtils.join() 解决多次拷贝问题
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RequestResponseLoggingFilter implements GlobalFilter, Ordered {
|
||||
|
||||
/**
|
||||
* MDC 中存储 traceId 的 key(需与日志格式中的 %X{traceId} 对应)
|
||||
*/
|
||||
public static final String TRACE_ID_KEY = "traceId";
|
||||
|
||||
/**
|
||||
* 请求头/响应头中传递 traceId 的 key
|
||||
*/
|
||||
public static final String TRACE_ID_KEY = "traceId";
|
||||
public static final String TRACE_ID_HEADER = "X-Trace-Id";
|
||||
|
||||
/** 最大记录的响应体长度(防止大 JSON) */
|
||||
private static final int MAX_BODY_LENGTH = 10 * 1024;
|
||||
|
||||
@Override
|
||||
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
|
||||
// 生成或获取 traceId(优先从请求头获取,支持前端传递,便于联调)
|
||||
String traceId = exchange.getRequest().getHeaders().getFirst(TRACE_ID_HEADER);
|
||||
if (traceId == null || traceId.trim().isEmpty()) {
|
||||
// 生成 UUID 并去除横杠(32位,简洁易读)
|
||||
traceId = UUID.randomUUID().toString().replace("-", "");
|
||||
}
|
||||
|
||||
// 存入 MDC(供日志打印使用,所有日志框架可通过 %X{traceId} 获取)
|
||||
|
||||
String traceId = getOrCreateTraceId(exchange);
|
||||
ServerWebExchange tracedExchange = mutateExchangeWithTraceId(exchange, traceId);
|
||||
|
||||
MDC.put(TRACE_ID_KEY, traceId);
|
||||
|
||||
// 写入响应头(返回给前端,便于前端排查问题时匹配日志)
|
||||
// 只在响应头中还没有X-Trace-Id时才设置,避免与下游服务设置的TraceId重复
|
||||
ServerHttpResponse response = exchange.getResponse();
|
||||
if (!response.getHeaders().containsKey(TRACE_ID_HEADER)) {
|
||||
response.getHeaders().set(TRACE_ID_HEADER, traceId);
|
||||
}
|
||||
|
||||
// 构建带有 traceId 的新请求
|
||||
ServerHttpRequest request = exchange.getRequest().mutate()
|
||||
.header(TRACE_ID_HEADER, traceId)
|
||||
.build();
|
||||
|
||||
ServerHttpRequest finalRequest = request;
|
||||
String method = request.getMethod().toString();
|
||||
String path = request.getURI().getPath();
|
||||
String query = request.getURI().getQuery();
|
||||
String clientIp = getClientIp(request);
|
||||
|
||||
// 记录请求详细信息
|
||||
StringBuilder requestLog = new StringBuilder();
|
||||
requestLog.append("\n==================== 网关接收到请求 ====================\n");
|
||||
requestLog.append("TraceId: ").append(traceId).append("\n");
|
||||
requestLog.append("请求方法: ").append(method).append("\n");
|
||||
requestLog.append("请求路径: ").append(path).append("\n");
|
||||
requestLog.append("请求参数: ").append(query != null ? query : "").append("\n");
|
||||
requestLog.append("客户端IP: ").append(clientIp).append("\n");
|
||||
|
||||
// 记录请求头
|
||||
requestLog.append("请求头:\n");
|
||||
for (Map.Entry<String, List<String>> header : request.getHeaders().entrySet()) {
|
||||
requestLog.append(" ").append(header.getKey()).append(": ").append(header.getValue()).append("\n");
|
||||
}
|
||||
|
||||
requestLog.append("========================================================");
|
||||
log.info(requestLog.toString());
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
// 包装响应以捕获响应数据
|
||||
DataBufferFactory bufferFactory = response.bufferFactory();
|
||||
String finalTraceId = traceId;
|
||||
String finalTraceId1 = traceId;
|
||||
ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(response) {
|
||||
@Override
|
||||
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
|
||||
if (body instanceof Flux) {
|
||||
Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;
|
||||
return super.writeWith(fluxBody.map(dataBuffer -> {
|
||||
// 记录响应信息
|
||||
long endTime = System.currentTimeMillis();
|
||||
long duration = endTime - startTime;
|
||||
|
||||
StringBuilder responseLog = new StringBuilder();
|
||||
responseLog.append("\n==================== 网关响应信息 ====================\n");
|
||||
responseLog.append("TraceId: ").append(finalTraceId).append("\n");
|
||||
responseLog.append("请求方法: ").append(method).append("\n");
|
||||
responseLog.append("请求路径: ").append(path).append("\n");
|
||||
responseLog.append("响应状态: ").append(getStatusCode()).append("\n");
|
||||
responseLog.append("处理时间: ").append(duration).append("ms\n");
|
||||
|
||||
// 记录响应头
|
||||
responseLog.append("响应头:\n");
|
||||
for (Map.Entry<String, List<String>> header : getHeaders().entrySet()) {
|
||||
responseLog.append(" ").append(header.getKey()).append(": ").append(String.join(",", header.getValue())).append("\n");
|
||||
}
|
||||
|
||||
// 注意:记录响应体可能会影响性能,特别是对于大文件
|
||||
// 如果需要记录响应体,可以取消下面的注释,但要注意性能影响
|
||||
byte[] content = new byte[dataBuffer.readableByteCount()];
|
||||
dataBuffer.read(content);
|
||||
String responseBody = new String(content, StandardCharsets.UTF_8);
|
||||
DataBuffer newBuffer = bufferFactory.wrap(content);
|
||||
responseLog.append("响应体: ").append(responseBody).append("\n");
|
||||
|
||||
responseLog.append("========================================================");
|
||||
log.info(responseLog.toString());
|
||||
return newBuffer;
|
||||
}));
|
||||
}
|
||||
return super.writeWith(body);
|
||||
}
|
||||
|
||||
// 处理零字节响应的情况
|
||||
@Override
|
||||
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
|
||||
long endTime = System.currentTimeMillis();
|
||||
long duration = endTime - startTime;
|
||||
|
||||
StringBuilder responseLog = new StringBuilder();
|
||||
responseLog.append("\n==================== 网关响应信息 ====================\n");
|
||||
responseLog.append("TraceId: ").append(finalTraceId1).append("\n");
|
||||
responseLog.append("请求方法: ").append(method).append("\n");
|
||||
responseLog.append("请求路径: ").append(path).append("\n");
|
||||
responseLog.append("响应状态: ").append(getStatusCode()).append("\n");
|
||||
responseLog.append("处理时间: ").append(duration).append("ms\n");
|
||||
|
||||
// 记录响应头
|
||||
responseLog.append("响应头:\n");
|
||||
for (Map.Entry<String, List<String>> header : getHeaders().entrySet()) {
|
||||
responseLog.append(" ").append(header.getKey()).append(": ").append(header.getValue()).append("\n");
|
||||
}
|
||||
|
||||
logRequest(tracedExchange, traceId);
|
||||
|
||||
responseLog.append("========================================================");
|
||||
log.info(responseLog.toString());
|
||||
|
||||
return super.writeAndFlushWith(body);
|
||||
}
|
||||
};
|
||||
ServerHttpResponseDecorator responseDecorator =
|
||||
decorateResponse(tracedExchange, traceId, startTime);
|
||||
|
||||
// 将装饰后的响应和带 traceId 的请求替换到 exchange 中
|
||||
return chain.filter(exchange.mutate()
|
||||
.request(finalRequest)
|
||||
.response(decoratedResponse)
|
||||
return chain.filter(tracedExchange.mutate()
|
||||
.response(responseDecorator)
|
||||
.build())
|
||||
.doFinally(signalType -> {
|
||||
// 清除 MDC 中的 traceId(关键!避免线程池复用导致的 traceId 污染)
|
||||
MDC.remove(TRACE_ID_KEY);
|
||||
});
|
||||
}
|
||||
|
||||
private String getClientIp(ServerHttpRequest request) {
|
||||
String xForwardedFor = request.getHeaders().getFirst("X-Forwarded-For");
|
||||
if (xForwardedFor != null && !xForwardedFor.isEmpty()) {
|
||||
return xForwardedFor.split(",")[0].trim();
|
||||
}
|
||||
|
||||
String xRealIp = request.getHeaders().getFirst("X-Real-IP");
|
||||
if (xRealIp != null && !xRealIp.isEmpty()) {
|
||||
return xRealIp;
|
||||
}
|
||||
|
||||
// 检查Origin头部
|
||||
String origin = request.getHeaders().getFirst("Origin");
|
||||
if (origin != null && !origin.isEmpty()) {
|
||||
// Origin格式为 http://domain:port 或 https://domain:port
|
||||
try {
|
||||
String[] parts = origin.split("://");
|
||||
if (parts.length > 1) {
|
||||
String hostPort = parts[1];
|
||||
String[] hostPortParts = hostPort.split(":");
|
||||
return hostPortParts[0];
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// 解析失败则继续使用其他方式获取IP
|
||||
}
|
||||
}
|
||||
|
||||
InetSocketAddress remoteAddress = request.getRemoteAddress();
|
||||
return remoteAddress != null ? remoteAddress.getAddress().getHostAddress() : "unknown";
|
||||
.doFinally(s -> MDC.remove(TRACE_ID_KEY));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getOrder() {
|
||||
return Ordered.HIGHEST_PRECEDENCE + 1; // 确保在大多数其他过滤器之前执行
|
||||
return Ordered.HIGHEST_PRECEDENCE + 1;
|
||||
}
|
||||
}
|
||||
|
||||
// ================= TraceId =================
|
||||
|
||||
private String getOrCreateTraceId(ServerWebExchange exchange) {
|
||||
String traceId = exchange.getRequest().getHeaders().getFirst(TRACE_ID_HEADER);
|
||||
return (traceId == null || traceId.isBlank())
|
||||
? UUID.randomUUID().toString().replace("-", "")
|
||||
: traceId;
|
||||
}
|
||||
|
||||
private ServerWebExchange mutateExchangeWithTraceId(ServerWebExchange exchange, String traceId) {
|
||||
|
||||
exchange.getResponse().getHeaders().set(TRACE_ID_HEADER, traceId);
|
||||
|
||||
ServerHttpRequest request = exchange.getRequest().mutate()
|
||||
.header(TRACE_ID_HEADER, traceId)
|
||||
.build();
|
||||
|
||||
return exchange.mutate().request(request).build();
|
||||
}
|
||||
|
||||
// ================= Request =================
|
||||
|
||||
private void logRequest(ServerWebExchange exchange, String traceId) {
|
||||
|
||||
ServerHttpRequest request = exchange.getRequest();
|
||||
|
||||
log.info("""
|
||||
==================== 网关请求 ====================
|
||||
TraceId: {}
|
||||
Method: {}
|
||||
Path: {}
|
||||
Query: {}
|
||||
ClientIp: {}
|
||||
Headers: {}
|
||||
=================================================
|
||||
""",
|
||||
traceId,
|
||||
request.getMethod(),
|
||||
request.getURI().getPath(),
|
||||
request.getURI().getQuery(),
|
||||
getClientIp(request),
|
||||
request.getHeaders()
|
||||
);
|
||||
}
|
||||
|
||||
// ================= Response =================
|
||||
|
||||
private ServerHttpResponseDecorator decorateResponse(
|
||||
ServerWebExchange exchange,
|
||||
String traceId,
|
||||
long startTime) {
|
||||
|
||||
ServerHttpResponse originalResponse = exchange.getResponse();
|
||||
DataBufferFactory bufferFactory = originalResponse.bufferFactory();
|
||||
|
||||
return new ServerHttpResponseDecorator(originalResponse) {
|
||||
|
||||
@Override
|
||||
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
|
||||
|
||||
if (!(body instanceof Flux)) {
|
||||
return super.writeWith(body);
|
||||
}
|
||||
Flux<DataBuffer> fluxBody = (Flux<DataBuffer>) body;
|
||||
return DataBufferUtils.join(fluxBody)
|
||||
.flatMap(joinedBuffer -> {
|
||||
byte[] content = null;
|
||||
try {
|
||||
int length = Math.min(
|
||||
joinedBuffer.readableByteCount(),
|
||||
MAX_BODY_LENGTH
|
||||
);
|
||||
content = new byte[length];
|
||||
joinedBuffer.read(content);
|
||||
|
||||
logResponse(exchange, traceId, startTime, content);
|
||||
// 必须返回完整响应内容给客户端
|
||||
DataBuffer newBuffer = bufferFactory.wrap(
|
||||
joinedBuffer.asByteBuffer()
|
||||
);
|
||||
return super.writeWith(Mono.just(newBuffer));
|
||||
} finally {
|
||||
DataBufferUtils.release(joinedBuffer);
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private void logResponse(
|
||||
ServerWebExchange exchange,
|
||||
String traceId,
|
||||
long startTime,
|
||||
byte[] content) {
|
||||
|
||||
long cost = System.currentTimeMillis() - startTime;
|
||||
ServerHttpResponse response = exchange.getResponse();
|
||||
|
||||
String responseBody = content == null
|
||||
? ""
|
||||
: new String(content, StandardCharsets.UTF_8);
|
||||
|
||||
log.info("""
|
||||
==================== 网关响应 ====================
|
||||
TraceId: {}
|
||||
Status: {}
|
||||
Cost: {} ms
|
||||
Headers: {}
|
||||
Body: {}
|
||||
=================================================
|
||||
""",
|
||||
traceId,
|
||||
response.getStatusCode(),
|
||||
cost,
|
||||
response.getHeaders(),
|
||||
responseBody
|
||||
);
|
||||
}
|
||||
|
||||
// ================= Util =================
|
||||
|
||||
private String getClientIp(ServerHttpRequest request) {
|
||||
String xff = request.getHeaders().getFirst("X-Forwarded-For");
|
||||
if (xff != null && !xff.isBlank()) {
|
||||
return xff.split(",")[0].trim();
|
||||
}
|
||||
return Optional.ofNullable(request.getRemoteAddress())
|
||||
.map(addr -> addr.getAddress().getHostAddress())
|
||||
.orElse("unknown");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ LOG_HOME="/home/app/gateway2/logs"
|
||||
LOG_FILE="${LOG_HOME}/running.log"
|
||||
|
||||
# JVM参数
|
||||
JVM_OPTS="-Xms512m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${LOG_HOME}/heapdump.hprof"
|
||||
JVM_OPTS="-Xmx2g -Xms2g -XX:MaxDirectMemorySize=2g -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${LOG_HOME}/heapdump.hprof"
|
||||
|
||||
|
||||
# 函数定义
|
||||
|
||||
Reference in New Issue
Block a user