This commit is contained in:
2026-02-02 09:01:03 +08:00
22 changed files with 302 additions and 128 deletions

View File

@@ -3,7 +3,6 @@ package com.sdm.common.feign.impl.pbs;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq;
import com.sdm.common.feign.inter.pbs.ITaskFeignClient;
import com.sdm.common.log.CoreLogger;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -22,7 +21,7 @@ public class TaskClientFeignClientImpl implements ITaskFeignClient {
response = taskFeignClient.adapterSubmitHpcJob(req);
return response;
} catch (Exception e) {
CoreLogger.error("SubmitHpcJob Exception:{}", e.getMessage());
log.error("SubmitHpcJob Exception:{}", e.getMessage());
return SdmResponse.failed("Hpc任务提交失败:"+e.getMessage());
}

View File

@@ -156,7 +156,7 @@ xxl:
# 执行器应用名称 服务名-job-executor
appname: data-job-executor
# 执行器注册地址默认使用address注册若为null则使用ip:port注册
address:
address: ${XXL_JOB_EXECUTOR_IP:}
# 执行器IP
ip:
# 执行器端口,为了好记web服务端口+1000

View File

@@ -156,7 +156,7 @@ xxl:
# 执行器应用名称 服务名-job-executor
appname: data-job-executor
# 执行器注册地址默认使用address注册若为null则使用ip:port注册
address:
address: ${XXL_JOB_EXECUTOR_IP:}
# 执行器IP
ip:
# 执行器端口,为了好记web服务端口+1000

View File

@@ -165,7 +165,7 @@ xxl:
# 执行器应用名称 服务名-job-executor
appname: data-job-executor
# 执行器注册地址默认使用address注册若为null则使用ip:port注册
address:
address: ${XXL_JOB_EXECUTOR_IP:}
# 执行器IP
ip:
# 执行器端口,为了好记web服务端口+1000

View File

@@ -155,7 +155,7 @@ xxl:
# 执行器应用名称 服务名-job-executor
appname: data-job-executor
# 执行器注册地址默认使用address注册若为null则使用ip:port注册
address:
address: ${XXL_JOB_EXECUTOR_IP:}
# 执行器IP
ip:
# 执行器端口,为了好记web服务端口+1000

View File

@@ -152,7 +152,7 @@ xxl:
# 执行器应用名称 服务名-job-executor
appname: data-job-executor
# 执行器注册地址默认使用address注册若为null则使用ip:port注册
address:
address: ${XXL_JOB_EXECUTOR_IP:}
# 执行器IP
ip:
# 执行器端口,为了好记web服务端口+1000

View File

@@ -16,6 +16,7 @@ import com.sdm.common.feign.impl.system.MessageFeignClientImpl;
import com.sdm.common.feign.inter.data.IDataFeignClient;
import com.sdm.common.feign.inter.pbs.ITaskFeignClient;
import com.sdm.common.log.CoreLogger;
import com.sdm.common.utils.MdcUtil;
import com.sdm.flowable.entity.ProcessNodeParam;
import com.sdm.flowable.enums.AsyncTaskStatusEnum;
import com.sdm.flowable.service.IAsyncTaskRecordService;
@@ -58,30 +59,36 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
* */
@Override
public void execute(DelegateExecution execution, Map<String, Object> params, HPCExecuteConfig config) {
CoreLogger.info("hpc process excute,params:{},config:{}",JSONObject.toJSONString(params),JSONObject.toJSONString(config));
MdcUtil.generateAndPutTraceId();
log.info("hpc process excute,params:{},config:{}",JSONObject.toJSONString(params),JSONObject.toJSONString(config));
SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq = convertParamsToReq(params);
submitHpcTaskRemoteReq.setParams(params);
// 设置软件id
submitHpcTaskRemoteReq.setSoftwareId(config.getUuid());
String beforeNodeId = config.getBeforeNodeId();
String currentNodeId =execution.getCurrentActivityId();
String masterFileRegularStr = config.getInputFormat();
String inputFilesRegularStr = config.getSlaveFormat();
CoreLogger.info("hpc executeMode:{}",params.get("executeMode"));
String masterFileRegularStr = Objects.isNull(params.get("inputFormat"))?"":params.get("inputFormat").toString();
String inputFilesRegularStr = Objects.isNull(params.get("slaveFormat"))?"":params.get("slaveFormat").toString();
log.info("hpc executeMode:{}",params.get("executeMode"));
String executeMode = params.get("executeMode").toString();
if(StringUtils.isBlank(executeMode)||
(!Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_AUTO)&&
!Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_MANUAL))){
MdcUtil.removeTraceId();
throw new RuntimeException("hpc executeMode illegal");
}
// 自动才判断正则
if(Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_AUTO)&&
StringUtils.isBlank(masterFileRegularStr)){
log.info("hpc process excute,主求解文件规则是空,masterFileRegularStr:{},inputFilesRegularStr:{}",
params.get("inputFormat"),params.get("slaveFormat"));
MdcUtil.removeTraceId();
throw new RuntimeException("Hpc任务执行失败主求解文件规则不能是空");
}
CoreLogger.info("beforeNodeId:{},currentNodeId:{},masterFileRegularStr:{},inputFilesRegularStr:{}",beforeNodeId,currentNodeId,masterFileRegularStr,inputFilesRegularStr);
CoreLogger.info("Hpc执行 processDefinitionId:{},processInstanceId:{}",execution.getProcessDefinitionId(),execution.getProcessInstanceId());
log.info("beforeNodeId:{},currentNodeId:{},masterFileRegularStr:{},inputFilesRegularStr:{}",beforeNodeId,currentNodeId,masterFileRegularStr,inputFilesRegularStr);
log.info("Hpc执行 processDefinitionId:{},processInstanceId:{}",execution.getProcessDefinitionId(),execution.getProcessInstanceId());
// 初始化用户/租户信息
initUserInfo(execution,params);
// params 取只是测试使用
@@ -106,6 +113,7 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
// 推送失败消息
sendMsg(ThreadLocalContext.getTenantId(),ThreadLocalContext.getUserId(),submitHpcTaskRemoteReq.getJobName(),"失败");
log.error("HpcHandler submit failed:{}",JSONObject.toJSONString(params));
MdcUtil.removeTraceId();
throw new RuntimeException("HpcHandler submit failed,"+submitResp.getMessage());
}
@@ -122,7 +130,7 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
status,
hpcTaskId
);
MdcUtil.removeTraceId();
log.info("HPC 任务 {} 已提交", "hpcTaskId");
}
@@ -151,7 +159,7 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
ThreadLocalContext.setUserId(userId);
ThreadLocalContext.setUserName(userName);
ThreadLocalContext.setTenantId(tenantId);
CoreLogger.info("hpcHander initUserInfo userId:{},tenantId:{},userName:{}",userId,tenantId,userName);
log.info("hpcHander initUserInfo userId:{},tenantId:{},userName:{}",userId,tenantId,userName);
}
private void dealHpcFile(SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq, String beforeNodeId, String currentNodeId,
@@ -172,11 +180,15 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
List<ProcessNodeParam> currentNodeParams = nodeParamMap.get(currentNodeId);
if(Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_AUTO)){
if(CollectionUtils.isEmpty(beforeNodeParams) || CollectionUtils.isEmpty(currentNodeParams)){
MdcUtil.removeTraceId();
log.warn("未获取到当前节点或者求解文件节点信息");
throw new RuntimeException("未获取到当前节点或者求解文件节点信息");
}
}
if(Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_MANUAL)){
if(CollectionUtils.isEmpty(currentNodeParams)){
MdcUtil.removeTraceId();
log.warn("未获取到当前节点信息");
throw new RuntimeException("未获取到当前节点信息");
}
}
@@ -190,14 +202,15 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
// 本地求解文件路径 taskLocalBaseDir
String simulationFilePath = simulationBaseDir + beforeNodeJectKey;
submitHpcTaskRemoteReq.setSimulationFileLocalPath(simulationFilePath);
CoreLogger.info("simulationFileLocalPath :{} ",simulationBaseDir + beforeNodeJectKey);
log.info("simulationFileLocalPath :{} ",simulationBaseDir + beforeNodeJectKey);
}
// 手动上传的
if (Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_MANUAL)) {
List<String> masterFilePaths = getFileListFromMap(params, "masterFileRegularStr");
List<String> inPutFilePaths = getFileListFromMap(params, "inputFilesRegularStr");
if(CollectionUtils.isEmpty(masterFilePaths)){
CoreLogger.warn("hpc executeMode manual,filepath illegal");
log.warn("hpc executeMode manual,filepath illegal");
MdcUtil.removeTraceId();
throw new RuntimeException("手动模式求解文件不能为空");
}
submitHpcTaskRemoteReq.setManualMasterFilepaths(masterFilePaths);
@@ -214,7 +227,7 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
submitHpcTaskRemoteReq.setStdoutSpdmMinoFilePath(currentNodeJectKey);
// hpc 回传本地文件路径
submitHpcTaskRemoteReq.setStdoutSpdmNasFilePath(simulationBaseDir + currentNodeJectKey);
CoreLogger.info("stdoutSpdmNasFilePath :{} ",simulationBaseDir + currentNodeJectKey);
log.info("stdoutSpdmNasFilePath :{} ",simulationBaseDir + currentNodeJectKey);
}
/**
@@ -257,7 +270,7 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
* 将参数Map转换为SubmitHpcTaskRemoteReq对象的工具方法 String command
*/
private SubmitHpcTaskRemoteReq convertParamsToReq(Map<String, Object> params) {
CoreLogger.error("convertParamsToReq start ");
log.info("convertParamsToReq start ");
SubmitHpcTaskRemoteReq req = new SubmitHpcTaskRemoteReq();
if (params == null) {
return req;
@@ -267,55 +280,23 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
// 处理int类型字段包含空值和非数字的异常处理
try {
req.setCoreNum(params.get("coreNum") != null ? Integer.parseInt(params.get("coreNum").toString()) : 0);
} catch (NumberFormatException e) {
CoreLogger.error("coreNum parse error:{},coreNum:{}",e.getMessage(),params.get("coreNum"));
} catch (Exception e) {
log.error("coreNum parse error:{},coreNum:{}",e.getMessage(),params.get("coreNum"));
req.setCoreNum(0);
}
req.setSoftware(params.get("software").toString());
req.setJobType(params.get("jobType").toString());
try {
req.setIndependence(params.get("independence") != null ? Integer.parseInt(params.get("independence").toString()) : 0);
} catch (NumberFormatException e) {
} catch (Exception e) {
log.warn("get independence error:{}",e.getMessage());
req.setIndependence(0);
}
req.setTaskId(params.get("taskId").toString());
req.setTaskName(params.get("taskName").toString());
req.setRunId(params.get("runId").toString());
req.setRunName(params.get("runName").toString());
// mock 时暂时自己传递,后面根据软件名称查询命令 todo 后面从表配置查询
// String command =(params.get("command")==null||StringUtils.isBlank(params.get("command").toString()))?
// "\\\\CARSAFE\\share\\solver\\RLithium\\reta.exe -i %s" : params.get("command").toString();
// 只是测试环境用于兜底mock
// if(StringUtils.isBlank(command)){
// command = mockCommand;
// }
// if(StringUtils.isBlank(command)){
// CoreLogger.error("command is empty!!!!!");
// throw new RuntimeException("command is empty");
// }
// req.setCommand(command);
req.setProjectname(params.get("projectname").toString());
// req.setFeatchFileType(params.get("featchFileType").toString());
// req.setBeforeNodeId(params.get("beforeNodeId").toString());
// 处理commandExpand字段JSON字符串转Map
// 动态命令
// String commandExpandJson = params.get("commandExpand").toString();
// if (StringUtils.isNotBlank(commandExpandJson)) {
// try {
// // 将JSON字符串转换为Map<String, SimulationCommandPlaceholderReq>
// Map<String, SimulationCommandPlaceholderReq> commandExpand = objectMapper.readValue(
// commandExpandJson,
// new TypeReference<Map<String, SimulationCommandPlaceholderReq>>() {}
// );
//// req.setCommandExpand(commandExpand);
// } catch (Exception e) {
// CoreLogger.error("convertParamsToReq error:{},params:{}",e.getMessage(), JSONObject.toJSONString(params));
// // 如设为null或空Map
//// req.setCommandExpand(new HashMap<>());
// }
// }
return req;
}

View File

@@ -0,0 +1,135 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- 彩色日志依赖的渲染类 -->
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
<conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
<conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
<!-- 彩色日志格式 -->
<property name="CONSOLE_LOG_PATTERN" value="${CONSOLE_LOG_PATTERN:-%clr([%X{traceId}] %d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr([%15.15t]){faint} %clr(%logger){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}" />
<!-- 普通日志格式(无颜色) -->
<property name="FILE_LOG_PATTERN" value="[%X{traceId}] %d{yyyy-MM-dd HH:mm:ss.SSS} %5p ${PID:- } [%15.15t] %logger : %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}" />
<!-- 日志文件存储地址 -->
<property name="LOG_HOME" value="/home/app/flowable/logs" />
<!-- 1. 控制台输出默认输出DEBUG及以上级别 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
</encoder>
</appender>
<!-- 2. INFO级别日志输出到running.log仅包含INFO及以上不包含DEBUG -->
<appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/running.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<FileNamePattern>${LOG_HOME}/running.log.%d{yyyy-MM-dd}.%i.log</FileNamePattern>
<MaxHistory>30</MaxHistory>
<TotalSizeCap>500MB</TotalSizeCap>
<maxFileSize>10MB</maxFileSize>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${FILE_LOG_PATTERN}</pattern>
</encoder>
<!-- 过滤只接受INFO及以上级别INFO, WARN, ERROR排除DEBUG -->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
</appender>
<!-- 3. DEBUG级别日志输出到running_debug.log仅包含DEBUG级别 -->
<appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/running_debug.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<FileNamePattern>${LOG_HOME}/running_debug.log.%d{yyyy-MM-dd}.%i.log</FileNamePattern>
<MaxHistory>30</MaxHistory>
<TotalSizeCap>500MB</TotalSizeCap>
<maxFileSize>10MB</maxFileSize>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${FILE_LOG_PATTERN}</pattern>
</encoder>
<!-- 过滤只接受DEBUG级别 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>DEBUG</level>
<onMatch>ACCEPT</onMatch> <!-- 匹配DEBUG级别则接受 -->
<onMismatch>DENY</onMismatch> <!-- 不匹配则拒绝 -->
</filter>
</appender>
<!-- 4. core.log 专用输出器(保留 callerInfo 格式) -->
<appender name="CORE_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/core.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<FileNamePattern>${LOG_HOME}/core.log.%d{yyyy-MM-dd}.%i.log</FileNamePattern>
<MaxHistory>30</MaxHistory>
<TotalSizeCap>500MB</TotalSizeCap>
<maxFileSize>10MB</maxFileSize>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<!-- 仅 core.log 显示真实调用位置(类名.方法名(行号) -->
<pattern>[%X{traceId}] %d{yyyy-MM-dd HH:mm:ss.SSS} %5p ${PID:- } [%15.15t] %X{callerInfo} : %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
</appender>
<!-- 全局日志级别设置为DEBUG确保DEBUG日志能被捕获 -->
<root level="INFO">
<appender-ref ref="STDOUT" /> <!-- 控制台输出DEBUG及以上 -->
<appender-ref ref="INFO_FILE" /> <!-- INFO及以上输出到running.log -->
<appender-ref ref="DEBUG_FILE" /> <!-- 仅DEBUG输出到running_debug.log -->
</root>
<!-- 绑定 FeignClient → 输出到日志文件 -->
<logger name="FeignClient" level="INFO" additivity="false">
<appender-ref ref="INFO_FILE" />
<appender-ref ref="STDOUT" />
</logger>
<!-- 绑定 coreLogger → 输出到 core.log + 控制台 -->
<logger name="coreLogger" level="INFO" additivity="false">
<appender-ref ref="CORE_FILE" /> <!-- 核心日志写入 core.log -->
<appender-ref ref="STDOUT" /> <!-- 同时输出到控制台(显示 CoreLogger -->
</logger>
<!-- MyBatis SQL语句输出配置 -->
<logger name="org.apache.ibatis" level="DEBUG"/>
<logger name="org.apache.ibatis.session.AutoMappingUnknownColumnBehavior" level="ERROR"/>
<!-- MyBatis SQL语句详细输出 -->
<logger name="org.apache.ibatis.logging" level="DEBUG"/>
<logger name="org.apache.ibatis.logging.jdbc" level="DEBUG"/>
<logger name="org.apache.ibatis.logging.jdbc.BaseJdbcLogger" level="DEBUG"/>
<logger name="org.apache.ibatis.datasource" level="DEBUG"/>
<logger name="org.apache.ibatis.transaction" level="DEBUG"/>
<logger name="org.apache.ibatis.cache" level="DEBUG"/>
<logger name="org.mybatis" level="DEBUG"/>
<logger name="org.mybatis.spring" level="DEBUG"/>
<logger name="org.mybatis.spring.SqlSessionUtils" level="DEBUG"/>
<logger name="org.mybatis.spring.transaction" level="DEBUG"/>
<!-- MyBatis-Plus 相关日志配置 -->
<logger name="com.baomidou.mybatisplus" level="DEBUG"/>
<logger name="com.baomidou.mybatisplus.core" level="DEBUG"/>
<logger name="com.baomidou.mybatisplus.core.MybatisConfiguration" level="DEBUG"/>
<logger name="com.baomidou.mybatisplus.core.MybatisMapperRegistry" level="DEBUG"/>
<logger name="com.baomidou.mybatisplus.core.override.MybatisMapperProxy" level="DEBUG"/>
<logger name="com.baomidou.mybatisplus.extension" level="DEBUG"/>
<logger name="com.baomidou.mybatisplus.extension.spring" level="DEBUG"/>
<logger name="com.baomidou.mybatisplus.extension.MybatisPlusProperties" level="DEBUG"/>
<logger name="java.sql" level="DEBUG"/>
<logger name="java.sql.Connection" level="DEBUG"/>
<logger name="java.sql.Statement" level="DEBUG"/>
<logger name="java.sql.PreparedStatement" level="DEBUG"/>
<logger name="java.sql.ResultSet" level="DEBUG"/>
<logger name="com.sdm.flowable" level="INFO"/>
<logger name="com.sdm.flowable.dao" level="DEBUG"/>
<logger name="com.sdm.flowable.dao" level="DEBUG"/>
</configuration>

View File

@@ -143,7 +143,7 @@ public class SimulationJob implements Serializable {
@TableField("uuid")
private String uuid;
@Schema(description = "任务结果回传状态generating,uploading,finished")
@Schema(description = "任务结果回传状态generating,uploading,finished,failed")
@TableField("fileStatus")
private String fileStatus;
@@ -174,6 +174,7 @@ public class SimulationJob implements Serializable {
private Long dirId;
@Schema(description = "任务耗时,前端展示字段")
@TableField(value = "costTime",exist = false)
private String costTime;
}

View File

@@ -67,6 +67,9 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
@Autowired
private IFlowableFeignClient flowableFeignClient;
@Value("#{'${hpc.jobs.ascOrders:Queued,Running,Failed,Canceled,Finished }'.split(',')}")
private List<String> ascOrders;
// 正则匹配%后的单词(\w+ 匹配字母、数字、下划线)
private static final Pattern PLACEHOLDER_PATTERN = Pattern.compile("%(\\w+)");
@@ -570,6 +573,12 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
if (req.getRunId() != null && !req.getRunId().trim().isEmpty()) {
// like实现 包含关键词 的模糊查询(%关键词%
queryChain.like(SimulationJob::getRunId, req.getRunId().trim());
}
// 排序
if (CollectionUtils.isNotEmpty(ascOrders)) {
String statusValues = String.join("','", ascOrders);
String orderBySql = String.format("ORDER BY FIELD(jobStatus, '%s') ASC, createTime DESC", statusValues);
queryChain.getWrapper().last(orderBySql);
}
List<SimulationJob> results = queryChain.list();
// 时间转换

View File

@@ -168,9 +168,9 @@ xxl:
# 执行器应用名称
appname: pbs-job-executor
# 执行器注册地址默认使用address注册若为null则使用ip:port注册
address:
address: ${XXL_JOB_EXECUTOR_IP:}
# 执行器IP
ip:
ip: ${XXL_JOB_EXECUTOR_IP:}
# 执行器端口,为了好记web服务端口+1000
port: 8105
# 执行器日志路径

View File

@@ -164,7 +164,7 @@ xxl:
# 执行器应用名称
appname: pbs-job-executor
# 执行器注册地址默认使用address注册若为null则使用ip:port注册
address:
address: ${XXL_JOB_EXECUTOR_IP:}
# 执行器IP
ip:
# 执行器端口,为了好记web服务端口+1000

View File

@@ -112,7 +112,7 @@ xxl:
# 执行器应用名称
appname: pbs-job-executor
# 执行器注册地址默认使用address注册若为null则使用ip:port注册
address:
address: ${XXL_JOB_EXECUTOR_IP:}
# 执行器IP
ip:
# 执行器端口,为了好记web服务端口+1000

View File

@@ -175,7 +175,7 @@ xxl:
# 执行器应用名称
appname: pbs-job-executor
# 执行器注册地址默认使用address注册若为null则使用ip:port注册
address:
address: ${XXL_JOB_EXECUTOR_IP:}
# 执行器IP
ip:
# 执行器端口,为了好记web服务端口+1000

View File

@@ -37,7 +37,7 @@ public class SimulationLyricNodeController {
@GetMapping("/getTodoList")
@Operation(summary = "获取待办列表", description = "获取待办列表")
public SdmResponse getTodoList() {
return lyricInternalService.getTodoList();
return lyricInternalService.getTodoList("http");
}
@GetMapping("/getProjectList")

View File

@@ -36,7 +36,7 @@ public class LyricTodoListSchedule {
ThreadLocalContext.setTenantId(paramJson.getLong("tenantId"));
ThreadLocalContext.setUserId(paramJson.getLong("userId"));
long startTime = System.currentTimeMillis();
SdmResponse response = lyricInternalService.getTodoList();
SdmResponse response = lyricInternalService.getTodoList("schedule");
long endTime = System.currentTimeMillis();
long second = (endTime - startTime) / 1000;
log.info("{} lyricTodoListHandler cost [{}] s", traceId,second);

View File

@@ -13,7 +13,7 @@ public interface ILyricInternalService {
SdmResponse pushReport(PushReportReq req);
SdmResponse getTodoList();
SdmResponse getTodoList(String from);
SdmResponse getTodoListByProjectNum(String projectNum);

View File

@@ -14,6 +14,7 @@ import com.sdm.common.entity.resp.system.CIDUserResp;
import com.sdm.common.feign.impl.data.DataClientFeignClientImpl;
import com.sdm.common.feign.impl.system.SysUserFeignClientImpl;
import com.sdm.common.utils.FilesUtil;
import com.sdm.common.utils.MdcUtil;
import com.sdm.common.utils.PageUtils;
import com.sdm.common.utils.RandomUtil;
import com.sdm.outbridge.entity.*;
@@ -45,6 +46,7 @@ import org.mybatis.spring.MyBatisSystemException;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service;
@@ -57,8 +59,8 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@@ -82,6 +84,8 @@ public class LyricInternalServiceImpl implements ILyricInternalService {
private static final long SYNC_INTERVAL_MS = 30 * 1000L;
private static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss";
private static final int UUID_LENGTH = 32;
// 同步待办锁
private final ReentrantLock syncTodoInfoLock = new ReentrantLock();
@Autowired
private LyricIntegrateService lyricIntegrateService;
@@ -135,6 +139,9 @@ public class LyricInternalServiceImpl implements ILyricInternalService {
@Autowired
private LyricVProjectResourcePlanDMService lyricVProjectResourcePlanDMService;
@Value("${lyric.syncTodoDataWaitSeconds:3}")
private Long syncTodoDataWaitSeconds;
/**
* 判断字符串是否可以安全转换为Long类型
*
@@ -285,67 +292,83 @@ public class LyricInternalServiceImpl implements ILyricInternalService {
* 核心同步逻辑:构建数据并执行批量操作
*/
private SdmResponse syncTodoData(List<SimulationNode> projectNodeList,
List<LyricVTodoEmulationInfoDM> todoInfoList) {
Long tenantId = ThreadLocalContext.getTenantId();
Long jobNumber = ThreadLocalContext.getUserId();
String curDateStr = new SimpleDateFormat(DATE_FORMAT_PATTERN).format(new Date());
List<LyricVTodoEmulationInfoDM> todoInfoList,String traceId) {
if(StringUtils.isNotBlank(traceId)){
MdcUtil.putTraceId(traceId);
}
// 尝试获取锁(立即返回,不等待)
if (!syncTodoInfoLock.tryLock()) {
return SdmResponse.success("有数据同步任务正在执行中,请稍后再试");
}
try {
Long tenantId = ThreadLocalContext.getTenantId();
Long jobNumber = ThreadLocalContext.getUserId();
String curDateStr = new SimpleDateFormat(DATE_FORMAT_PATTERN).format(new Date());
// 构建批量创建文件夹参数 + 权限更新参数
List<BatchCreateDirItem> createDirItemList = new ArrayList<>();
List<UpdatePermissionReq> updatePermissionList = new ArrayList<>();
List<SimulationDemand> demandToCreateTaskList = new ArrayList<>();
// 构建批量创建文件夹参数 + 权限更新参数
List<BatchCreateDirItem> createDirItemList = new ArrayList<>();
List<UpdatePermissionReq> updatePermissionList = new ArrayList<>();
List<SimulationDemand> demandToCreateTaskList = new ArrayList<>();
// 按项目分组处理待办
Map<String, List<LyricVTodoEmulationInfoDM>> projectTodoMap = todoInfoList.stream()
.collect(Collectors.groupingBy(LyricVTodoEmulationInfoDM::getProject));
// 按项目分组处理待办
Map<String, List<LyricVTodoEmulationInfoDM>> projectTodoMap = todoInfoList.stream()
.collect(Collectors.groupingBy(LyricVTodoEmulationInfoDM::getProject));
for (SimulationNode projectNode : projectNodeList) {
String projectCode = projectNode.getNodeCode();
List<LyricVTodoEmulationInfoDM> projectTodoList = projectTodoMap.getOrDefault(projectCode, Collections.emptyList());
if (CollectionUtils.isEmpty(projectTodoList)) {
continue;
}
// 构建项目级文件夹参数
BatchCreateDirItem projectCreateItem = buildProjectDirItem(projectNode);
List<DirNodeInfo> demandDirNodeList = new ArrayList<>();
// 处理当前项目下的所有待办
for (LyricVTodoEmulationInfoDM todo : projectTodoList) {
try {
String demandUuid = RandomUtil.generateString(UUID_LENGTH);
// 1. 构建需求基础参数 + 成员 + 权限
SpdmAddDemandReq demandReq = buildDemandReq(todo, demandUuid, curDateStr);
List<SpdmDemandRelateMemberReq> memberList = buildDemandMemberList(todo, demandUuid, jobNumber, curDateStr);
// 2. 构建权限更新参数
buildPermissionReqList(demandUuid, jobNumber, todo, updatePermissionList);
// 3. 构建需求文件夹节点
demandDirNodeList.add(buildDemandDirNode(demandUuid, projectNode, demandReq.getDemandName()));
// 4. 异步保存需求数据
asyncSaveDemandData(demandReq, memberList, tenantId, jobNumber);
SimulationDemand demand = new SimulationDemand();
BeanUtils.copyProperties(demandReq, demand);
demandToCreateTaskList.add(demand);
} catch (Exception e) {
log.error("处理项目[{}]下待办[{}]异常: ", projectCode, todo.getTodoId(), e);
for (SimulationNode projectNode : projectNodeList) {
String projectCode = projectNode.getNodeCode();
List<LyricVTodoEmulationInfoDM> projectTodoList = projectTodoMap.getOrDefault(projectCode, Collections.emptyList());
if (CollectionUtils.isEmpty(projectTodoList)) {
continue;
}
// 构建项目级文件夹参数
BatchCreateDirItem projectCreateItem = buildProjectDirItem(projectNode);
List<DirNodeInfo> demandDirNodeList = new ArrayList<>();
// 处理当前项目下的所有待办
for (LyricVTodoEmulationInfoDM todo : projectTodoList) {
try {
String demandUuid = RandomUtil.generateString(UUID_LENGTH);
// 1. 构建需求基础参数 + 成员 + 权限
SpdmAddDemandReq demandReq = buildDemandReq(todo, demandUuid, curDateStr);
List<SpdmDemandRelateMemberReq> memberList = buildDemandMemberList(todo, demandUuid, jobNumber, curDateStr);
// 2. 构建权限更新参数
buildPermissionReqList(demandUuid, jobNumber, todo, updatePermissionList);
// 3. 构建需求文件夹节点
demandDirNodeList.add(buildDemandDirNode(demandUuid, projectNode, demandReq.getDemandName()));
// 4. 异步保存需求数据
asyncSaveDemandData(demandReq, memberList, tenantId, jobNumber);
SimulationDemand demand = new SimulationDemand();
BeanUtils.copyProperties(demandReq, demand);
demandToCreateTaskList.add(demand);
} catch (Exception e) {
log.error("处理项目[{}]下待办[{}]异常: ", projectCode, todo.getTodoId(), e);
}
}
// 绑定需求文件夹到项目
projectCreateItem.setChildDirNodeInfos(demandDirNodeList);
createDirItemList.add(projectCreateItem);
}
// 绑定需求文件夹到项目
projectCreateItem.setChildDirNodeInfos(demandDirNodeList);
createDirItemList.add(projectCreateItem);
// 执行批量操作
executeBatchOperations(createDirItemList, updatePermissionList);
if(CollectionUtils.isNotEmpty(demandToCreateTaskList)){
simulationTaskService.batchCreateTaskFromDemand(demandToCreateTaskList);
}
return SdmResponse.success();
} catch (Exception e) {
log.error("syncTodoData 未知 error: {}", e.getMessage());
return SdmResponse.failed();
}finally {
syncTodoInfoLock.unlock();
if(StringUtils.isNotBlank(traceId)){
MdcUtil.removeTraceId();
}
}
// 执行批量操作
executeBatchOperations(createDirItemList, updatePermissionList);
if(CollectionUtils.isNotEmpty(demandToCreateTaskList)){
simulationTaskService.batchCreateTaskFromDemand(demandToCreateTaskList);
}
return SdmResponse.success();
}
/**
@@ -600,9 +623,10 @@ public class LyricInternalServiceImpl implements ILyricInternalService {
}
// form :http 前端请求过来的schedule 定时任务过来的
@Override
public SdmResponse getTodoList() {
public SdmResponse getTodoList(String from) {
// 1. 缓存校验
SdmResponse cacheCheckResponse = checkSyncCache();
if (cacheCheckResponse != null) {
@@ -627,7 +651,9 @@ public class LyricInternalServiceImpl implements ILyricInternalService {
}
// 4. 构建同步数据并执行批量操作
return syncTodoData(projectNodeList, todoInfoList);
return (Objects.equals("http",from)) ? httpSyncTodoData(projectNodeList, todoInfoList):
syncTodoData(projectNodeList, todoInfoList,"");
}
@Override
@@ -1678,4 +1704,27 @@ public class LyricInternalServiceImpl implements ILyricInternalService {
return PageUtils.getJsonObjectSdmResponse(projectMemberList, page);
}
private SdmResponse httpSyncTodoData( List<SimulationNode> projectNodeList,List<LyricVTodoEmulationInfoDM> todoInfoList){
String traceId = MdcUtil.getTraceId();
CompletableFuture<SdmResponse> syncTodoDataFeature = CompletableFuture.supplyAsync(() ->
syncTodoData(projectNodeList, todoInfoList,traceId));
try {
SdmResponse sdmResponse = syncTodoDataFeature.get(syncTodoDataWaitSeconds, TimeUnit.SECONDS);
return sdmResponse;
} catch (InterruptedException e) {
log.error("同步异常终止:{}",e.getMessage());
return SdmResponse.failed("同步异常终止");
} catch (ExecutionException e) {
log.error("同步执行错误:{}",e.getMessage());
return SdmResponse.failed("同步执行出错误");
} catch (TimeoutException e) {
log.warn("同步执行超时:{}",e.getMessage());
return SdmResponse.success("同步执行中,请稍后");
}catch (Exception e) {
log.error("同步未知错误:{}",e.getMessage());
return SdmResponse.failed("同步未知错误");
}
}
}

View File

@@ -168,7 +168,7 @@ xxl:
# 执行器应用名称 服务名-job-executor
appname: project-job-executor
# 执行器注册地址默认使用address注册若为null则使用ip:port注册
address:
address: ${XXL_JOB_EXECUTOR_IP:}
# 执行器IP
ip:
# 执行器端口,为了好记web服务端口+1000

View File

@@ -168,7 +168,7 @@ xxl:
# 执行器应用名称 服务名-job-executor
appname: project-job-executor
# 执行器注册地址默认使用address注册若为null则使用ip:port注册
address:
address: ${XXL_JOB_EXECUTOR_IP:}
# 执行器IP
ip:
# 执行器端口,为了好记web服务端口+1000

View File

@@ -172,7 +172,7 @@ xxl:
# 执行器应用名称 服务名-job-executor
appname: project-job-executor
# 执行器注册地址默认使用address注册若为null则使用ip:port注册
address:
address: ${XXL_JOB_EXECUTOR_IP:}
# 执行器IP
ip:
# 执行器端口,为了好记web服务端口+1000

View File

@@ -169,7 +169,7 @@ xxl:
# 执行器应用名称 服务名-job-executor
appname: project-job-executor
# 执行器注册地址默认使用address注册若为null则使用ip:port注册
address:
address: ${XXL_JOB_EXECUTOR_IP:}
# 执行器IP
ip:
# 执行器端口,为了好记web服务端口+1000