修改:hpc定义接口实现提交

This commit is contained in:
yangyang01000846
2025-11-27 20:58:28 +08:00
parent 82d800cd12
commit 56aa5b14e5
12 changed files with 179 additions and 31 deletions

View File

@@ -12,4 +12,10 @@ public class HpcConstants {
// 结果类型 table
public static final String FORMAT_TABLE = "table";
// HPC NODE 节点状态
public static final String NODE_STATE_ONLINE = "Online";
// 取消任务的方式 force 强制、graceful 优雅默认graceful
public static final String CANCEL_JOB_FORCE = "force";
public static final String CANCEL_JOB_GRACEFUL = "graceful";
}

View File

@@ -0,0 +1,50 @@
package com.sdm.common.entity.req.pbs;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
@Data
public class SubmitHpcTaskRemoteReq {
@Schema(description = "计算任务名称")
public String jobName;
@Schema(description = "计算所需要核数")
public int coreNum;
@Schema(description = "计算软件")
public String software;
@Schema(description = "计算任务类型")
public String jobType;
@Schema(description = "计算任务是否独立存在 0非独立任务 1独立任务")
public int independence;
@Schema(description = "求解文件")
public List<String> inputFiles = new ArrayList<>();
@Schema(description = "计算主文件")
public String masterFile;
@Schema(description = "计算任务所属任务ID")
public String taskId;
@Schema(description = "计算任务所属任务名称")
public String taskName;
@Schema(description = "计算任务所属算力ID")
public String runId;
@Schema(description = "计算任务所属算力名称")
public String runName;
@Schema(description = "执行的命令")
public String command;
@Schema(description = "任务所属项目")
public String projectname;
}

View File

@@ -1,7 +1,7 @@
package com.sdm.common.feign.impl.pbs;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq;
import com.sdm.common.feign.inter.pbs.ITaskFeignClient;
import com.sdm.common.log.CoreLogger;
import lombok.extern.slf4j.Slf4j;
@@ -16,7 +16,7 @@ public class TaskClientFeignClientImpl implements ITaskFeignClient {
private ITaskFeignClient taskFeignClient;
@Override
public SdmResponse<String> submitHpcJob(SubmitHpcTaskReq req) {
public SdmResponse<String> submitHpcJob(SubmitHpcTaskRemoteReq req) {
SdmResponse<String> response;
try {
response = taskFeignClient.submitHpcJob(req);

View File

@@ -2,7 +2,7 @@ package com.sdm.common.feign.inter.pbs;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.config.LongTimeRespFeignConfig;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@@ -16,6 +16,6 @@ public interface ITaskFeignClient {
// "作业提交"
@PostMapping("/pbs/submitHpcJob")
SdmResponse<String> submitHpcJob(@RequestBody SubmitHpcTaskReq req);
SdmResponse<String> submitHpcJob(@RequestBody SubmitHpcTaskRemoteReq req);
}

View File

@@ -1,7 +1,7 @@
package com.sdm.flowable.delegate.handler;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq;
import com.sdm.common.feign.inter.pbs.ITaskFeignClient;
import com.sdm.flowable.config.executeConfig.BaseExecuteConfig;
import com.sdm.flowable.service.IAsyncTaskRecordService;
@@ -34,7 +34,7 @@ public class HpcHandler implements ExecutionHandler {
// INIT(初始化)/RUNNING(执行中)/SUCCESS(执行成功)/FAIL(执行失败)
String status = "INIT";
// 1. 调用 HPC 平台提交任务
SubmitHpcTaskReq mockReq = mockSubmitHpcTaskReq();
SubmitHpcTaskRemoteReq mockReq = mockSubmitHpcTaskReq();
SdmResponse<String> submitResp = taskFeignClient.submitHpcJob(mockReq);
if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){
log.error("HpcHandler submit failed,jobName:{}",mockReq.getJobName());
@@ -56,7 +56,7 @@ public class HpcHandler implements ExecutionHandler {
public String mockinit(){
SubmitHpcTaskReq mockReq = mockSubmitHpcTaskReq();
SubmitHpcTaskRemoteReq mockReq = mockSubmitHpcTaskReq();
SdmResponse<String> submitResp = taskFeignClient.submitHpcJob(mockReq);
if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){
log.error("HpcHandler submit failed,jobName:{}",mockReq.getJobName());
@@ -67,8 +67,8 @@ public class HpcHandler implements ExecutionHandler {
return hpcTaskId;
}
private SubmitHpcTaskReq mockSubmitHpcTaskReq() {
SubmitHpcTaskReq req = new SubmitHpcTaskReq();
private SubmitHpcTaskRemoteReq mockSubmitHpcTaskReq() {
SubmitHpcTaskRemoteReq req = new SubmitHpcTaskRemoteReq();
// 生成任务名称:年月日-时分秒,如 20251127-145120
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd-HHmmss");
String timestamp = sdf.format(new Date());

View File

@@ -1,7 +1,7 @@
package com.sdm.pbs.controller;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq;
import com.sdm.common.entity.req.pbs.hpc.*;
import com.sdm.common.entity.resp.pbs.hpc.*;
import com.sdm.common.entity.resp.pbs.hpc.listjobs.ListJobResp;
@@ -11,11 +11,14 @@ import com.sdm.common.entity.resp.pbs.hpc.nodelist.NodeListResp;
import com.sdm.common.feign.inter.pbs.ITaskFeignClient;
import com.sdm.common.utils.HpcCommandExcuteUtil;
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
import com.sdm.pbs.model.bo.HpcResouceInfo;
import com.sdm.pbs.model.req.SubmitHpcTaskReq;
import com.sdm.pbs.service.HpcInstructionService;
import com.sdm.pbs.service.IPbsService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.ResponseEntity;
@@ -51,10 +54,24 @@ public class TaskController implements ITaskFeignClient {
return hpcCommandExcuteUtil.excuteCmd(command,"remote");
}
@GetMapping ("/queryHpcResource")
@Operation(summary = "HPC资源查询")
public SdmResponse<HpcResouceInfo> queryHpcResource() {
return pbsService.queryHpcResource();
}
@PostMapping("/submitHpcJob")
@Operation(summary = "作业提交")
public SdmResponse<String> submitHpcJob(@RequestBody SubmitHpcTaskReq req) {
return pbsService.submitHpcJob(req);
public SdmResponse<String> submitHpcJob(@RequestBody SubmitHpcTaskRemoteReq req) {
SubmitHpcTaskReq submitHpcTaskReq = new SubmitHpcTaskReq();
BeanUtils.copyProperties(req,submitHpcTaskReq);
return pbsService.submitHpcJob(submitHpcTaskReq);
}
@PostMapping("/stopHpcJob")
@Operation(summary = "作业停止")
public SdmResponse<Boolean> stopHpcJob(@RequestParam String jobId) {
return pbsService.stopHpcJob(jobId);
}
@PostMapping("/getJobStatus")

View File

@@ -1,17 +1,19 @@
package com.sdm.pbs.model.bo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
@Data
public class HpcNodeInfo {
@Schema(description = "计算节点名称")
public String nodeName;
@Schema(description = "计算节点状态 0down 1running 2suspend")
public int nodeStatus;
@Schema(description = "计算节点状态 Online, Offline, Draining,Unreachable")
public String nodeStatus;
@Schema(description = "节点总核数")
public int totalCores;

View File

@@ -1,10 +1,12 @@
package com.sdm.pbs.model.bo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
@Data
public class HpcResouceInfo {
@Schema(description = "资源总核数")

View File

@@ -1,4 +1,4 @@
package com.sdm.common.entity.req.pbs;
package com.sdm.pbs.model.req;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

View File

@@ -1,10 +1,10 @@
package com.sdm.pbs.service;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq;
import com.sdm.pbs.model.bo.FileBaseInfo;
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
import com.sdm.pbs.model.bo.HpcResouceInfo;
import com.sdm.pbs.model.req.SubmitHpcTaskReq;
import java.util.List;

View File

@@ -2,17 +2,25 @@ package com.sdm.pbs.service.impl;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.constants.HpcConstants;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq;
import com.sdm.common.entity.constants.NumberConstants;
import com.sdm.common.entity.req.pbs.hpc.*;
import com.sdm.common.entity.resp.pbs.hpc.JobCancelResp;
import com.sdm.common.entity.resp.pbs.hpc.SubmitHpcJobResp;
import com.sdm.common.entity.resp.pbs.hpc.listjobs.ListJobResp;
import com.sdm.common.entity.resp.pbs.hpc.listjobs.ListJobs;
import com.sdm.common.entity.resp.pbs.hpc.listtasks.ListTasks;
import com.sdm.common.entity.resp.pbs.hpc.listtasks.ListTasksResp;
import com.sdm.common.entity.resp.pbs.hpc.nodelist.NodeList;
import com.sdm.common.entity.resp.pbs.hpc.nodelist.NodeListResp;
import com.sdm.common.log.CoreLogger;
import com.sdm.pbs.model.bo.FileBaseInfo;
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
import com.sdm.pbs.model.bo.HpcNodeInfo;
import com.sdm.pbs.model.bo.HpcResouceInfo;
import com.sdm.pbs.model.req.SubmitHpcTaskReq;
import com.sdm.pbs.service.HpcInstructionService;
import com.sdm.pbs.service.IPbsService;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -21,6 +29,8 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -35,11 +45,15 @@ public class IPbsHpcServiceImpl implements IPbsService {
@Override
public SdmResponse<HpcResouceInfo> queryHpcResource() {
ListJobReq listJobReq = new ListJobReq();
listJobReq.setFormat(HpcConstants.FORMAT_LIST);
SdmResponse<ListJobResp> hpcResp = hpcInstructionService.jobList(listJobReq);
// todo
return null;
NodeListReq nodeReq = new NodeListReq();
nodeReq.setFormat(HpcConstants.FORMAT_LIST);
SdmResponse<NodeListResp> hpcResp = hpcInstructionService.nodeList(nodeReq);
if(!hpcResp.isSuccess()){
return SdmResponse.failed("HPC节点信息查询失败");
}
HpcResouceInfo resourceInfo =
buildHpcResourceInfo(hpcResp.getData().getNodes());
return SdmResponse.success(resourceInfo);
}
@Override
@@ -71,7 +85,14 @@ public class IPbsHpcServiceImpl implements IPbsService {
@Override
public SdmResponse<Boolean> stopHpcJob(String jobId) {
return null;
CancelJobReq req = new CancelJobReq();
req.setJobId(jobId);
req.setCancelWay(HpcConstants.CANCEL_JOB_FORCE);
SdmResponse<JobCancelResp> stopResp = hpcInstructionService.jobCancel(req);
if(!stopResp.isSuccess()){
return SdmResponse.failed(false);
}
return SdmResponse.success(stopResp.getData().getCanceled());
}
@Override
@@ -115,11 +136,61 @@ public class IPbsHpcServiceImpl implements IPbsService {
return null;
}
// @Value("${file.rootPath}")
// private String rootPath;
//
// @Resource
// private RedisUtil redisUtil;
private HpcResouceInfo buildHpcResourceInfo(List<NodeList> nodes) {
HpcResouceInfo result = new HpcResouceInfo();
result.setNodeList(Collections.emptyList());
if (nodes == null || nodes.isEmpty()) {
result.setFreeCores(0);
result.setTotalCores(0);
result.setUsedCores(0);
return result;
}
int totalCores = 0;
int usedCores = 0;
int freeCoresSum = 0;
List<HpcNodeInfo> nodeInfos = new ArrayList<>(nodes.size());
for (NodeList node : nodes) {
if (node == null) {
continue;
}
int max = safeParseInt(node.getMax());
int run = safeParseInt(node.getRun());
int free = Math.max(max - run, 0);
HpcNodeInfo info = new HpcNodeInfo();
info.setNodeName(node.getNodeName());
info.setTotalCores(max);
info.setUsedCores(run);
info.setFreeCores(free);
info.setNodeStatus(node.getState());
if (HpcConstants.NODE_STATE_ONLINE.equals(node.getState())) {
totalCores += max;
usedCores += run;
freeCoresSum += free;
}
nodeInfos.add(info);
}
result.setTotalCores(totalCores);
result.setUsedCores(usedCores);
result.setFreeCores(freeCoresSum);
result.setNodeList(nodeInfos);
return result;
}
private int safeParseInt(String value) {
if (value == null || value.trim().isEmpty()) {
return NumberConstants.ZERO;
}
try {
return Integer.parseInt(value.trim());
} catch (Exception e) {
CoreLogger.error("safeParseInt error:{},value:{}", e.getMessage(), value);
return NumberConstants.ZERO;
}
}

View File

@@ -1,10 +1,10 @@
package com.sdm.pbs.service.impl;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq;
import com.sdm.pbs.model.bo.FileBaseInfo;
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
import com.sdm.pbs.model.bo.HpcResouceInfo;
import com.sdm.pbs.model.req.SubmitHpcTaskReq;
import com.sdm.pbs.service.IPbsService;
import com.sdm.pbs.service.IPbsServiceDecorator;
import org.springframework.beans.factory.annotation.Autowired;
@@ -34,7 +34,7 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
@Override
public SdmResponse<HpcResouceInfo> queryHpcResource() {
return null;
return pbsService.queryHpcResource();
}
@Override
@@ -45,7 +45,7 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
@Override
public SdmResponse<Boolean> stopHpcJob(String jobId) {
return null;
return pbsService.stopHpcJob(jobId);
}
@Override