Merge remote-tracking branch 'origin/main'
This commit is contained in:
@@ -3,6 +3,7 @@ package com.sdm.common.feign.impl.capability;
|
|||||||
import com.sdm.common.common.SdmResponse;
|
import com.sdm.common.common.SdmResponse;
|
||||||
import com.sdm.common.entity.req.capability.FlowNodeDto;
|
import com.sdm.common.entity.req.capability.FlowNodeDto;
|
||||||
import com.sdm.common.entity.req.system.LaunchApproveReq;
|
import com.sdm.common.entity.req.system.LaunchApproveReq;
|
||||||
|
import com.sdm.common.entity.resp.capability.FlowTemplateResp;
|
||||||
import com.sdm.common.feign.inter.capability.ISimulationFlowFeignClient;
|
import com.sdm.common.feign.inter.capability.ISimulationFlowFeignClient;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -34,8 +35,8 @@ public class SimulationFlowFeignClientImpl implements ISimulationFlowFeignClient
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SdmResponse queryFlowTemplateInfo(String uuid) {
|
public SdmResponse<FlowTemplateResp> queryFlowTemplateInfo(String uuid) {
|
||||||
SdmResponse response;
|
SdmResponse<FlowTemplateResp> response;
|
||||||
try {
|
try {
|
||||||
response = flowFeignClient.queryFlowTemplateInfo(uuid);
|
response = flowFeignClient.queryFlowTemplateInfo(uuid);
|
||||||
if (!response.isSuccess()) {
|
if (!response.isSuccess()) {
|
||||||
|
|||||||
@@ -60,4 +60,17 @@ public class FlowableClientFeignClientImpl implements IFlowableFeignClient {
|
|||||||
return SdmResponse.failed("保存节点参数失败");
|
return SdmResponse.failed("保存节点参数失败");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SdmResponse updateNodeParamProcessInstanceId(String processDefinitionId, String processInstanceId) {
|
||||||
|
SdmResponse response;
|
||||||
|
try {
|
||||||
|
response = flowableFeignClient.updateNodeParamProcessInstanceId(processDefinitionId, processInstanceId);
|
||||||
|
log.info("更新流程参数的流程实例id:"+ response);
|
||||||
|
return response;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("更新流程参数的流程实例id失败", e);
|
||||||
|
return SdmResponse.failed("更新流程参数的流程实例id失败");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,4 +25,7 @@ public interface IFlowableFeignClient {
|
|||||||
@PostMapping("/saveParamsByDefinitionId")
|
@PostMapping("/saveParamsByDefinitionId")
|
||||||
SdmResponse saveParamsByDefinitionId(@RequestParam String processDefinitionId, @RequestParam String nodeId, @RequestBody Map<String, Object> params);
|
SdmResponse saveParamsByDefinitionId(@RequestParam String processDefinitionId, @RequestParam String nodeId, @RequestBody Map<String, Object> params);
|
||||||
|
|
||||||
|
@PostMapping("/updateNodeParamProcessInstanceId")
|
||||||
|
SdmResponse updateNodeParamProcessInstanceId(@RequestParam String processDefinitionId, @RequestParam String processInstanceId);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,16 +1,23 @@
|
|||||||
package com.sdm.common.utils;
|
package com.sdm.common.utils;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
import java.time.*;
|
import java.time.LocalDate;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.LocalTime;
|
||||||
|
import java.time.ZoneId;
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
import java.time.temporal.TemporalAdjusters;
|
import java.time.temporal.TemporalAdjusters;
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 日期时间工具类
|
* 日期时间工具类
|
||||||
*/
|
*/
|
||||||
|
@Slf4j
|
||||||
public class DateUtils {
|
public class DateUtils {
|
||||||
|
|
||||||
// 常用日期时间格式
|
// 常用日期时间格式
|
||||||
@@ -22,6 +29,10 @@ public class DateUtils {
|
|||||||
public static final String PATTERN_COMPACT = "yyyyMMddHHmmss";
|
public static final String PATTERN_COMPACT = "yyyyMMddHHmmss";
|
||||||
public static final String PATTERN_COMPACT_DATE = "yyyyMMdd";
|
public static final String PATTERN_COMPACT_DATE = "yyyyMMdd";
|
||||||
|
|
||||||
|
// 时间格式
|
||||||
|
private static final String DEFAULT_DATE_FORMAT = "yyyy/MM/dd HH:mm:ss";
|
||||||
|
private static final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat(DEFAULT_DATE_FORMAT);
|
||||||
|
|
||||||
private DateUtils() {
|
private DateUtils() {
|
||||||
throw new IllegalStateException("Utility class");
|
throw new IllegalStateException("Utility class");
|
||||||
}
|
}
|
||||||
@@ -229,4 +240,79 @@ public class DateUtils {
|
|||||||
return toDate(localDate);
|
return toDate(localDate);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 计算两个时间字符串的耗时(单位:毫秒)
|
||||||
|
* @param startTimeStr 开始时间字符串(格式:yyyy/MM/dd HH:mm:ss)
|
||||||
|
* @param endTimeStr 结束时间字符串(格式:yyyy/MM/dd HH:mm:ss)
|
||||||
|
* @return 耗时(毫秒),时间无效返回null
|
||||||
|
*/
|
||||||
|
public static Long calculateTimeConsume(String startTimeStr, String endTimeStr) {
|
||||||
|
// 空值校验
|
||||||
|
if (startTimeStr == null || startTimeStr.trim().isEmpty() ||
|
||||||
|
endTimeStr == null || endTimeStr.trim().isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 解析时间字符串
|
||||||
|
Date startTime = DATE_FORMATTER.parse(startTimeStr.trim());
|
||||||
|
Date endTime = DATE_FORMATTER.parse(endTimeStr.trim());
|
||||||
|
|
||||||
|
// 计算耗时(确保结束时间不早于开始时间)
|
||||||
|
long consumeMillis = endTime.getTime() - startTime.getTime();
|
||||||
|
return consumeMillis >= 0 ? consumeMillis : null;
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 时间格式解析失败
|
||||||
|
log.warn("时间格式解析失败,startTime: {}, endTime: {}, 格式要求:{}",
|
||||||
|
startTimeStr, endTimeStr, DEFAULT_DATE_FORMAT);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 计算耗时并转换为指定时间单位
|
||||||
|
* @param startTimeStr 开始时间字符串
|
||||||
|
* @param endTimeStr 结束时间字符串
|
||||||
|
* @param timeUnit 目标时间单位
|
||||||
|
* @return 耗时(指定单位),时间无效返回null
|
||||||
|
*/
|
||||||
|
public static Long calculateTimeConsume(String startTimeStr, String endTimeStr, TimeUnit timeUnit) {
|
||||||
|
Long consumeMillis = calculateTimeConsume(startTimeStr, endTimeStr);
|
||||||
|
if (consumeMillis == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return timeUnit.convert(consumeMillis, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取格式化的耗时字符串(xx小时xx分xx秒)
|
||||||
|
* @param startTimeStr 开始时间字符串
|
||||||
|
* @param endTimeStr 结束时间字符串
|
||||||
|
* @return 格式化耗时字符串,时间无效返回null
|
||||||
|
*/
|
||||||
|
public static String getFormattedConsumeTime(String startTimeStr, String endTimeStr) {
|
||||||
|
Long consumeMillis = calculateTimeConsume(startTimeStr, endTimeStr);
|
||||||
|
if (consumeMillis == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
long hours = TimeUnit.MILLISECONDS.toHours(consumeMillis);
|
||||||
|
long minutes = TimeUnit.MILLISECONDS.toMinutes(consumeMillis) % 60;
|
||||||
|
long seconds = TimeUnit.MILLISECONDS.toSeconds(consumeMillis) % 60;
|
||||||
|
long millis = consumeMillis % 1000;
|
||||||
|
|
||||||
|
if (hours > 0) {
|
||||||
|
return String.format("%d小时%d分%d秒%d毫秒", hours, minutes, seconds, millis);
|
||||||
|
} else if (minutes > 0) {
|
||||||
|
return String.format("%d分%d秒%d毫秒", minutes, seconds, millis);
|
||||||
|
} else if (seconds > 0) {
|
||||||
|
return String.format("%d秒%d毫秒", seconds, millis);
|
||||||
|
} else {
|
||||||
|
return String.format("%d毫秒", millis);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -2,6 +2,7 @@ package com.sdm.common.utils;
|
|||||||
|
|
||||||
import com.alibaba.fastjson2.JSON;
|
import com.alibaba.fastjson2.JSON;
|
||||||
import com.alibaba.fastjson2.TypeReference;
|
import com.alibaba.fastjson2.TypeReference;
|
||||||
|
import com.sdm.common.common.SdmResponse;
|
||||||
import com.sdm.common.entity.resp.pbs.hpc.FileNodeInfo;
|
import com.sdm.common.entity.resp.pbs.hpc.FileNodeInfo;
|
||||||
import com.sdm.common.log.CoreLogger;
|
import com.sdm.common.log.CoreLogger;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@@ -43,6 +44,9 @@ public class HpcCommandExcuteUtil {
|
|||||||
@Value("${hpc.remoteDownLoadFileUrl:}")
|
@Value("${hpc.remoteDownLoadFileUrl:}")
|
||||||
private String remoteDownLoadFileUrl;
|
private String remoteDownLoadFileUrl;
|
||||||
|
|
||||||
|
@Value("${hpc.callHpcUpload:}")
|
||||||
|
private String callHpcUpload;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private HttpClientUtil httpClientUtil;
|
private HttpClientUtil httpClientUtil;
|
||||||
|
|
||||||
@@ -159,6 +163,27 @@ public class HpcCommandExcuteUtil {
|
|||||||
return builder.body(body);
|
return builder.body(body);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SdmResponse<Boolean> callHpcUploadToTarget(String jobId, String workDir) {
|
||||||
|
com.alibaba.fastjson2.JSONObject paramJson = new com.alibaba.fastjson2.JSONObject();
|
||||||
|
paramJson.put("jobId", jobId);
|
||||||
|
paramJson.put("jobWorkDir", workDir);
|
||||||
|
Boolean call = false;
|
||||||
|
String resultString = "";
|
||||||
|
try {
|
||||||
|
resultString = httpClientUtil.doPostJson(callHpcUpload, paramJson.toJSONString());
|
||||||
|
CoreLogger.info("callHpcUploadToTarget back:{}", resultString);
|
||||||
|
call = JSON.parseObject(
|
||||||
|
resultString,
|
||||||
|
new TypeReference<Boolean>() {
|
||||||
|
}
|
||||||
|
);
|
||||||
|
return SdmResponse.success(call);
|
||||||
|
} catch (Exception e) {
|
||||||
|
CoreLogger.error("callHpcUploadToTarget error,jobId:{},workDir:{},errMsg:{}", jobId,workDir, e.getMessage());
|
||||||
|
return SdmResponse.failed(call);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private String extractFileName(String path) {
|
private String extractFileName(String path) {
|
||||||
if (path == null || path.isBlank()) {
|
if (path == null || path.isBlank()) {
|
||||||
return "unknown";
|
return "unknown";
|
||||||
@@ -171,4 +196,5 @@ public class HpcCommandExcuteUtil {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,25 @@
|
|||||||
|
package com.sdm.common.utils;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class String2NumberUtil {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 将字符串转换为Long,转换失败返回null
|
||||||
|
* @param str 待转换的字符串
|
||||||
|
* @return 转换后的Long值,失败返回null
|
||||||
|
*/
|
||||||
|
public static Long stringToLong(String str) {
|
||||||
|
if (str == null || str.trim().isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return Long.parseLong(str.trim());
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
log.warn("字符串转换Long失败,输入值:{}", str, e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -77,7 +77,7 @@ public class TaskController implements ITaskFeignClient {
|
|||||||
return pbsService.submitHpcJob(submitHpcTaskReq);
|
return pbsService.submitHpcJob(submitHpcTaskReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
@PostMapping("/stopHpcJob")
|
@GetMapping("/stopHpcJob")
|
||||||
@Operation(summary = "作业停止")
|
@Operation(summary = "作业停止")
|
||||||
public SdmResponse<Boolean> stopHpcJob(@RequestParam String jobId) {
|
public SdmResponse<Boolean> stopHpcJob(@RequestParam String jobId) {
|
||||||
return pbsService.stopHpcJob(jobId);
|
return pbsService.stopHpcJob(jobId);
|
||||||
@@ -223,5 +223,11 @@ public class TaskController implements ITaskFeignClient {
|
|||||||
return hpcInstructionService.hpcDownloadFiles(downloadFilesReq);
|
return hpcInstructionService.hpcDownloadFiles(downloadFilesReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 通知文件回传 mock 测试使用
|
||||||
|
@PostMapping("/callHpcUploadToTarget")
|
||||||
|
public SdmResponse<Boolean> hpcDownloadFiles(@RequestBody Map paramMap) {
|
||||||
|
return hpcInstructionService.callHpcUploadToTarget(paramMap.get("jobId").toString(), paramMap.get("jobWorkDir").toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,4 +33,14 @@ public class HpcJobStatusInfo {
|
|||||||
|
|
||||||
@Schema(description = "作业计算软件")
|
@Schema(description = "作业计算软件")
|
||||||
public String software;
|
public String software;
|
||||||
|
|
||||||
|
@Schema(description = "作业计算节点")
|
||||||
|
private String allocatedNodes;
|
||||||
|
|
||||||
|
@Schema(description = "总内核态时间(单位:毫秒)")
|
||||||
|
private String totalKernelTime;
|
||||||
|
|
||||||
|
@Schema(description = "总用户态时间(单位:毫秒)")
|
||||||
|
private String totalUserTime;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,58 @@
|
|||||||
|
package com.sdm.pbs.schedule.hpc;
|
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
import javax.annotation.PreDestroy;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
// 后期多节点部署,优化成分布式定时任务
|
||||||
|
@Component
|
||||||
|
public class HpcJobStatusSchedule {
|
||||||
|
|
||||||
|
@Value("${task.schedule.interval:120}")
|
||||||
|
private long interval;
|
||||||
|
|
||||||
|
@Value("${task.schedule.poolSize:1}")
|
||||||
|
private int poolSize;
|
||||||
|
|
||||||
|
private ScheduledExecutorService scheduler;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private HpcJobStatusScheduleExcutor hpcJobStatusScheduleService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 初始化定时任务:直接提交Service作为任务
|
||||||
|
*/
|
||||||
|
@PostConstruct
|
||||||
|
public void initScheduledTask() {
|
||||||
|
scheduler = Executors.newScheduledThreadPool(poolSize);
|
||||||
|
scheduler.scheduleWithFixedDelay(
|
||||||
|
hpcJobStatusScheduleService,
|
||||||
|
0,
|
||||||
|
interval,
|
||||||
|
TimeUnit.SECONDS
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 销毁线程池
|
||||||
|
*/
|
||||||
|
@PreDestroy
|
||||||
|
public void destroy() {
|
||||||
|
if (scheduler != null) {
|
||||||
|
scheduler.shutdown();
|
||||||
|
try {
|
||||||
|
if (!scheduler.awaitTermination(1, TimeUnit.MINUTES)) {
|
||||||
|
scheduler.shutdownNow();
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
scheduler.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,60 @@
|
|||||||
|
package com.sdm.pbs.schedule.hpc;
|
||||||
|
|
||||||
|
import com.sdm.common.common.SdmResponse;
|
||||||
|
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
|
||||||
|
import com.sdm.pbs.model.entity.SimulationJob;
|
||||||
|
import com.sdm.pbs.schedule.hpc.hander.HpcJobStatusStrategyFactory;
|
||||||
|
import com.sdm.pbs.service.IPbsService;
|
||||||
|
import com.sdm.pbs.service.ISimulationJobService;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.collections4.CollectionUtils;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
public class HpcJobStatusScheduleExcutor implements Runnable{
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ISimulationJobService simulationJobService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@Qualifier("decoratorPbsService")
|
||||||
|
private IPbsService pbsService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private HpcJobStatusStrategyFactory strategyFactory;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
// Configuring,Queued,Running,Canceled,Finished,Failed
|
||||||
|
// 查询 jobId非空, jobStatus非 Canceled,Failed 数据, Finished,文件可能传完,可能没有传递完
|
||||||
|
List<SimulationJob> list = simulationJobService.lambdaQuery()
|
||||||
|
.isNotNull(SimulationJob::getJobId)
|
||||||
|
.notIn(SimulationJob::getJobStatus, "Canceled", "Failed")
|
||||||
|
// 回传文件状态
|
||||||
|
.notIn(SimulationJob::getFileStatus, "generating", "uploading")
|
||||||
|
.list();
|
||||||
|
if(CollectionUtils.isNotEmpty(list)){
|
||||||
|
log.info("HpcJobStatus query db data null");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// 非Finished的数据直接修改状态,Finished修改状态+通知上传到minio
|
||||||
|
for(SimulationJob simJob : list){
|
||||||
|
SdmResponse<HpcJobStatusInfo> hpcResponse = pbsService.getJobStatus(simJob.getJobId());
|
||||||
|
if(hpcResponse.isSuccess()&&hpcResponse.getData()!=null){
|
||||||
|
HpcJobStatusInfo data = hpcResponse.getData();
|
||||||
|
String jobStatus = data.getJobStatus();
|
||||||
|
strategyFactory.getHandler(jobStatus).handle(simJob,data);;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("HpcJobStatus schedule error:{}",e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,65 @@
|
|||||||
|
package com.sdm.pbs.schedule.hpc.hander;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import com.sdm.common.common.SdmResponse;
|
||||||
|
import com.sdm.common.log.CoreLogger;
|
||||||
|
import com.sdm.common.utils.DateUtils;
|
||||||
|
import com.sdm.common.utils.String2NumberUtil;
|
||||||
|
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
|
||||||
|
import com.sdm.pbs.model.entity.SimulationJob;
|
||||||
|
import com.sdm.pbs.service.HpcInstructionService;
|
||||||
|
import com.sdm.pbs.service.ISimulationJobService;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class FinishedStatusHandler implements JobStatusHandler {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ISimulationJobService simulationJobService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private HpcInstructionService hpcInstructionService;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(SimulationJob simJob, HpcJobStatusInfo statusInfo) {
|
||||||
|
try {
|
||||||
|
// 过程结束修改
|
||||||
|
SimulationJob newDbJob = simulationJobService.lambdaQuery().eq(SimulationJob::getId, simJob.getId()).one();
|
||||||
|
newDbJob.setFileStatus(statusInfo.getJobStatus());
|
||||||
|
newDbJob.setStartTime(statusInfo.getStartTime());
|
||||||
|
newDbJob.setEndTime(statusInfo.getEndTime());
|
||||||
|
newDbJob.setNodeName(statusInfo.getAllocatedNodes());
|
||||||
|
newDbJob.setTotalKernelTime(String2NumberUtil.stringToLong(statusInfo.getTotalKernelTime()));
|
||||||
|
newDbJob.setTotalUserTime(String2NumberUtil.stringToLong(statusInfo.getTotalUserTime()));
|
||||||
|
newDbJob.setTotalElapsedTime(DateUtils.calculateTimeConsume(
|
||||||
|
statusInfo.getStartTime(), statusInfo.getEndTime(), TimeUnit.SECONDS));
|
||||||
|
newDbJob.setUpdateTime(LocalDateTime.now());
|
||||||
|
// 通知工具回传文件
|
||||||
|
SdmResponse<Boolean> callResponse = hpcInstructionService.callHpcUploadToTarget(newDbJob.getJobId(), newDbJob.getStdoutHpcFilePath());
|
||||||
|
if (!callResponse.isSuccess()||!callResponse.getData()) {
|
||||||
|
CoreLogger.error("callHpcUploadToTarget failed,jobId:{},workDir:{}",newDbJob.getJobId(),newDbJob.getStdoutHpcFilePath());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// 通知成功修改状态
|
||||||
|
newDbJob.setFileStatus("uploading");
|
||||||
|
simulationJobService.updateById(newDbJob);
|
||||||
|
} catch (Exception e) {
|
||||||
|
CoreLogger.error("HpcJobStatus finshed handle error:{},newDbJob:{},statusInfo:{}",e.getMessage(),
|
||||||
|
JSONObject.toJSONString(simJob),JSONObject.toJSONString(statusInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> getSupportedStatus() {
|
||||||
|
// todo 抽取成枚举类
|
||||||
|
return Collections.singletonList("Finished");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,44 @@
|
|||||||
|
package com.sdm.pbs.schedule.hpc.hander;
|
||||||
|
|
||||||
|
import com.sdm.common.log.CoreLogger;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class HpcJobStatusStrategyFactory {
|
||||||
|
private final Map<String, JobStatusHandler> strategyMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 注入所有JobStatusHandler实现类并注册到工厂
|
||||||
|
*/
|
||||||
|
@Autowired
|
||||||
|
public HpcJobStatusStrategyFactory(List<JobStatusHandler> handlers) {
|
||||||
|
for (JobStatusHandler handler : handlers) {
|
||||||
|
for (String status : handler.getSupportedStatus()) {
|
||||||
|
strategyMap.put(status, handler);
|
||||||
|
CoreLogger.info("Register strategy for status:{} with handler:{}",
|
||||||
|
status, handler.getClass().getSimpleName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据状态获取对应的处理器
|
||||||
|
*/
|
||||||
|
public JobStatusHandler getHandler(String status) {
|
||||||
|
JobStatusHandler handler = strategyMap.get(status);
|
||||||
|
if (handler == null) {
|
||||||
|
CoreLogger.error("No handler registered for status:{}", status);
|
||||||
|
// 返回修改状态的hander
|
||||||
|
return strategyMap.get("Running");
|
||||||
|
}
|
||||||
|
return handler;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,21 @@
|
|||||||
|
package com.sdm.pbs.schedule.hpc.hander;
|
||||||
|
|
||||||
|
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
|
||||||
|
import com.sdm.pbs.model.entity.SimulationJob;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public interface JobStatusHandler {
|
||||||
|
/**
|
||||||
|
* 处理作业状态
|
||||||
|
* @param simJob 作业信息
|
||||||
|
* @param statusInfo 状态信息
|
||||||
|
*/
|
||||||
|
void handle(SimulationJob simJob,HpcJobStatusInfo statusInfo);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取支持的状态类型
|
||||||
|
* @return 状态列表
|
||||||
|
*/
|
||||||
|
List<String> getSupportedStatus();
|
||||||
|
}
|
||||||
@@ -0,0 +1,47 @@
|
|||||||
|
package com.sdm.pbs.schedule.hpc.hander;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import com.sdm.common.log.CoreLogger;
|
||||||
|
import com.sdm.common.utils.String2NumberUtil;
|
||||||
|
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
|
||||||
|
import com.sdm.pbs.model.entity.SimulationJob;
|
||||||
|
import com.sdm.pbs.service.ISimulationJobService;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class ProcessStatusHandler implements JobStatusHandler{
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ISimulationJobService simulationJobService;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(SimulationJob simJob, HpcJobStatusInfo statusInfo) {
|
||||||
|
try {
|
||||||
|
// 过程中状态修改
|
||||||
|
SimulationJob newDbJob = simulationJobService.lambdaQuery().eq(SimulationJob::getId, simJob.getId()).one();
|
||||||
|
newDbJob.setFileStatus(statusInfo.getJobStatus());
|
||||||
|
newDbJob.setStartTime(statusInfo.getStartTime());
|
||||||
|
newDbJob.setEndTime(statusInfo.getEndTime());
|
||||||
|
newDbJob.setNodeName(statusInfo.getAllocatedNodes());
|
||||||
|
newDbJob.setTotalKernelTime(String2NumberUtil.stringToLong(statusInfo.getTotalKernelTime()));
|
||||||
|
newDbJob.setTotalUserTime(String2NumberUtil.stringToLong(statusInfo.getTotalUserTime()));
|
||||||
|
newDbJob.setUpdateTime(LocalDateTime.now());
|
||||||
|
simulationJobService.updateById(newDbJob);
|
||||||
|
} catch (Exception e) {
|
||||||
|
CoreLogger.error("HpcJobStatus handle error:{},newDbJob:{},statusInfo:{}",e.getMessage(),
|
||||||
|
JSONObject.toJSONString(simJob),JSONObject.toJSONString(statusInfo));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> getSupportedStatus() {
|
||||||
|
// todo 枚举类
|
||||||
|
return Arrays.asList("Configuring", "Queued", "Running", "Canceled", "Failed");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -54,5 +54,7 @@ public interface HpcInstructionService {
|
|||||||
|
|
||||||
ResponseEntity<StreamingResponseBody> hpcDownloadFile(String fileName,Long fileSize);
|
ResponseEntity<StreamingResponseBody> hpcDownloadFile(String fileName,Long fileSize);
|
||||||
|
|
||||||
|
// 通知hpc回传文件
|
||||||
|
SdmResponse<Boolean> callHpcUploadToTarget(String jobId,String workDir);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -479,6 +479,11 @@ public class HpcInstructionServiceImpl implements HpcInstructionService {
|
|||||||
return hpcCommandExcuteUtil.hpcDownloadFile(fileName,fileSize);
|
return hpcCommandExcuteUtil.hpcDownloadFile(fileName,fileSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SdmResponse<Boolean> callHpcUploadToTarget(String jobId,String workDir) {
|
||||||
|
return hpcCommandExcuteUtil.callHpcUploadToTarget(jobId,workDir);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 使用 MappedByteBuffer + 分块映射实现超大文件下载(高性能、低内存)
|
* 使用 MappedByteBuffer + 分块映射实现超大文件下载(高性能、低内存)
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -106,6 +106,10 @@ hpc:
|
|||||||
excuteWay: remote
|
excuteWay: remote
|
||||||
remoteCmdUrl: http://192.168.65.55:9097/doProcess
|
remoteCmdUrl: http://192.168.65.55:9097/doProcess
|
||||||
remoteCreateDirUrl: http://192.168.65.55:9097/createDir
|
remoteCreateDirUrl: http://192.168.65.55:9097/createDir
|
||||||
|
remoteScanDirUrl: http://192.168.65.55:9097/scanDir
|
||||||
|
remoteDownLoadFileUrl: http://192.168.65.55:9097/hpcDownload
|
||||||
|
callHpcUpload: http://192.168.65.55:9097/addJobQueue
|
||||||
|
|
||||||
|
|
||||||
#logging:
|
#logging:
|
||||||
# config: ./config/logback.xml
|
# config: ./config/logback.xml
|
||||||
|
|||||||
@@ -8,15 +8,15 @@ public class SpdmNodeParamReq {
|
|||||||
|
|
||||||
@Schema(description = "节点uuid")
|
@Schema(description = "节点uuid")
|
||||||
private String nodeUuid;
|
private String nodeUuid;
|
||||||
|
|
||||||
|
|
||||||
@Schema(description = "算例uuid")
|
@Schema(description = "算例uuid")
|
||||||
private String runId;
|
private String runId;
|
||||||
|
|
||||||
@Schema(description = "上传脚本文件id")
|
@Schema(description = "上传脚本文件id")
|
||||||
private String scriptFileId;
|
private String scriptFileId;
|
||||||
|
|
||||||
|
@Schema(description = "用户输入的正则表达式参数")
|
||||||
|
private String regExp;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -920,10 +920,11 @@ public class SimulationRunServiceImpl extends ServiceImpl<SimulationRunMapper, S
|
|||||||
@Transactional(rollbackFor = Exception.class)
|
@Transactional(rollbackFor = Exception.class)
|
||||||
public SdmResponse startProcessInstance(SpdmTaskRunReq req) {
|
public SdmResponse startProcessInstance(SpdmTaskRunReq req) {
|
||||||
SimulationRun simulationRun = this.lambdaQuery().eq(SimulationRun::getUuid, req.getRunId()).one();
|
SimulationRun simulationRun = this.lambdaQuery().eq(SimulationRun::getUuid, req.getRunId()).one();
|
||||||
// 启动流程实例 多次执行会生成多个流程实例id,更新算例run表
|
// 启动流程实例 多次执行会生成多个流程实例id,更新算例run表、同时更新flowable流程参数的流程实例id
|
||||||
SdmResponse<ProcessInstanceResp> sdmResponse = flowableFeignClient.startByProcessDefinitionId(simulationRun.getProcessDefinitionId(), null);
|
SdmResponse<ProcessInstanceResp> sdmResponse = flowableFeignClient.startByProcessDefinitionId(simulationRun.getProcessDefinitionId(), null);
|
||||||
if (sdmResponse.getData() != null) {
|
if (sdmResponse.getData() != null) {
|
||||||
this.lambdaUpdate().set(SimulationRun::getFlowInstanceId, sdmResponse.getData().getProcessInstanceId()).eq(SimulationRun::getUuid, req.getRunId()).update();
|
this.lambdaUpdate().set(SimulationRun::getFlowInstanceId, sdmResponse.getData().getProcessInstanceId()).eq(SimulationRun::getUuid, req.getRunId()).update();
|
||||||
|
flowableFeignClient.updateNodeParamProcessInstanceId(simulationRun.getProcessDefinitionId(), sdmResponse.getData().getProcessInstanceId());
|
||||||
} else {
|
} else {
|
||||||
return SdmResponse.failed("流程实例启动失败");
|
return SdmResponse.failed("流程实例启动失败");
|
||||||
}
|
}
|
||||||
@@ -932,14 +933,35 @@ public class SimulationRunServiceImpl extends ServiceImpl<SimulationRunMapper, S
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SdmResponse saveNodeParams(SpdmNodeParamReq req) {
|
public SdmResponse saveNodeParams(SpdmNodeParamReq req) {
|
||||||
FlowNodeDto flowNodeReq = new FlowNodeDto();
|
SimulationRun simulationRun = this.lambdaQuery().eq(SimulationRun::getUuid, req.getRunId()).one();
|
||||||
flowNodeReq.setUuid(req.getNodeUuid());
|
SdmResponse<FlowTemplateResp> flowTemplateResp = flowFeignClient.queryFlowTemplateInfo(simulationRun.getFlowTemplate());
|
||||||
SdmResponse<FlowNodeDto> sdmResponse = flowFeignClient.querySimulationFlowNode(flowNodeReq);
|
if (flowTemplateResp.getData() != null) {
|
||||||
if (sdmResponse.getData() != null) {
|
ProcessDefinitionDTO definitionDTO = JSON.parseObject(flowTemplateResp.getData().getTemplateContent(), ProcessDefinitionDTO.class);
|
||||||
// ProcessDefinitionDTO definitionDTO = JSON.parseObject(sdmResponse.getData().get(), ProcessDefinitionDTO.class);
|
FlowNodeDto flowNodeReq = new FlowNodeDto();
|
||||||
|
flowNodeReq.setUuid(req.getNodeUuid());
|
||||||
FlowNodeDto flowNodeDto = sdmResponse.getData();
|
SdmResponse<FlowNodeDto> sdmResponse = flowFeignClient.querySimulationFlowNode(flowNodeReq);
|
||||||
|
if (sdmResponse.getData() != null) {
|
||||||
|
FlowNodeDto flowNodeDto = sdmResponse.getData();
|
||||||
|
definitionDTO.getFlowElements().stream().filter(i -> StringUtils.equals(i.getId(), flowNodeDto.getNodeId())).findFirst().ifPresent(i -> {
|
||||||
|
if (i.getExtensionElements() != null && i.getExtensionElements().getExecuteConfig() != null) {
|
||||||
|
Map<String, Object> params = new HashMap<>();
|
||||||
|
if ("HPC".equals(i.getExtensionElements().getExecuteConfig().getExecuteType())) {
|
||||||
|
// 计算节点 输出文件夹id 保存到用户输入参数
|
||||||
|
params.put("outputDirId", flowNodeDto.getOutputDirId());
|
||||||
|
} else if ("exportWordScript".equals(i.getExtensionElements().getExecuteConfig().getExecuteType())) {
|
||||||
|
// 脚本节点
|
||||||
|
// 处理脚本上传到当前节点文件夹下,获得 脚本文件id
|
||||||
|
// 脚本文件id 保存到用户输入参数
|
||||||
|
params.put("scriptFileId", req.getScriptFileId());
|
||||||
|
// 正则表达式 保存到用户输入参数
|
||||||
|
params.put("regExp", req.getRegExp());
|
||||||
|
// 脚本输出文件夹id 保存到用户输入参数
|
||||||
|
params.put("outputDirId", flowNodeDto.getOutputDirId());
|
||||||
|
}
|
||||||
|
flowableFeignClient.saveParamsByDefinitionId(simulationRun.getProcessDefinitionId(), flowNodeDto.getNodeId(), params);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
|
|||||||
Reference in New Issue
Block a user