From 6d7c8c1bc73035aeb57c613fc317e544fc308e6f Mon Sep 17 00:00:00 2001 From: yangyang Date: Fri, 27 Mar 2026 16:07:52 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9Aflowable=E5=B7=A5?= =?UTF-8?q?=E4=BD=9C=E6=B5=81=E5=8F=91=E8=B5=B7=E4=BB=BB=E5=8A=A1=EF=BC=8C?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E4=B8=8A=E8=A1=8C=E7=8A=B6=E6=80=81=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/sdm/pbs/config/PbsCommonConstant.java | 23 +++++ .../com/sdm/pbs/controller/TaskAdapter.java | 79 +++++++++++------- .../sdm/pbs/controller/TaskController.java | 5 +- .../sdm/pbs/model/entity/HpcFileStatus.java | 17 ++++ .../sdm/pbs/model/entity/HpcJobStatus.java | 19 +++++ .../com/sdm/pbs/model/req/WebSubmitReq.java | 2 +- .../java/com/sdm/pbs/service/IPbsService.java | 2 +- .../pbs/service/impl/IPbsHpcServiceImpl.java | 2 +- .../pbs/service/impl/PbsServiceDecorator.java | 83 ++++++++++++++----- 9 files changed, 173 insertions(+), 59 deletions(-) create mode 100644 pbs/src/main/java/com/sdm/pbs/config/PbsCommonConstant.java create mode 100644 pbs/src/main/java/com/sdm/pbs/model/entity/HpcFileStatus.java create mode 100644 pbs/src/main/java/com/sdm/pbs/model/entity/HpcJobStatus.java diff --git a/pbs/src/main/java/com/sdm/pbs/config/PbsCommonConstant.java b/pbs/src/main/java/com/sdm/pbs/config/PbsCommonConstant.java new file mode 100644 index 00000000..601a9fec --- /dev/null +++ b/pbs/src/main/java/com/sdm/pbs/config/PbsCommonConstant.java @@ -0,0 +1,23 @@ +package com.sdm.pbs.config; + +/** + * PBS HPC 通用常量类 + * 统一管理状态、枚举值、固定配置 + */ +public class PbsCommonConstant { + + private PbsCommonConstant() { + // 私有构造,禁止实例化 + } + + // ======================== 任务类型 type ======================== + public static final String TASK_TYPE_SINGLE = "single"; // 单独任务 + public static final String TASK_TYPE_BATCH = "batch"; // 批量任务 + public static final String TASK_TYPE_FLOWABLE = "flowable"; // 流程任务 + public static final String EXECUTE_MODE_WEB_SINGLE = "WEB_SINGLE"; + public static final String EXECUTE_MODE_WEB_BATCH = "WEB_BATCH"; + + // ======================== 删除标识 delFlag ======================== + public static final String DEL_FLAG_NO = "N"; // 未删除 + public static final String DEL_FLAG_YES = "Y"; // 已删除 +} \ No newline at end of file diff --git a/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java b/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java index f87cdfcc..b152b9db 100644 --- a/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java +++ b/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java @@ -6,11 +6,13 @@ import com.sdm.common.common.ThreadLocalContext; import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq; import com.sdm.common.feign.inter.pbs.ITaskFeignClient; import com.sdm.common.utils.FilesUtil; +import com.sdm.pbs.config.PbsCommonConstant; import com.sdm.pbs.model.bo.BatchWebSubmitResp; import com.sdm.pbs.model.entity.SimulationJob; import com.sdm.pbs.model.req.BatchHpcTaskReq; import com.sdm.pbs.model.req.OneHpcTaskReq; import com.sdm.pbs.model.req.SubmitHpcTaskReq; +import com.sdm.pbs.model.req.WebSubmitReq; import com.sdm.pbs.service.ISimulationJobService; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; @@ -37,8 +39,6 @@ public class TaskAdapter implements ITaskFeignClient { private static final String EXECUTE_MODE_AUTO = "AUTO"; private static final String EXECUTE_MODE_MANUAL = "MANUAL"; - private static final String EXECUTE_MODE_WEB_SINGLE = "WEB_SINGLE"; - private static final String EXECUTE_MODE_WEB_BATCH = "WEB_BATCH"; private static final String FLOWABLE_SIMULATION_BASEDIR = "/home/simulation/"; @Autowired @@ -50,30 +50,17 @@ public class TaskAdapter implements ITaskFeignClient { @Value("${hpc.webSubmit.basePath:/home/simulation/hpc}") private String BASE_STORAGE_PATH ; - - @Value("${testEnStr:}") - private String enStr; - @Value("${testEnStr2:}") - private String testEnStr2; - - @Value("${pbs.task.impl}") - private String pbsImpl; - - @GetMapping("/testEn") - @Operation(summary = "作业提交") - public SdmResponse> testEn() { - Map map = new HashMap<>(); - map.put("enStr", enStr); - map.put("pbsImpl", pbsImpl); - map.put("testEnStr2", testEnStr2); - return SdmResponse.success(map); - } - - - @PostMapping("/adapterSubmitHpcJob") @Operation(summary = "作业提交") public SdmResponse adapterSubmitHpcJob(@RequestBody SubmitHpcTaskRemoteReq req) { + // 从flowable 过来的请求 + // 预插入数据 + WebSubmitReq webSubmitReq = buildWebSubmitReq(req,"flowable发起任务","flowable"); + SdmResponse preResp = taskController.preSingleHpcJobSubmit(webSubmitReq,false); + if(!preResp.isSuccess()) { + log.error("flowable作业提交失败,插入任务数据异常:{}",JSONObject.toJSONString(req)); + throw new RuntimeException("hpc作业提交失败,插入任务数据异常"); + } // spdm 回传路径 // 求解文件获取,可以在这一层分片上传,然后拿到对应的路径 SubmitHpcTaskReq submitHpcTaskReq = new SubmitHpcTaskReq(); @@ -86,7 +73,7 @@ public class TaskAdapter implements ITaskFeignClient { @Operation(summary = "独立网页hpc作业提交") public SdmResponse webSubmit(@RequestBody OneHpcTaskReq req) { // 预插入数据 - SdmResponse preResp = taskController.preSingleHpcJobSubmit(req); + SdmResponse preResp = taskController.preSingleHpcJobSubmit(req,false); if(!preResp.isSuccess()) { log.error("hpc作业提交失败,插入任务数据异常:{}",JSONObject.toJSONString(req)); throw new RuntimeException("hpc作业提交失败,插入任务数据异常"); @@ -98,7 +85,7 @@ public class TaskAdapter implements ITaskFeignClient { getSimulationFile(remoteReq,submitHpcTaskReq,""); // 公共提交接口 BeanUtils.copyProperties(remoteReq,submitHpcTaskReq); - submitHpcTaskReq.setFrom(EXECUTE_MODE_WEB_SINGLE); + submitHpcTaskReq.setFrom(PbsCommonConstant.EXECUTE_MODE_WEB_SINGLE); Long userId = ThreadLocalContext.getUserId(); Long tenantId = ThreadLocalContext.getTenantId(); // 异步发起,失败后数据更改todo @@ -109,8 +96,8 @@ public class TaskAdapter implements ITaskFeignClient { @PostMapping("/batchWebSubmit") @Operation(summary = "批量网页hpc作业提交") public SdmResponse batchWebSubmit(@RequestBody BatchHpcTaskReq req) { - // 预插入数据 - SdmResponse preResp = taskController.preSingleHpcJobSubmit(req); + // 预插入总数据 + SdmResponse preResp = taskController.preSingleHpcJobSubmit(req,true); if(!preResp.isSuccess()) { log.error("batchWebSubmit作业提交失败,插入任务数据异常:{}",JSONObject.toJSONString(req)); throw new RuntimeException("hpc作业提交失败,插入任务数据异常"); @@ -174,7 +161,7 @@ public class TaskAdapter implements ITaskFeignClient { submitHpcTaskReq.setInputFilePaths(inputFilePaths); } // 网页单一任务 - if (Objects.equals(req.getExecuteMode(),EXECUTE_MODE_WEB_SINGLE)) { + if (Objects.equals(req.getExecuteMode(), PbsCommonConstant.EXECUTE_MODE_WEB_SINGLE)) { // 文件的基础路径,就是本次任务的总目录 // /hpc/{userId}/{jobName}/{uuid} Long userId = ThreadLocalContext.getUserId(); @@ -188,7 +175,7 @@ public class TaskAdapter implements ITaskFeignClient { req.setStdoutSpdmNasFilePath(fileBasePath); } // 网页批量任务 - if(Objects.equals(req.getExecuteMode(),EXECUTE_MODE_WEB_BATCH)){ + if(Objects.equals(req.getExecuteMode(),PbsCommonConstant.EXECUTE_MODE_WEB_BATCH)){ Pair> pair = FilesUtil.getSingleSubmitFiles(batchFilePath); // 本地主文件 submitHpcTaskReq.setMasterFilePath(pair.getLeft()); @@ -215,7 +202,7 @@ public class TaskAdapter implements ITaskFeignClient { remoteReq.setIndependence(jobDb.getIndependence()); remoteReq.setUuid(req.getUuid()); // 网页单一任务 - remoteReq.setExecuteMode(EXECUTE_MODE_WEB_SINGLE); + remoteReq.setExecuteMode(PbsCommonConstant.EXECUTE_MODE_WEB_SINGLE); return remoteReq; } @@ -237,7 +224,7 @@ public class TaskAdapter implements ITaskFeignClient { remoteReq.setSoftware(jobDb.getSoftware()); remoteReq.setJobType(jobDb.getJobType()); // 网页批量任务 - remoteReq.setExecuteMode(EXECUTE_MODE_WEB_BATCH); + remoteReq.setExecuteMode(PbsCommonConstant.EXECUTE_MODE_WEB_BATCH); return remoteReq; } @@ -335,10 +322,17 @@ public class TaskAdapter implements ITaskFeignClient { String lastDirName = FilesUtil.getLastDirectoryName(taskDirAllPath); // 构建远程请求参数 SubmitHpcTaskRemoteReq remoteReq = batchBuildCommonRemoteReq(req, jobDb, lastDirName); + // 批量任务的子任务预插入数据 + WebSubmitReq prewebSubmitReq = buildWebSubmitReq(remoteReq, remoteReq.getJobDesc(), remoteReq.getJobType()); + SdmResponse preResp = taskController.preSingleHpcJobSubmit(prewebSubmitReq,false); + if(!preResp.isSuccess()) { + log.error("批量子作业提交失败,插入任务数据异常:{}",JSONObject.toJSONString(req)); + throw new RuntimeException("批量子作业提交失败,插入任务数据异常"); + } // 构建任务提交请求 SubmitHpcTaskReq submitHpcTaskReq = buildSubmitTaskReq(remoteReq,taskDirAllPath); // 设置批量提交来源 - submitHpcTaskReq.setFrom(EXECUTE_MODE_WEB_BATCH); + submitHpcTaskReq.setFrom(PbsCommonConstant.EXECUTE_MODE_WEB_BATCH); // 调用提交接口 SdmResponse response = taskController.submitHpcJob(submitHpcTaskReq); // 处理成功结果 @@ -395,6 +389,27 @@ public class TaskAdapter implements ITaskFeignClient { log.info("doBatchSubmitHpcTasks back:{}",JSONObject.toJSONString(results)); } + /** + * 封装Flowable请求为WebSubmitReq对象 + * @param req 前端/Flowable传入的请求 + * @return 封装好的WebSubmitReq + */ + private WebSubmitReq buildWebSubmitReq(SubmitHpcTaskRemoteReq req,String jobDesc,String type) { + WebSubmitReq webSubmitReq = new WebSubmitReq(); + // 生成无横杠UUID + String uuid = UUID.randomUUID().toString().replace("-", ""); + req.setUuid(uuid); + // 赋值 + webSubmitReq.setUuid(uuid); + webSubmitReq.setSoftwareId(req.getSoftwareId()); + webSubmitReq.setSoftware(req.getSoftware()); + webSubmitReq.setJobName(req.getJobName()); + webSubmitReq.setCoreNum(req.getCoreNum()); + webSubmitReq.setJobDesc(jobDesc); + webSubmitReq.setHpcGroup("HPC_PACK"); + webSubmitReq.setType(type); + return webSubmitReq; + } } diff --git a/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java b/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java index 776a7b1b..26d54f31 100644 --- a/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java +++ b/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java @@ -289,8 +289,9 @@ public class TaskController { return pbsService.downloadFile(req.getJobId(),req.getFileName()); } - public SdmResponse preSingleHpcJobSubmit(WebSubmitReq req) { - return pbsService.preSingleHpcJobSubmit(req); + // firstBatch 批量任务总数据是true,其他false + public SdmResponse preSingleHpcJobSubmit(WebSubmitReq req,boolean firstBatch) { + return pbsService.preSingleHpcJobSubmit(req,firstBatch); } } diff --git a/pbs/src/main/java/com/sdm/pbs/model/entity/HpcFileStatus.java b/pbs/src/main/java/com/sdm/pbs/model/entity/HpcFileStatus.java new file mode 100644 index 00000000..ba5d445d --- /dev/null +++ b/pbs/src/main/java/com/sdm/pbs/model/entity/HpcFileStatus.java @@ -0,0 +1,17 @@ +package com.sdm.pbs.model.entity; +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public enum HpcFileStatus { + uplinking("文件上传中-上行"), + upsuccend("文件上行成功"), + upfailed("文件上行异常"), + generating("任务运行文件生成中"), + uploading("文件回传中-下行"), + finished("文件回传完成"), + failed("文件回传失败"); + + private final String desc; +} \ No newline at end of file diff --git a/pbs/src/main/java/com/sdm/pbs/model/entity/HpcJobStatus.java b/pbs/src/main/java/com/sdm/pbs/model/entity/HpcJobStatus.java new file mode 100644 index 00000000..680a45e3 --- /dev/null +++ b/pbs/src/main/java/com/sdm/pbs/model/entity/HpcJobStatus.java @@ -0,0 +1,19 @@ +package com.sdm.pbs.model.entity; +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public enum HpcJobStatus { + Preparing("准备"), + Configuring("配置中"), + Queued("队列中"), + // 表示:调度器已为任务分配好计算资源,正在与目标节点通信以启动任务执行 + Dispatching("调度中"), + Running("执行中"), + Canceled("已取消"), + Finished("已完成"), + Failed("任务失败"); + + private final String desc; +} \ No newline at end of file diff --git a/pbs/src/main/java/com/sdm/pbs/model/req/WebSubmitReq.java b/pbs/src/main/java/com/sdm/pbs/model/req/WebSubmitReq.java index d81b51bb..1f9d339a 100644 --- a/pbs/src/main/java/com/sdm/pbs/model/req/WebSubmitReq.java +++ b/pbs/src/main/java/com/sdm/pbs/model/req/WebSubmitReq.java @@ -47,7 +47,7 @@ public class WebSubmitReq implements Serializable { private String hpcGroup; /** - * single :单独任务 or batch :批量任务 + * single :单独任务 or batch :批量任务,flowable 工作流发起的任务 */ private String type; diff --git a/pbs/src/main/java/com/sdm/pbs/service/IPbsService.java b/pbs/src/main/java/com/sdm/pbs/service/IPbsService.java index f34575a5..cd69a4f8 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/IPbsService.java +++ b/pbs/src/main/java/com/sdm/pbs/service/IPbsService.java @@ -62,6 +62,6 @@ public interface IPbsService { SdmResponse stopStreamTaskLog(String clientToken); - SdmResponse preSingleHpcJobSubmit(WebSubmitReq req); + SdmResponse preSingleHpcJobSubmit(WebSubmitReq req,boolean firstBatch); } diff --git a/pbs/src/main/java/com/sdm/pbs/service/impl/IPbsHpcServiceImpl.java b/pbs/src/main/java/com/sdm/pbs/service/impl/IPbsHpcServiceImpl.java index be53726f..08692658 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/impl/IPbsHpcServiceImpl.java +++ b/pbs/src/main/java/com/sdm/pbs/service/impl/IPbsHpcServiceImpl.java @@ -148,7 +148,7 @@ public class IPbsHpcServiceImpl implements IPbsService { } @Override - public SdmResponse preSingleHpcJobSubmit(WebSubmitReq req) { + public SdmResponse preSingleHpcJobSubmit(WebSubmitReq req,boolean firstBatch) { return SdmResponse.success(); } diff --git a/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java b/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java index 0e241e94..44fadca7 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java +++ b/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java @@ -25,10 +25,11 @@ import com.sdm.common.utils.PageUtils; import com.sdm.pbs.model.bo.CommandResult; import com.sdm.pbs.model.bo.HpcJobStatusInfo; import com.sdm.pbs.model.bo.HpcResouceInfo; -import com.sdm.pbs.model.entity.SimulationHpcCommand; -import com.sdm.pbs.model.entity.SimulationHpcCommandPlaceholder; -import com.sdm.pbs.model.entity.SimulationJob; -import com.sdm.pbs.model.req.*; +import com.sdm.pbs.model.entity.*; +import com.sdm.pbs.model.req.JobFileCallBackReq; +import com.sdm.pbs.model.req.QueryJobReq; +import com.sdm.pbs.model.req.SubmitHpcTaskReq; +import com.sdm.pbs.model.req.WebSubmitReq; import com.sdm.pbs.service.*; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; @@ -66,8 +67,6 @@ import java.util.stream.Collectors; @ConditionalOnProperty(name = "pbs.task.impl", havingValue = "hpc") public class PbsServiceDecorator implements IPbsServiceDecorator { - private static final String EXECUTE_MODE_WEB_SINGLE = "WEB_SINGLE"; - @Autowired private HpcCommandExcuteUtil hpcCommandExcuteUtil; @@ -156,7 +155,13 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { String subDir = generateHpcSubDir(req); // 0.处理hpc文件 String masterFilePath = handleHpcFileUpload(req, subDir); - // 重新赋值,用于command拼接 + // 更新上行文件状态 + String fileStatus=StringUtils.isNotBlank(masterFilePath)? HpcFileStatus.upsuccend.name():HpcFileStatus.upfailed.name(); + updateFileStatusByUuid(req.getUuid(),fileStatus); + if(StringUtils.isBlank(masterFilePath)){ + throw new RuntimeException("hpc文件上行失败"); + } + // 重新赋值,用于command拼接 req.setMasterFilePath(masterFilePath); // 任务输出的文件夹 Pair pair = extractDirectoryAndFileName(masterFilePath); @@ -347,7 +352,7 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { simulationJob.setTotalElapsedTime(null); // 标识及状态 // simulationJob.setUuid(null); - simulationJob.setFileStatus("generating"); + simulationJob.setFileStatus(HpcFileStatus.generating.name()); // 审计字段 Long userId = ThreadLocalContext.getUserId(); Long tenantId = ThreadLocalContext.getTenantId(); @@ -369,15 +374,18 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { if(StringUtils.isNotBlank(req.getJobDesc())){ simulationJob.setJobDesc(req.getJobDesc()); } - - // 独立单一任务 - if(Objects.equals(req.getFrom(),EXECUTE_MODE_WEB_SINGLE)){ - SimulationJob jobDb = simulationJobService.lambdaQuery().eq(SimulationJob::getUuid, req.uuid).one(); - simulationJob.setId(jobDb.getId()); - simulationJobService.updateById(simulationJob); - }else { - simulationJobService.save(simulationJob); - } + SimulationJob jobDb = simulationJobService.lambdaQuery().eq(SimulationJob::getUuid, req.uuid).one(); + simulationJob.setId(jobDb.getId()); + boolean b = simulationJobService.updateById(simulationJob); + log.info("hpc submit end,update :{},jobId:{}",b,jobId); +// // 批量的任务 +// if(Objects.equals(req.getFrom(), PbsCommonConstant.EXECUTE_MODE_WEB_BATCH)){ +// simulationJobService.save(simulationJob); +// }else { +// SimulationJob jobDb = simulationJobService.lambdaQuery().eq(SimulationJob::getUuid, req.uuid).one(); +// simulationJob.setId(jobDb.getId()); +// simulationJobService.updateById(simulationJob); +// } } } @@ -622,7 +630,7 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { } @Override - public SdmResponse preSingleHpcJobSubmit(WebSubmitReq req) { + public SdmResponse preSingleHpcJobSubmit(WebSubmitReq req,boolean firstBatch) { // 预提交数据入库 SimulationJob simulationJob = new SimulationJob(); // 基础字段 @@ -633,16 +641,18 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { // job 的类型 simulationJob.setJobType(req.getHpcGroup()); // 0:flow 任务 1:独立提交单一任务,2独立提交批量任务 - int independence = Objects.equals(req.getType(),"single") ? 1:2; + int independence = getIndependenceByType(req.getType()); simulationJob.setIndependence(independence); // 软件及文件关联 simulationJob.setSoftwareId(req.getSoftwareId()); // 任务装备状态 - simulationJob.setJobStatus("Preparing"); + simulationJob.setJobStatus(HpcJobStatus.Preparing.name()); // 求解器名称 simulationJob.setSolverName(req.getSoftware()); - // 文件上行处理中 - simulationJob.setFileStatus("uplinking"); + // 文件上行处理中,批量的总数据,不存文件状态 + if(!firstBatch){ + simulationJob.setFileStatus(HpcFileStatus.uplinking.name()); + } // 文件详情 simulationJob.setJobDesc(req.getJobDesc()); simulationJob.setUuid(req.getUuid()); @@ -1080,5 +1090,34 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { } return resultMap; } + /** + * 根据任务类型获取独立性标识 + * @param type 任务类型 single/batch/flowable + * @return independence 1/2/0 + */ + private int getIndependenceByType(String type) { + if (Objects.equals(type, "single")) { + return 1; + } + if (Objects.equals(type, "batch")) { + return 2; + } + // flowable 或其他未知类型,默认返回 0 + return 0; + } + + /** + * 根据UUID更新文件状态 + * @param uuid 唯一标识 + * @param fileStatus 状态值:generating、uploading、finished、failed + */ + private boolean updateFileStatusByUuid(String uuid, String fileStatus) { + boolean update = simulationJobService.lambdaUpdate() + .eq(SimulationJob::getUuid, uuid) + .set(SimulationJob::getFileStatus, fileStatus) + .update(); + log.info("submit hpc file up end:{}", update); + return update; + } }