新增:hpc任务状态轮询更新状态

This commit is contained in:
yangyang01000846
2025-12-01 15:02:20 +08:00
parent eadb722c7c
commit 9575cc362b
14 changed files with 460 additions and 1 deletions

View File

@@ -1,16 +1,23 @@
package com.sdm.common.utils;
import lombok.extern.slf4j.Slf4j;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.*;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAdjusters;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* 日期时间工具类
*/
@Slf4j
public class DateUtils {
// 常用日期时间格式
@@ -22,6 +29,10 @@ public class DateUtils {
public static final String PATTERN_COMPACT = "yyyyMMddHHmmss";
public static final String PATTERN_COMPACT_DATE = "yyyyMMdd";
// 时间格式
private static final String DEFAULT_DATE_FORMAT = "yyyy/MM/dd HH:mm:ss";
private static final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat(DEFAULT_DATE_FORMAT);
private DateUtils() {
throw new IllegalStateException("Utility class");
}
@@ -229,4 +240,79 @@ public class DateUtils {
return toDate(localDate);
}
/**
* 计算两个时间字符串的耗时(单位:毫秒)
* @param startTimeStr 开始时间字符串格式yyyy/MM/dd HH:mm:ss
* @param endTimeStr 结束时间字符串格式yyyy/MM/dd HH:mm:ss
* @return 耗时毫秒时间无效返回null
*/
public static Long calculateTimeConsume(String startTimeStr, String endTimeStr) {
// 空值校验
if (startTimeStr == null || startTimeStr.trim().isEmpty() ||
endTimeStr == null || endTimeStr.trim().isEmpty()) {
return null;
}
try {
// 解析时间字符串
Date startTime = DATE_FORMATTER.parse(startTimeStr.trim());
Date endTime = DATE_FORMATTER.parse(endTimeStr.trim());
// 计算耗时(确保结束时间不早于开始时间)
long consumeMillis = endTime.getTime() - startTime.getTime();
return consumeMillis >= 0 ? consumeMillis : null;
} catch (Exception e) {
// 时间格式解析失败
log.warn("时间格式解析失败startTime: {}, endTime: {}, 格式要求:{}",
startTimeStr, endTimeStr, DEFAULT_DATE_FORMAT);
return null;
}
}
/**
* 计算耗时并转换为指定时间单位
* @param startTimeStr 开始时间字符串
* @param endTimeStr 结束时间字符串
* @param timeUnit 目标时间单位
* @return 耗时指定单位时间无效返回null
*/
public static Long calculateTimeConsume(String startTimeStr, String endTimeStr, TimeUnit timeUnit) {
Long consumeMillis = calculateTimeConsume(startTimeStr, endTimeStr);
if (consumeMillis == null) {
return null;
}
return timeUnit.convert(consumeMillis, TimeUnit.MILLISECONDS);
}
/**
* 获取格式化的耗时字符串xx小时xx分xx秒
* @param startTimeStr 开始时间字符串
* @param endTimeStr 结束时间字符串
* @return 格式化耗时字符串时间无效返回null
*/
public static String getFormattedConsumeTime(String startTimeStr, String endTimeStr) {
Long consumeMillis = calculateTimeConsume(startTimeStr, endTimeStr);
if (consumeMillis == null) {
return null;
}
long hours = TimeUnit.MILLISECONDS.toHours(consumeMillis);
long minutes = TimeUnit.MILLISECONDS.toMinutes(consumeMillis) % 60;
long seconds = TimeUnit.MILLISECONDS.toSeconds(consumeMillis) % 60;
long millis = consumeMillis % 1000;
if (hours > 0) {
return String.format("%d小时%d分%d秒%d毫秒", hours, minutes, seconds, millis);
} else if (minutes > 0) {
return String.format("%d分%d秒%d毫秒", minutes, seconds, millis);
} else if (seconds > 0) {
return String.format("%d秒%d毫秒", seconds, millis);
} else {
return String.format("%d毫秒", millis);
}
}
}

View File

@@ -2,6 +2,7 @@ package com.sdm.common.utils;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.resp.pbs.hpc.FileNodeInfo;
import com.sdm.common.log.CoreLogger;
import lombok.extern.slf4j.Slf4j;
@@ -43,6 +44,9 @@ public class HpcCommandExcuteUtil {
@Value("${hpc.remoteDownLoadFileUrl:}")
private String remoteDownLoadFileUrl;
@Value("${hpc.callHpcUpload:}")
private String callHpcUpload;
@Autowired
private HttpClientUtil httpClientUtil;
@@ -159,6 +163,27 @@ public class HpcCommandExcuteUtil {
return builder.body(body);
}
public SdmResponse<Boolean> callHpcUploadToTarget(String jobId, String workDir) {
com.alibaba.fastjson2.JSONObject paramJson = new com.alibaba.fastjson2.JSONObject();
paramJson.put("jobId", jobId);
paramJson.put("jobWorkDir", workDir);
Boolean call = false;
String resultString = "";
try {
resultString = httpClientUtil.doPostJson(callHpcUpload, paramJson.toJSONString());
CoreLogger.info("callHpcUploadToTarget back:{}", resultString);
call = JSON.parseObject(
resultString,
new TypeReference<Boolean>() {
}
);
return SdmResponse.success(call);
} catch (Exception e) {
CoreLogger.error("callHpcUploadToTarget error,jobId:{},workDir:{},errMsg:{}", jobId,workDir, e.getMessage());
return SdmResponse.failed(call);
}
}
private String extractFileName(String path) {
if (path == null || path.isBlank()) {
return "unknown";
@@ -171,4 +196,5 @@ public class HpcCommandExcuteUtil {
}
}

View File

@@ -0,0 +1,25 @@
package com.sdm.common.utils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class String2NumberUtil {
/**
* 将字符串转换为Long转换失败返回null
* @param str 待转换的字符串
* @return 转换后的Long值失败返回null
*/
public static Long stringToLong(String str) {
if (str == null || str.trim().isEmpty()) {
return null;
}
try {
return Long.parseLong(str.trim());
} catch (NumberFormatException e) {
log.warn("字符串转换Long失败输入值{}", str, e);
return null;
}
}
}

View File

@@ -223,5 +223,11 @@ public class TaskController implements ITaskFeignClient {
return hpcInstructionService.hpcDownloadFiles(downloadFilesReq);
}
// 通知文件回传 mock 测试使用
@PostMapping("/callHpcUploadToTarget")
public SdmResponse<Boolean> hpcDownloadFiles(@RequestBody Map paramMap) {
return hpcInstructionService.callHpcUploadToTarget(paramMap.get("jobId").toString(), paramMap.get("jobWorkDir").toString());
}
}

View File

@@ -33,4 +33,14 @@ public class HpcJobStatusInfo {
@Schema(description = "作业计算软件")
public String software;
@Schema(description = "作业计算节点")
private String allocatedNodes;
@Schema(description = "总内核态时间(单位:毫秒)")
private String totalKernelTime;
@Schema(description = "总用户态时间(单位:毫秒)")
private String totalUserTime;
}

View File

@@ -0,0 +1,58 @@
package com.sdm.pbs.schedule.hpc;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
// 后期多节点部署,优化成分布式定时任务
@Component
public class HpcJobStatusSchedule {
@Value("${task.schedule.interval:120}")
private long interval;
@Value("${task.schedule.poolSize:1}")
private int poolSize;
private ScheduledExecutorService scheduler;
@Autowired
private HpcJobStatusScheduleExcutor hpcJobStatusScheduleService;
/**
* 初始化定时任务直接提交Service作为任务
*/
@PostConstruct
public void initScheduledTask() {
scheduler = Executors.newScheduledThreadPool(poolSize);
scheduler.scheduleWithFixedDelay(
hpcJobStatusScheduleService,
0,
interval,
TimeUnit.SECONDS
);
}
/**
* 销毁线程池
*/
@PreDestroy
public void destroy() {
if (scheduler != null) {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(1, TimeUnit.MINUTES)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
}
}
}
}

View File

@@ -0,0 +1,60 @@
package com.sdm.pbs.schedule.hpc;
import com.sdm.common.common.SdmResponse;
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
import com.sdm.pbs.model.entity.SimulationJob;
import com.sdm.pbs.schedule.hpc.hander.HpcJobStatusStrategyFactory;
import com.sdm.pbs.service.IPbsService;
import com.sdm.pbs.service.ISimulationJobService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service
public class HpcJobStatusScheduleExcutor implements Runnable{
@Autowired
private ISimulationJobService simulationJobService;
@Autowired
@Qualifier("decoratorPbsService")
private IPbsService pbsService;
@Autowired
private HpcJobStatusStrategyFactory strategyFactory;
@Override
public void run() {
try {
// Configuring,Queued,Running,Canceled,Finished,Failed
// 查询 jobId非空 jobStatus非 Canceled,Failed 数据, Finished,文件可能传完,可能没有传递完
List<SimulationJob> list = simulationJobService.lambdaQuery()
.isNotNull(SimulationJob::getJobId)
.notIn(SimulationJob::getJobStatus, "Canceled", "Failed")
// 回传文件状态
.notIn(SimulationJob::getFileStatus, "generating", "uploading")
.list();
if(CollectionUtils.isNotEmpty(list)){
log.info("HpcJobStatus query db data null");
return;
}
// 非Finished的数据直接修改状态Finished修改状态+通知上传到minio
for(SimulationJob simJob : list){
SdmResponse<HpcJobStatusInfo> hpcResponse = pbsService.getJobStatus(simJob.getJobId());
if(hpcResponse.isSuccess()&&hpcResponse.getData()!=null){
HpcJobStatusInfo data = hpcResponse.getData();
String jobStatus = data.getJobStatus();
strategyFactory.getHandler(jobStatus).handle(simJob,data);;
}
}
} catch (Exception e) {
log.error("HpcJobStatus schedule error:{}",e.getMessage());
}
}
}

View File

@@ -0,0 +1,65 @@
package com.sdm.pbs.schedule.hpc.hander;
import com.alibaba.fastjson2.JSONObject;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.log.CoreLogger;
import com.sdm.common.utils.DateUtils;
import com.sdm.common.utils.String2NumberUtil;
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
import com.sdm.pbs.model.entity.SimulationJob;
import com.sdm.pbs.service.HpcInstructionService;
import com.sdm.pbs.service.ISimulationJobService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Component
public class FinishedStatusHandler implements JobStatusHandler {
@Autowired
private ISimulationJobService simulationJobService;
@Autowired
private HpcInstructionService hpcInstructionService;
@Override
public void handle(SimulationJob simJob, HpcJobStatusInfo statusInfo) {
try {
// 过程结束修改
SimulationJob newDbJob = simulationJobService.lambdaQuery().eq(SimulationJob::getId, simJob.getId()).one();
newDbJob.setFileStatus(statusInfo.getJobStatus());
newDbJob.setStartTime(statusInfo.getStartTime());
newDbJob.setEndTime(statusInfo.getEndTime());
newDbJob.setNodeName(statusInfo.getAllocatedNodes());
newDbJob.setTotalKernelTime(String2NumberUtil.stringToLong(statusInfo.getTotalKernelTime()));
newDbJob.setTotalUserTime(String2NumberUtil.stringToLong(statusInfo.getTotalUserTime()));
newDbJob.setTotalElapsedTime(DateUtils.calculateTimeConsume(
statusInfo.getStartTime(), statusInfo.getEndTime(), TimeUnit.SECONDS));
newDbJob.setUpdateTime(LocalDateTime.now());
// 通知工具回传文件
SdmResponse<Boolean> callResponse = hpcInstructionService.callHpcUploadToTarget(newDbJob.getJobId(), newDbJob.getStdoutHpcFilePath());
if (!callResponse.isSuccess()||!callResponse.getData()) {
CoreLogger.error("callHpcUploadToTarget failed,jobId:{},workDir:{}",newDbJob.getJobId(),newDbJob.getStdoutHpcFilePath());
return;
}
// 通知成功修改状态
newDbJob.setFileStatus("uploading");
simulationJobService.updateById(newDbJob);
} catch (Exception e) {
CoreLogger.error("HpcJobStatus finshed handle error:{},newDbJob:{},statusInfo:{}",e.getMessage(),
JSONObject.toJSONString(simJob),JSONObject.toJSONString(statusInfo));
}
}
@Override
public List<String> getSupportedStatus() {
// todo 抽取成枚举类
return Collections.singletonList("Finished");
}
}

View File

@@ -0,0 +1,44 @@
package com.sdm.pbs.schedule.hpc.hander;
import com.sdm.common.log.CoreLogger;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
public class HpcJobStatusStrategyFactory {
private final Map<String, JobStatusHandler> strategyMap = new ConcurrentHashMap<>();
/**
* 注入所有JobStatusHandler实现类并注册到工厂
*/
@Autowired
public HpcJobStatusStrategyFactory(List<JobStatusHandler> handlers) {
for (JobStatusHandler handler : handlers) {
for (String status : handler.getSupportedStatus()) {
strategyMap.put(status, handler);
CoreLogger.info("Register strategy for status:{} with handler:{}",
status, handler.getClass().getSimpleName());
}
}
}
/**
* 根据状态获取对应的处理器
*/
public JobStatusHandler getHandler(String status) {
JobStatusHandler handler = strategyMap.get(status);
if (handler == null) {
CoreLogger.error("No handler registered for status:{}", status);
// 返回修改状态的hander
return strategyMap.get("Running");
}
return handler;
}
}

View File

@@ -0,0 +1,21 @@
package com.sdm.pbs.schedule.hpc.hander;
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
import com.sdm.pbs.model.entity.SimulationJob;
import java.util.List;
public interface JobStatusHandler {
/**
* 处理作业状态
* @param simJob 作业信息
* @param statusInfo 状态信息
*/
void handle(SimulationJob simJob,HpcJobStatusInfo statusInfo);
/**
* 获取支持的状态类型
* @return 状态列表
*/
List<String> getSupportedStatus();
}

View File

@@ -0,0 +1,47 @@
package com.sdm.pbs.schedule.hpc.hander;
import com.alibaba.fastjson2.JSONObject;
import com.sdm.common.log.CoreLogger;
import com.sdm.common.utils.String2NumberUtil;
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
import com.sdm.pbs.model.entity.SimulationJob;
import com.sdm.pbs.service.ISimulationJobService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
@Component
public class ProcessStatusHandler implements JobStatusHandler{
@Autowired
private ISimulationJobService simulationJobService;
@Override
public void handle(SimulationJob simJob, HpcJobStatusInfo statusInfo) {
try {
// 过程中状态修改
SimulationJob newDbJob = simulationJobService.lambdaQuery().eq(SimulationJob::getId, simJob.getId()).one();
newDbJob.setFileStatus(statusInfo.getJobStatus());
newDbJob.setStartTime(statusInfo.getStartTime());
newDbJob.setEndTime(statusInfo.getEndTime());
newDbJob.setNodeName(statusInfo.getAllocatedNodes());
newDbJob.setTotalKernelTime(String2NumberUtil.stringToLong(statusInfo.getTotalKernelTime()));
newDbJob.setTotalUserTime(String2NumberUtil.stringToLong(statusInfo.getTotalUserTime()));
newDbJob.setUpdateTime(LocalDateTime.now());
simulationJobService.updateById(newDbJob);
} catch (Exception e) {
CoreLogger.error("HpcJobStatus handle error:{},newDbJob:{},statusInfo:{}",e.getMessage(),
JSONObject.toJSONString(simJob),JSONObject.toJSONString(statusInfo));
}
}
@Override
public List<String> getSupportedStatus() {
// todo 枚举类
return Arrays.asList("Configuring", "Queued", "Running", "Canceled", "Failed");
}
}

View File

@@ -54,5 +54,7 @@ public interface HpcInstructionService {
ResponseEntity<StreamingResponseBody> hpcDownloadFile(String fileName,Long fileSize);
// 通知hpc回传文件
SdmResponse<Boolean> callHpcUploadToTarget(String jobId,String workDir);
}

View File

@@ -479,6 +479,11 @@ public class HpcInstructionServiceImpl implements HpcInstructionService {
return hpcCommandExcuteUtil.hpcDownloadFile(fileName,fileSize);
}
@Override
public SdmResponse<Boolean> callHpcUploadToTarget(String jobId,String workDir) {
return hpcCommandExcuteUtil.callHpcUploadToTarget(jobId,workDir);
}
/**
* 使用 MappedByteBuffer + 分块映射实现超大文件下载(高性能、低内存)
*/

View File

@@ -106,6 +106,10 @@ hpc:
excuteWay: remote
remoteCmdUrl: http://192.168.65.55:9097/doProcess
remoteCreateDirUrl: http://192.168.65.55:9097/createDir
remoteScanDirUrl: http://192.168.65.55:9097/scanDir
remoteDownLoadFileUrl: http://192.168.65.55:9097/hpcDownload
callHpcUpload: http://192.168.65.55:9097/addJobQueue
#logging:
# config: ./config/logback.xml