修改:hpc任务取消,代码优化
This commit is contained in:
@@ -43,7 +43,7 @@ public class HpcJobStatusScheduleExcutor implements Runnable{
|
||||
.isNotNull(SimulationJob::getJobId)
|
||||
.notIn(SimulationJob::getJobStatus, "Canceled", "Failed")
|
||||
// 非上传中的,非回传结束的。JobStatus 结束 通知 uploading 只会有一次,回传失败后,人工改表修复
|
||||
.notIn(SimulationJob::getFileStatus, "uploading","finished")
|
||||
.notIn(SimulationJob::getFileStatus, "uploading","finished","failed")
|
||||
.list();
|
||||
if(CollectionUtils.isEmpty(list)){
|
||||
log.info("HpcJobStatus query db data null");
|
||||
|
||||
@@ -1,7 +1,13 @@
|
||||
package com.sdm.pbs.schedule.hpc.hander;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.sdm.common.common.SdmResponse;
|
||||
import com.sdm.common.entity.enums.MessageTemplateEnum;
|
||||
import com.sdm.common.entity.req.flowable.AsyncCallbackRequest;
|
||||
import com.sdm.common.entity.req.system.SendMsgReq;
|
||||
import com.sdm.common.feign.impl.system.MessageFeignClientImpl;
|
||||
import com.sdm.common.feign.inter.flowable.IFlowableFeignClient;
|
||||
import com.sdm.common.log.CoreLogger;
|
||||
import com.sdm.common.utils.DateUtils;
|
||||
import com.sdm.common.utils.String2NumberUtil;
|
||||
@@ -9,15 +15,18 @@ import com.sdm.pbs.model.bo.HpcJobStatusInfo;
|
||||
import com.sdm.pbs.model.entity.SimulationJob;
|
||||
import com.sdm.pbs.service.HpcInstructionService;
|
||||
import com.sdm.pbs.service.ISimulationJobService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Collections;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class FinishedStatusHandler implements JobStatusHandler {
|
||||
|
||||
// 租户桶的前缀 spdm-租户id
|
||||
@@ -29,10 +38,16 @@ public class FinishedStatusHandler implements JobStatusHandler {
|
||||
@Autowired
|
||||
private HpcInstructionService hpcInstructionService;
|
||||
|
||||
@Autowired
|
||||
private MessageFeignClientImpl messageFeignClient;
|
||||
|
||||
@Autowired
|
||||
private IFlowableFeignClient flowableFeignClient;
|
||||
|
||||
@Override
|
||||
public void handle(SimulationJob simJob, HpcJobStatusInfo statusInfo) {
|
||||
try {
|
||||
// 过程结束修改
|
||||
log.info("任务id:{},Hpc任务详情:{}",simJob.getId(),JSON.toJSONString(statusInfo));
|
||||
if(Objects.equals(statusInfo.getJobStatus(),"Canceled")){
|
||||
SimulationJob newDbJob = simulationJobService.lambdaQuery().eq(SimulationJob::getId, simJob.getId()).one();
|
||||
newDbJob.setJobStatus(statusInfo.getJobStatus());
|
||||
newDbJob.setStartTime(statusInfo.getStartTime());
|
||||
@@ -43,22 +58,49 @@ public class FinishedStatusHandler implements JobStatusHandler {
|
||||
newDbJob.setTotalElapsedTime(DateUtils.calculateTimeConsume(
|
||||
statusInfo.getStartTime(), statusInfo.getEndTime(), TimeUnit.SECONDS));
|
||||
newDbJob.setUpdateTime(LocalDateTime.now());
|
||||
String minioBucket = minioBucketPrefix + newDbJob.getTenantId();
|
||||
Long userId=newDbJob.getCreatorId();
|
||||
Long tenantId = newDbJob.getTenantId();
|
||||
// 通知工具回传文件 minio 或者 nas
|
||||
SdmResponse<Boolean> callResponse = hpcInstructionService.callHpcUploadToTarget(newDbJob.getJobId(), newDbJob.getStdoutHpcFilePath(),minioBucket,
|
||||
newDbJob.getStdoutSpdmMinoFilePath(),newDbJob.getStdoutSpdmNasFilePath(),newDbJob.getDirId(),userId,tenantId);
|
||||
if (!callResponse.isSuccess()||!callResponse.getData()) {
|
||||
CoreLogger.error("callHpcUploadToTarget failed,jobId:{},workDir:{}",newDbJob.getJobId(),newDbJob.getStdoutHpcFilePath());
|
||||
return;
|
||||
}
|
||||
// 通知成功修改状态
|
||||
newDbJob.setFileStatus("uploading");
|
||||
newDbJob.setFileStatus("failed");
|
||||
simulationJobService.updateById(newDbJob);
|
||||
} catch (Exception e) {
|
||||
CoreLogger.error("HpcJobStatus finshed handle error:{},newDbJob:{},statusInfo:{}",e.getMessage(),
|
||||
JSONObject.toJSONString(simJob),JSONObject.toJSONString(statusInfo));
|
||||
// 回调通知工作流节点
|
||||
AsyncCallbackRequest asyncCallbackRequest = new AsyncCallbackRequest();
|
||||
asyncCallbackRequest.setAsyncTaskId(newDbJob.getJobId());
|
||||
asyncCallbackRequest.setStatus("FAIL");
|
||||
asyncCallbackRequest.setResultJson("Hpc任务已取消");
|
||||
SdmResponse sdmResponse = flowableFeignClient.asyncCallback(asyncCallbackRequest);
|
||||
log.info("Hpc任务取消,通知工作流返回:{}",JSON.toJSONString(sdmResponse));
|
||||
// 发送成功完成的消息
|
||||
sendMsg(newDbJob.getTenantId(),newDbJob.getCreatorId(),newDbJob.getJobName(),"Hpc任务已取消");
|
||||
}
|
||||
if(Objects.equals(statusInfo.getJobStatus(),"Finished")){
|
||||
try {
|
||||
// 过程结束修改
|
||||
SimulationJob newDbJob = simulationJobService.lambdaQuery().eq(SimulationJob::getId, simJob.getId()).one();
|
||||
newDbJob.setJobStatus(statusInfo.getJobStatus());
|
||||
newDbJob.setStartTime(statusInfo.getStartTime());
|
||||
newDbJob.setEndTime(statusInfo.getEndTime());
|
||||
newDbJob.setNodeName(statusInfo.getAllocatedNodes());
|
||||
newDbJob.setTotalKernelTime(String2NumberUtil.stringToLong(statusInfo.getTotalKernelTime()));
|
||||
newDbJob.setTotalUserTime(String2NumberUtil.stringToLong(statusInfo.getTotalUserTime()));
|
||||
newDbJob.setTotalElapsedTime(DateUtils.calculateTimeConsume(
|
||||
statusInfo.getStartTime(), statusInfo.getEndTime(), TimeUnit.SECONDS));
|
||||
newDbJob.setUpdateTime(LocalDateTime.now());
|
||||
String minioBucket = minioBucketPrefix + newDbJob.getTenantId();
|
||||
Long userId=newDbJob.getCreatorId();
|
||||
Long tenantId = newDbJob.getTenantId();
|
||||
// 通知工具回传文件 minio 或者 nas
|
||||
SdmResponse<Boolean> callResponse = hpcInstructionService.callHpcUploadToTarget(newDbJob.getJobId(), newDbJob.getStdoutHpcFilePath(),minioBucket,
|
||||
newDbJob.getStdoutSpdmMinoFilePath(),newDbJob.getStdoutSpdmNasFilePath(),newDbJob.getDirId(),userId,tenantId);
|
||||
if (!callResponse.isSuccess()||!callResponse.getData()) {
|
||||
CoreLogger.error("callHpcUploadToTarget failed,jobId:{},workDir:{}",newDbJob.getJobId(),newDbJob.getStdoutHpcFilePath());
|
||||
return;
|
||||
}
|
||||
// 通知成功修改状态
|
||||
newDbJob.setFileStatus("uploading");
|
||||
simulationJobService.updateById(newDbJob);
|
||||
} catch (Exception e) {
|
||||
CoreLogger.error("HpcJobStatus finshed handle error:{},newDbJob:{},statusInfo:{}",e.getMessage(),
|
||||
JSONObject.toJSONString(simJob),JSONObject.toJSONString(statusInfo));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -66,7 +108,17 @@ public class FinishedStatusHandler implements JobStatusHandler {
|
||||
@Override
|
||||
public List<String> getSupportedStatus() {
|
||||
// todo 抽取成枚举类
|
||||
return Collections.singletonList("Finished");
|
||||
return Arrays.asList("Finished","Canceled");
|
||||
}
|
||||
|
||||
private void sendMsg(Long tenanId,Long userId,String jobName,String result){
|
||||
SendMsgReq req = new SendMsgReq();
|
||||
req.setTitle(MessageTemplateEnum.HPC_END.getTitle());
|
||||
req.setContent(MessageTemplateEnum.HPC_END.getContent(jobName,result));
|
||||
req.setTenantId(String.valueOf(tenanId));
|
||||
req.setUserId(String.valueOf(userId));
|
||||
CoreLogger.info("hpc finish push msg:{}", JSON.toJSONString(req));
|
||||
messageFeignClient.sendMessage(req);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -42,7 +42,7 @@ public class ProcessStatusHandler implements JobStatusHandler{
|
||||
@Override
|
||||
public List<String> getSupportedStatus() {
|
||||
// todo 枚举类
|
||||
return Arrays.asList("Configuring", "Queued", "Running", "Canceled", "Failed");
|
||||
return Arrays.asList("Configuring", "Queued", "Running", "Failed");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -158,7 +158,7 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
|
||||
throw new RuntimeException("Hpc执行失败,返回jobId空");
|
||||
}
|
||||
// 4. 保存任务信息到数据库
|
||||
saveSimulationJobToDb(req, jobId, hpcOutPutDir, commandResult.getCommand());
|
||||
saveSimulationJobToDb(req, jobId, hpcOutPutDir);
|
||||
return SdmResponse.success(jobId);
|
||||
}
|
||||
|
||||
@@ -254,9 +254,9 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
|
||||
* @param req 任务请求参数
|
||||
* @param jobId 任务ID
|
||||
* @param hpcOutPutDir 输出目录
|
||||
* @param command 执行命令
|
||||
* 执行命令
|
||||
*/
|
||||
private void saveSimulationJobToDb(SubmitHpcTaskReq req, String jobId, String hpcOutPutDir, String command) {
|
||||
private void saveSimulationJobToDb(SubmitHpcTaskReq req, String jobId, String hpcOutPutDir) {
|
||||
if (StringUtils.isNotEmpty(jobId)) {
|
||||
// 数据入库
|
||||
SimulationJob simulationJob = new SimulationJob();
|
||||
@@ -289,7 +289,7 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
|
||||
simulationJob.setStdoutSpdmNasFilePath(req.getStdoutSpdmNasFilePath());
|
||||
// 执行信息 定时任务回传的时候修改
|
||||
// simulationJob.setNodeName("");
|
||||
simulationJob.setExecutCommand(command);
|
||||
simulationJob.setExecutCommand(req.getCommand());
|
||||
// 执行信息 定时任务回传的时候修改
|
||||
// simulationJob.setStartTime("2025-11-30 10:00:00");
|
||||
// simulationJob.setEndTime("2025-11-30 12:30:00");
|
||||
@@ -614,25 +614,42 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
|
||||
public SdmResponse<Boolean> jobFileCallback(JobFileCallBackReq req) {
|
||||
CoreLogger.info("hpc jobFileCallback params:{}",JSONObject.toJSONString(req));
|
||||
SimulationJob newDbJob = simulationJobService.lambdaQuery().eq(SimulationJob::getJobId, req.getJobId()).one();
|
||||
String fileStatus="";
|
||||
String status="";
|
||||
String resultReson="";
|
||||
String endMsg="";
|
||||
if(!Objects.isNull(req)&&Objects.equals(req.getUploadResult(),"Y")) {
|
||||
// 回传成功
|
||||
newDbJob.setFileStatus("finished");
|
||||
newDbJob.setUpdateTime(LocalDateTime.now());
|
||||
simulationJobService.updateById(newDbJob);
|
||||
// 回调通知工作流节点
|
||||
AsyncCallbackRequest asyncCallbackRequest = new AsyncCallbackRequest();
|
||||
asyncCallbackRequest.setAsyncTaskId(newDbJob.getJobId());
|
||||
asyncCallbackRequest.setResultJson("finished");
|
||||
// SdmResponse sdmResponse = flowableFeignClient.asyncCallback(asyncCallbackRequest);
|
||||
// 发送成功完成的消息
|
||||
sendMsg(newDbJob.getTenantId(),newDbJob.getCreatorId(),newDbJob.getJobName(),"成功");
|
||||
// CoreLogger.info("flowableFeignClient asyncCallback result:{}", JSONObject.toJSONString(sdmResponse));
|
||||
fileStatus="finished";
|
||||
status="SUCCESS";
|
||||
endMsg="成功";
|
||||
dealCallBack(newDbJob,fileStatus,status,endMsg,resultReson);
|
||||
return SdmResponse.success(true);
|
||||
}else{
|
||||
// 发送失败完成的消息
|
||||
sendMsg(newDbJob.getTenantId(),newDbJob.getCreatorId(),newDbJob.getJobName(),"失败");
|
||||
}
|
||||
return SdmResponse.success(false);
|
||||
if(!Objects.isNull(req)&&Objects.equals(req.getUploadResult(),"N")) {
|
||||
fileStatus="failed";
|
||||
status="FAIL";
|
||||
resultReson="文件回传失败";
|
||||
endMsg="失败";
|
||||
dealCallBack(newDbJob,fileStatus,status,endMsg,resultReson);
|
||||
return SdmResponse.success(false);
|
||||
}
|
||||
return SdmResponse.success(true);
|
||||
}
|
||||
|
||||
private void dealCallBack(SimulationJob newDbJob,String fileStatus,String status,String endMsg,String resultReson){
|
||||
// 回传成功
|
||||
newDbJob.setFileStatus(fileStatus);
|
||||
newDbJob.setUpdateTime(LocalDateTime.now());
|
||||
simulationJobService.updateById(newDbJob);
|
||||
// 回调通知工作流节点
|
||||
AsyncCallbackRequest asyncCallbackRequest = new AsyncCallbackRequest();
|
||||
asyncCallbackRequest.setAsyncTaskId(newDbJob.getJobId());
|
||||
asyncCallbackRequest.setStatus(status);
|
||||
asyncCallbackRequest.setResultJson(resultReson);
|
||||
SdmResponse sdmResponse = flowableFeignClient.asyncCallback(asyncCallbackRequest);
|
||||
// 发送成功完成的消息
|
||||
sendMsg(newDbJob.getTenantId(),newDbJob.getCreatorId(),newDbJob.getJobName(),endMsg);
|
||||
log.info("Hpc任务{}完成,通知工作流返回:{}",endMsg,JSON.toJSONString(sdmResponse));
|
||||
}
|
||||
|
||||
private void sendMsg(Long tenanId,Long userId,String jobName,String result){
|
||||
|
||||
Reference in New Issue
Block a user