diff --git a/common/src/main/java/com/sdm/common/utils/DateUtils.java b/common/src/main/java/com/sdm/common/utils/DateUtils.java index 2d21ed67..7b11ad57 100644 --- a/common/src/main/java/com/sdm/common/utils/DateUtils.java +++ b/common/src/main/java/com/sdm/common/utils/DateUtils.java @@ -1,16 +1,23 @@ package com.sdm.common.utils; +import lombok.extern.slf4j.Slf4j; + import java.text.ParseException; import java.text.SimpleDateFormat; -import java.time.*; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.temporal.TemporalAdjusters; import java.util.Calendar; import java.util.Date; +import java.util.concurrent.TimeUnit; /** * 日期时间工具类 */ +@Slf4j public class DateUtils { // 常用日期时间格式 @@ -22,6 +29,10 @@ public class DateUtils { public static final String PATTERN_COMPACT = "yyyyMMddHHmmss"; public static final String PATTERN_COMPACT_DATE = "yyyyMMdd"; + // 时间格式 + private static final String DEFAULT_DATE_FORMAT = "yyyy/MM/dd HH:mm:ss"; + private static final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat(DEFAULT_DATE_FORMAT); + private DateUtils() { throw new IllegalStateException("Utility class"); } @@ -229,4 +240,79 @@ public class DateUtils { return toDate(localDate); } + /** + * 计算两个时间字符串的耗时(单位:毫秒) + * @param startTimeStr 开始时间字符串(格式:yyyy/MM/dd HH:mm:ss) + * @param endTimeStr 结束时间字符串(格式:yyyy/MM/dd HH:mm:ss) + * @return 耗时(毫秒),时间无效返回null + */ + public static Long calculateTimeConsume(String startTimeStr, String endTimeStr) { + // 空值校验 + if (startTimeStr == null || startTimeStr.trim().isEmpty() || + endTimeStr == null || endTimeStr.trim().isEmpty()) { + return null; + } + + try { + // 解析时间字符串 + Date startTime = DATE_FORMATTER.parse(startTimeStr.trim()); + Date endTime = DATE_FORMATTER.parse(endTimeStr.trim()); + + // 计算耗时(确保结束时间不早于开始时间) + long consumeMillis = endTime.getTime() - startTime.getTime(); + return consumeMillis >= 0 ? consumeMillis : null; + + } catch (Exception e) { + // 时间格式解析失败 + log.warn("时间格式解析失败,startTime: {}, endTime: {}, 格式要求:{}", + startTimeStr, endTimeStr, DEFAULT_DATE_FORMAT); + return null; + } + } + + /** + * 计算耗时并转换为指定时间单位 + * @param startTimeStr 开始时间字符串 + * @param endTimeStr 结束时间字符串 + * @param timeUnit 目标时间单位 + * @return 耗时(指定单位),时间无效返回null + */ + public static Long calculateTimeConsume(String startTimeStr, String endTimeStr, TimeUnit timeUnit) { + Long consumeMillis = calculateTimeConsume(startTimeStr, endTimeStr); + if (consumeMillis == null) { + return null; + } + return timeUnit.convert(consumeMillis, TimeUnit.MILLISECONDS); + } + + /** + * 获取格式化的耗时字符串(xx小时xx分xx秒) + * @param startTimeStr 开始时间字符串 + * @param endTimeStr 结束时间字符串 + * @return 格式化耗时字符串,时间无效返回null + */ + public static String getFormattedConsumeTime(String startTimeStr, String endTimeStr) { + Long consumeMillis = calculateTimeConsume(startTimeStr, endTimeStr); + if (consumeMillis == null) { + return null; + } + + long hours = TimeUnit.MILLISECONDS.toHours(consumeMillis); + long minutes = TimeUnit.MILLISECONDS.toMinutes(consumeMillis) % 60; + long seconds = TimeUnit.MILLISECONDS.toSeconds(consumeMillis) % 60; + long millis = consumeMillis % 1000; + + if (hours > 0) { + return String.format("%d小时%d分%d秒%d毫秒", hours, minutes, seconds, millis); + } else if (minutes > 0) { + return String.format("%d分%d秒%d毫秒", minutes, seconds, millis); + } else if (seconds > 0) { + return String.format("%d秒%d毫秒", seconds, millis); + } else { + return String.format("%d毫秒", millis); + } + } + + + } \ No newline at end of file diff --git a/common/src/main/java/com/sdm/common/utils/HpcCommandExcuteUtil.java b/common/src/main/java/com/sdm/common/utils/HpcCommandExcuteUtil.java index acb71ecb..405c9956 100644 --- a/common/src/main/java/com/sdm/common/utils/HpcCommandExcuteUtil.java +++ b/common/src/main/java/com/sdm/common/utils/HpcCommandExcuteUtil.java @@ -2,6 +2,7 @@ package com.sdm.common.utils; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.TypeReference; +import com.sdm.common.common.SdmResponse; import com.sdm.common.entity.resp.pbs.hpc.FileNodeInfo; import com.sdm.common.log.CoreLogger; import lombok.extern.slf4j.Slf4j; @@ -43,6 +44,9 @@ public class HpcCommandExcuteUtil { @Value("${hpc.remoteDownLoadFileUrl:}") private String remoteDownLoadFileUrl; + @Value("${hpc.callHpcUpload:}") + private String callHpcUpload; + @Autowired private HttpClientUtil httpClientUtil; @@ -159,6 +163,27 @@ public class HpcCommandExcuteUtil { return builder.body(body); } + public SdmResponse callHpcUploadToTarget(String jobId, String workDir) { + com.alibaba.fastjson2.JSONObject paramJson = new com.alibaba.fastjson2.JSONObject(); + paramJson.put("jobId", jobId); + paramJson.put("jobWorkDir", workDir); + Boolean call = false; + String resultString = ""; + try { + resultString = httpClientUtil.doPostJson(callHpcUpload, paramJson.toJSONString()); + CoreLogger.info("callHpcUploadToTarget back:{}", resultString); + call = JSON.parseObject( + resultString, + new TypeReference() { + } + ); + return SdmResponse.success(call); + } catch (Exception e) { + CoreLogger.error("callHpcUploadToTarget error,jobId:{},workDir:{},errMsg:{}", jobId,workDir, e.getMessage()); + return SdmResponse.failed(call); + } + } + private String extractFileName(String path) { if (path == null || path.isBlank()) { return "unknown"; @@ -171,4 +196,5 @@ public class HpcCommandExcuteUtil { } + } diff --git a/common/src/main/java/com/sdm/common/utils/String2NumberUtil.java b/common/src/main/java/com/sdm/common/utils/String2NumberUtil.java new file mode 100644 index 00000000..4c5d73a6 --- /dev/null +++ b/common/src/main/java/com/sdm/common/utils/String2NumberUtil.java @@ -0,0 +1,25 @@ +package com.sdm.common.utils; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class String2NumberUtil { + + /** + * 将字符串转换为Long,转换失败返回null + * @param str 待转换的字符串 + * @return 转换后的Long值,失败返回null + */ + public static Long stringToLong(String str) { + if (str == null || str.trim().isEmpty()) { + return null; + } + try { + return Long.parseLong(str.trim()); + } catch (NumberFormatException e) { + log.warn("字符串转换Long失败,输入值:{}", str, e); + return null; + } + } + +} diff --git a/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java b/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java index 6dd2b595..62094762 100644 --- a/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java +++ b/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java @@ -223,5 +223,11 @@ public class TaskController implements ITaskFeignClient { return hpcInstructionService.hpcDownloadFiles(downloadFilesReq); } + // 通知文件回传 mock 测试使用 + @PostMapping("/callHpcUploadToTarget") + public SdmResponse hpcDownloadFiles(@RequestBody Map paramMap) { + return hpcInstructionService.callHpcUploadToTarget(paramMap.get("jobId").toString(), paramMap.get("jobWorkDir").toString()); + } + } diff --git a/pbs/src/main/java/com/sdm/pbs/model/bo/HpcJobStatusInfo.java b/pbs/src/main/java/com/sdm/pbs/model/bo/HpcJobStatusInfo.java index 7ddb7d4d..4f1a8ed0 100644 --- a/pbs/src/main/java/com/sdm/pbs/model/bo/HpcJobStatusInfo.java +++ b/pbs/src/main/java/com/sdm/pbs/model/bo/HpcJobStatusInfo.java @@ -33,4 +33,14 @@ public class HpcJobStatusInfo { @Schema(description = "作业计算软件") public String software; + + @Schema(description = "作业计算节点") + private String allocatedNodes; + + @Schema(description = "总内核态时间(单位:毫秒)") + private String totalKernelTime; + + @Schema(description = "总用户态时间(单位:毫秒)") + private String totalUserTime; + } diff --git a/pbs/src/main/java/com/sdm/pbs/schedule/hpc/HpcJobStatusSchedule.java b/pbs/src/main/java/com/sdm/pbs/schedule/hpc/HpcJobStatusSchedule.java new file mode 100644 index 00000000..4a6c9d88 --- /dev/null +++ b/pbs/src/main/java/com/sdm/pbs/schedule/hpc/HpcJobStatusSchedule.java @@ -0,0 +1,58 @@ +package com.sdm.pbs.schedule.hpc; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +// 后期多节点部署,优化成分布式定时任务 +@Component +public class HpcJobStatusSchedule { + + @Value("${task.schedule.interval:120}") + private long interval; + + @Value("${task.schedule.poolSize:1}") + private int poolSize; + + private ScheduledExecutorService scheduler; + + @Autowired + private HpcJobStatusScheduleExcutor hpcJobStatusScheduleService; + + /** + * 初始化定时任务:直接提交Service作为任务 + */ + @PostConstruct + public void initScheduledTask() { + scheduler = Executors.newScheduledThreadPool(poolSize); + scheduler.scheduleWithFixedDelay( + hpcJobStatusScheduleService, + 0, + interval, + TimeUnit.SECONDS + ); + } + + /** + * 销毁线程池 + */ + @PreDestroy + public void destroy() { + if (scheduler != null) { + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(1, TimeUnit.MINUTES)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + } + } + } +} diff --git a/pbs/src/main/java/com/sdm/pbs/schedule/hpc/HpcJobStatusScheduleExcutor.java b/pbs/src/main/java/com/sdm/pbs/schedule/hpc/HpcJobStatusScheduleExcutor.java new file mode 100644 index 00000000..2312a5af --- /dev/null +++ b/pbs/src/main/java/com/sdm/pbs/schedule/hpc/HpcJobStatusScheduleExcutor.java @@ -0,0 +1,60 @@ +package com.sdm.pbs.schedule.hpc; + +import com.sdm.common.common.SdmResponse; +import com.sdm.pbs.model.bo.HpcJobStatusInfo; +import com.sdm.pbs.model.entity.SimulationJob; +import com.sdm.pbs.schedule.hpc.hander.HpcJobStatusStrategyFactory; +import com.sdm.pbs.service.IPbsService; +import com.sdm.pbs.service.ISimulationJobService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Service; + +import java.util.List; + +@Slf4j +@Service +public class HpcJobStatusScheduleExcutor implements Runnable{ + + @Autowired + private ISimulationJobService simulationJobService; + + @Autowired + @Qualifier("decoratorPbsService") + private IPbsService pbsService; + + @Autowired + private HpcJobStatusStrategyFactory strategyFactory; + + @Override + public void run() { + try { + // Configuring,Queued,Running,Canceled,Finished,Failed + // 查询 jobId非空, jobStatus非 Canceled,Failed 数据, Finished,文件可能传完,可能没有传递完 + List list = simulationJobService.lambdaQuery() + .isNotNull(SimulationJob::getJobId) + .notIn(SimulationJob::getJobStatus, "Canceled", "Failed") + // 回传文件状态 + .notIn(SimulationJob::getFileStatus, "generating", "uploading") + .list(); + if(CollectionUtils.isNotEmpty(list)){ + log.info("HpcJobStatus query db data null"); + return; + } + // 非Finished的数据直接修改状态,Finished修改状态+通知上传到minio + for(SimulationJob simJob : list){ + SdmResponse hpcResponse = pbsService.getJobStatus(simJob.getJobId()); + if(hpcResponse.isSuccess()&&hpcResponse.getData()!=null){ + HpcJobStatusInfo data = hpcResponse.getData(); + String jobStatus = data.getJobStatus(); + strategyFactory.getHandler(jobStatus).handle(simJob,data);; + } + } + } catch (Exception e) { + log.error("HpcJobStatus schedule error:{}",e.getMessage()); + } + } + +} diff --git a/pbs/src/main/java/com/sdm/pbs/schedule/hpc/hander/FinishedStatusHandler.java b/pbs/src/main/java/com/sdm/pbs/schedule/hpc/hander/FinishedStatusHandler.java new file mode 100644 index 00000000..94cccf36 --- /dev/null +++ b/pbs/src/main/java/com/sdm/pbs/schedule/hpc/hander/FinishedStatusHandler.java @@ -0,0 +1,65 @@ +package com.sdm.pbs.schedule.hpc.hander; + +import com.alibaba.fastjson2.JSONObject; +import com.sdm.common.common.SdmResponse; +import com.sdm.common.log.CoreLogger; +import com.sdm.common.utils.DateUtils; +import com.sdm.common.utils.String2NumberUtil; +import com.sdm.pbs.model.bo.HpcJobStatusInfo; +import com.sdm.pbs.model.entity.SimulationJob; +import com.sdm.pbs.service.HpcInstructionService; +import com.sdm.pbs.service.ISimulationJobService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@Component +public class FinishedStatusHandler implements JobStatusHandler { + + @Autowired + private ISimulationJobService simulationJobService; + + @Autowired + private HpcInstructionService hpcInstructionService; + + @Override + public void handle(SimulationJob simJob, HpcJobStatusInfo statusInfo) { + try { + // 过程结束修改 + SimulationJob newDbJob = simulationJobService.lambdaQuery().eq(SimulationJob::getId, simJob.getId()).one(); + newDbJob.setFileStatus(statusInfo.getJobStatus()); + newDbJob.setStartTime(statusInfo.getStartTime()); + newDbJob.setEndTime(statusInfo.getEndTime()); + newDbJob.setNodeName(statusInfo.getAllocatedNodes()); + newDbJob.setTotalKernelTime(String2NumberUtil.stringToLong(statusInfo.getTotalKernelTime())); + newDbJob.setTotalUserTime(String2NumberUtil.stringToLong(statusInfo.getTotalUserTime())); + newDbJob.setTotalElapsedTime(DateUtils.calculateTimeConsume( + statusInfo.getStartTime(), statusInfo.getEndTime(), TimeUnit.SECONDS)); + newDbJob.setUpdateTime(LocalDateTime.now()); + // 通知工具回传文件 + SdmResponse callResponse = hpcInstructionService.callHpcUploadToTarget(newDbJob.getJobId(), newDbJob.getStdoutHpcFilePath()); + if (!callResponse.isSuccess()||!callResponse.getData()) { + CoreLogger.error("callHpcUploadToTarget failed,jobId:{},workDir:{}",newDbJob.getJobId(),newDbJob.getStdoutHpcFilePath()); + return; + } + // 通知成功修改状态 + newDbJob.setFileStatus("uploading"); + simulationJobService.updateById(newDbJob); + } catch (Exception e) { + CoreLogger.error("HpcJobStatus finshed handle error:{},newDbJob:{},statusInfo:{}",e.getMessage(), + JSONObject.toJSONString(simJob),JSONObject.toJSONString(statusInfo)); + } + + } + + @Override + public List getSupportedStatus() { + // todo 抽取成枚举类 + return Collections.singletonList("Finished"); + } + +} diff --git a/pbs/src/main/java/com/sdm/pbs/schedule/hpc/hander/HpcJobStatusStrategyFactory.java b/pbs/src/main/java/com/sdm/pbs/schedule/hpc/hander/HpcJobStatusStrategyFactory.java new file mode 100644 index 00000000..c583944c --- /dev/null +++ b/pbs/src/main/java/com/sdm/pbs/schedule/hpc/hander/HpcJobStatusStrategyFactory.java @@ -0,0 +1,44 @@ +package com.sdm.pbs.schedule.hpc.hander; + +import com.sdm.common.log.CoreLogger; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +@Component +public class HpcJobStatusStrategyFactory { + private final Map strategyMap = new ConcurrentHashMap<>(); + + /** + * 注入所有JobStatusHandler实现类并注册到工厂 + */ + @Autowired + public HpcJobStatusStrategyFactory(List handlers) { + for (JobStatusHandler handler : handlers) { + for (String status : handler.getSupportedStatus()) { + strategyMap.put(status, handler); + CoreLogger.info("Register strategy for status:{} with handler:{}", + status, handler.getClass().getSimpleName()); + } + } + } + + /** + * 根据状态获取对应的处理器 + */ + public JobStatusHandler getHandler(String status) { + JobStatusHandler handler = strategyMap.get(status); + if (handler == null) { + CoreLogger.error("No handler registered for status:{}", status); + // 返回修改状态的hander + return strategyMap.get("Running"); + } + return handler; + } + +} diff --git a/pbs/src/main/java/com/sdm/pbs/schedule/hpc/hander/JobStatusHandler.java b/pbs/src/main/java/com/sdm/pbs/schedule/hpc/hander/JobStatusHandler.java new file mode 100644 index 00000000..c75c2244 --- /dev/null +++ b/pbs/src/main/java/com/sdm/pbs/schedule/hpc/hander/JobStatusHandler.java @@ -0,0 +1,21 @@ +package com.sdm.pbs.schedule.hpc.hander; + +import com.sdm.pbs.model.bo.HpcJobStatusInfo; +import com.sdm.pbs.model.entity.SimulationJob; + +import java.util.List; + +public interface JobStatusHandler { + /** + * 处理作业状态 + * @param simJob 作业信息 + * @param statusInfo 状态信息 + */ + void handle(SimulationJob simJob,HpcJobStatusInfo statusInfo); + + /** + * 获取支持的状态类型 + * @return 状态列表 + */ + List getSupportedStatus(); +} diff --git a/pbs/src/main/java/com/sdm/pbs/schedule/hpc/hander/ProcessStatusHandler.java b/pbs/src/main/java/com/sdm/pbs/schedule/hpc/hander/ProcessStatusHandler.java new file mode 100644 index 00000000..3762657d --- /dev/null +++ b/pbs/src/main/java/com/sdm/pbs/schedule/hpc/hander/ProcessStatusHandler.java @@ -0,0 +1,47 @@ +package com.sdm.pbs.schedule.hpc.hander; + +import com.alibaba.fastjson2.JSONObject; +import com.sdm.common.log.CoreLogger; +import com.sdm.common.utils.String2NumberUtil; +import com.sdm.pbs.model.bo.HpcJobStatusInfo; +import com.sdm.pbs.model.entity.SimulationJob; +import com.sdm.pbs.service.ISimulationJobService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.List; + +@Component +public class ProcessStatusHandler implements JobStatusHandler{ + + @Autowired + private ISimulationJobService simulationJobService; + + @Override + public void handle(SimulationJob simJob, HpcJobStatusInfo statusInfo) { + try { + // 过程中状态修改 + SimulationJob newDbJob = simulationJobService.lambdaQuery().eq(SimulationJob::getId, simJob.getId()).one(); + newDbJob.setFileStatus(statusInfo.getJobStatus()); + newDbJob.setStartTime(statusInfo.getStartTime()); + newDbJob.setEndTime(statusInfo.getEndTime()); + newDbJob.setNodeName(statusInfo.getAllocatedNodes()); + newDbJob.setTotalKernelTime(String2NumberUtil.stringToLong(statusInfo.getTotalKernelTime())); + newDbJob.setTotalUserTime(String2NumberUtil.stringToLong(statusInfo.getTotalUserTime())); + newDbJob.setUpdateTime(LocalDateTime.now()); + simulationJobService.updateById(newDbJob); + } catch (Exception e) { + CoreLogger.error("HpcJobStatus handle error:{},newDbJob:{},statusInfo:{}",e.getMessage(), + JSONObject.toJSONString(simJob),JSONObject.toJSONString(statusInfo)); + } + } + + @Override + public List getSupportedStatus() { + // todo 枚举类 + return Arrays.asList("Configuring", "Queued", "Running", "Canceled", "Failed"); + } + +} diff --git a/pbs/src/main/java/com/sdm/pbs/service/HpcInstructionService.java b/pbs/src/main/java/com/sdm/pbs/service/HpcInstructionService.java index c1f14f77..39ca01e8 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/HpcInstructionService.java +++ b/pbs/src/main/java/com/sdm/pbs/service/HpcInstructionService.java @@ -54,5 +54,7 @@ public interface HpcInstructionService { ResponseEntity hpcDownloadFile(String fileName,Long fileSize); + // 通知hpc回传文件 + SdmResponse callHpcUploadToTarget(String jobId,String workDir); } diff --git a/pbs/src/main/java/com/sdm/pbs/service/impl/HpcInstructionServiceImpl.java b/pbs/src/main/java/com/sdm/pbs/service/impl/HpcInstructionServiceImpl.java index 677f5bb4..aba194da 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/impl/HpcInstructionServiceImpl.java +++ b/pbs/src/main/java/com/sdm/pbs/service/impl/HpcInstructionServiceImpl.java @@ -479,6 +479,11 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { return hpcCommandExcuteUtil.hpcDownloadFile(fileName,fileSize); } + @Override + public SdmResponse callHpcUploadToTarget(String jobId,String workDir) { + return hpcCommandExcuteUtil.callHpcUploadToTarget(jobId,workDir); + } + /** * 使用 MappedByteBuffer + 分块映射实现超大文件下载(高性能、低内存) */ diff --git a/pbs/src/main/resources/application-dev.yml b/pbs/src/main/resources/application-dev.yml index 5e114a18..90f871c8 100644 --- a/pbs/src/main/resources/application-dev.yml +++ b/pbs/src/main/resources/application-dev.yml @@ -106,6 +106,10 @@ hpc: excuteWay: remote remoteCmdUrl: http://192.168.65.55:9097/doProcess remoteCreateDirUrl: http://192.168.65.55:9097/createDir + remoteScanDirUrl: http://192.168.65.55:9097/scanDir + remoteDownLoadFileUrl: http://192.168.65.55:9097/hpcDownload + callHpcUpload: http://192.168.65.55:9097/addJobQueue + #logging: # config: ./config/logback.xml