From eb269476904524d66f58ee9573678910b86d0b79 Mon Sep 17 00:00:00 2001 From: yangyang Date: Wed, 25 Mar 2026 18:11:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9Ahpc=20pack=20?= =?UTF-8?q?=E7=8B=AC=E7=AB=8B=E9=A1=B5=E9=9D=A2=EF=BC=8C=E6=8F=90=E4=BA=A4?= =?UTF-8?q?=E5=8D=95=E4=B8=80=E4=BB=BB=E5=8A=A1=EF=BC=8C=E6=8F=90=E4=BA=A4?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E4=BB=BB=E5=8A=A1=E6=8E=A5=E5=8F=A3=E6=8F=90?= =?UTF-8?q?=E4=BA=A4=EF=BC=9Bpbs=E6=B1=82=E8=A7=A3=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E5=8F=8A=E6=96=87=E4=BB=B6=E5=A4=B9=E4=B8=8A=E4=BC=A0=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E6=8F=90=E4=BA=A4=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 1-sql/2026-03-26/yang.sql | 9 + .../req/pbs/SubmitHpcTaskRemoteReq.java | 9 + .../common/utils/HpcCommandExcuteUtil.java | 3 +- .../sdm/pbs/controller/PbsFileController.java | 112 ++++++++ .../com/sdm/pbs/controller/TaskAdapter.java | 255 +++++++++++++++++- .../sdm/pbs/controller/TaskController.java | 7 +- .../sdm/pbs/model/bo/BatchWebSubmitResp.java | 12 + .../model/bo/PbsChunkUploadLocalFileResp.java | 11 + .../sdm/pbs/model/entity/SimulationJob.java | 13 +- .../sdm/pbs/model/req/BatchHpcTaskReq.java | 13 + .../com/sdm/pbs/model/req/OneHpcTaskReq.java | 8 + .../model/req/PbsChunkUploadLocalFileReq.java | 26 ++ .../pbs/model/req/SingleHpcJobSubmitReq.java | 60 +++++ .../sdm/pbs/model/req/SubmitHpcTaskReq.java | 16 +- .../com/sdm/pbs/model/req/WebSubmitReq.java | 22 ++ .../hpc/hander/FinishedStatusHandler.java | 22 +- .../pbs/service/HpcInstructionService.java | 2 +- .../pbs/service/IPbsFileUploadService.java | 9 + .../java/com/sdm/pbs/service/IPbsService.java | 3 + .../impl/HpcInstructionServiceImpl.java | 4 +- .../pbs/service/impl/IPbsHpcServiceImpl.java | 6 + .../impl/PbsFileUploadServiceImpl.java | 135 ++++++++++ .../pbs/service/impl/PbsServiceDecorator.java | 64 ++++- 23 files changed, 803 insertions(+), 18 deletions(-) create mode 100644 
1-sql/2026-03-26/yang.sql create mode 100644 pbs/src/main/java/com/sdm/pbs/controller/PbsFileController.java create mode 100644 pbs/src/main/java/com/sdm/pbs/model/bo/BatchWebSubmitResp.java create mode 100644 pbs/src/main/java/com/sdm/pbs/model/bo/PbsChunkUploadLocalFileResp.java create mode 100644 pbs/src/main/java/com/sdm/pbs/model/req/BatchHpcTaskReq.java create mode 100644 pbs/src/main/java/com/sdm/pbs/model/req/OneHpcTaskReq.java create mode 100644 pbs/src/main/java/com/sdm/pbs/model/req/PbsChunkUploadLocalFileReq.java create mode 100644 pbs/src/main/java/com/sdm/pbs/model/req/SingleHpcJobSubmitReq.java create mode 100644 pbs/src/main/java/com/sdm/pbs/model/req/WebSubmitReq.java create mode 100644 pbs/src/main/java/com/sdm/pbs/service/IPbsFileUploadService.java create mode 100644 pbs/src/main/java/com/sdm/pbs/service/impl/PbsFileUploadServiceImpl.java diff --git a/1-sql/2026-03-26/yang.sql b/1-sql/2026-03-26/yang.sql new file mode 100644 index 00000000..636aec74 --- /dev/null +++ b/1-sql/2026-03-26/yang.sql @@ -0,0 +1,9 @@ +--新增字段 +ALTER TABLE simulation_job + ADD COLUMN jobDesc VARCHAR(512) DEFAULT NULL COMMENT '任务描述' AFTER coreNum; +-- 修改字段描述 +ALTER TABLE simulation_job + MODIFY COLUMN `independence` tinyint(1) NOT NULL DEFAULT '0' COMMENT '计算任务是否独立存在 0:flow 任务 1:独立提交单一任务,2独立提交批量任务'; +-- 新增字段 +ALTER TABLE `simulation_job` + ADD COLUMN `groupId` bigint NULL COMMENT '批量任务组id'; \ No newline at end of file diff --git a/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java b/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java index 4f7e427c..b76fc6af 100644 --- a/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java +++ b/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java @@ -98,4 +98,13 @@ public class SubmitHpcTaskRemoteReq implements Serializable { @Schema(description = "流程实例id") private String processInstanceId; + @Schema(description = "本次任务凭证") + private 
package com.sdm.pbs.controller;

import com.sdm.common.common.SdmResponse;
import com.sdm.common.common.ThreadLocalContext;
import com.sdm.common.utils.FilesUtil;
import com.sdm.pbs.model.bo.PbsChunkUploadLocalFileResp;
import com.sdm.pbs.model.req.PbsChunkUploadLocalFileReq;
import com.sdm.pbs.service.IPbsFileUploadService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.File;
import java.io.IOException;

/**
 * PBS file-handling endpoints: chunked upload of solver input files/folders to
 * local node storage, and chunked upload with automatic merge used by the HPC
 * result-callback path.
 */
@Slf4j
@RestController
@RequestMapping("/pbs")
@Tag(name = "pbs文件处理", description = "pbs文件处理")
public class PbsFileController {

    /** Root storage path (configurable via application.yml, key hpc.webSubmit.basePath). */
    @Value("${hpc.webSubmit.basePath:/home/simulation/hpc}")
    private String baseStoragePath;

    @Autowired
    private IPbsFileUploadService fileUploadService;

    /**
     * Chunked file upload endpoint.
     * <p>Validates the mandatory request fields, derives the storage location for
     * this chunk, then delegates the actual chunk handling to the upload service.
     *
     * @param req chunk upload request (job name, uuid, chunk index/total, file part, ...)
     * @return upload result wrapping {@link PbsChunkUploadLocalFileResp} on success
     */
    @PostMapping("/chunkUploadToLocal")
    @Operation(summary = "本地文件上传")
    public SdmResponse<PbsChunkUploadLocalFileResp> chunkUpload(PbsChunkUploadLocalFileReq req) {
        // 1. Basic parameter validation — all of these are required for every chunk.
        Long userId = ThreadLocalContext.getUserId();
        if (userId == null || req.getJobName() == null || req.getUuid() == null
                || req.getSourceFileName() == null || req.getChunk() == null
                || req.getChunkTotal() == null || req.getFile() == null) {
            return SdmResponse.failed("必填参数不能为空");
        }
        // 2. Build the storage paths (left = chunk target dir, right = user-selected folder root).
        Pair<String, String> pair = buildBasePath(req, userId);
        // 3. Handle the chunk upload (basePath, basePathParent).
        return fileUploadService.handleChunkUpload(req, pair.getLeft(), pair.getRight());
    }

    /**
     * Node-side chunk upload with automatic merge.
     * Receives a chunk, saves it, and when the last chunk arrives merges all
     * chunks into the final file and cleans up the chunk directory.
     *
     * @param fileId           unique file identifier
     * @param chunkIndex       current chunk index (0-based)
     * @param totalChunks      total number of chunks
     * @param absoluteFilePath absolute storage directory (without the file name)
     * @param filename         final file name
     * @param request          servlet request whose body is the raw chunk bytes
     * @return upload/merge result
     */
    @PostMapping("/hpcCallbackUploadLocalChunkAutoMerge")
    public SdmResponse<String> uploadChunkAndAutoMerge(
            @RequestParam String fileId, @RequestParam int chunkIndex,
            @RequestParam int totalChunks, @RequestParam String absoluteFilePath,
            @RequestParam String filename, HttpServletRequest request) {
        String result = "";
        try {
            result = FilesUtil.handleChunkUploadLocalAndAutoMerge(
                    absoluteFilePath, fileId, chunkIndex, totalChunks, filename,
                    request.getInputStream());
        } catch (Exception e) {
            // Keep the full stack trace — message-only logging made failures undiagnosable.
            log.error("hpcCallbackUploadLocalChunkAutoMerge error:{}", e.getMessage(), e);
            result = "上传异常";
            return SdmResponse.failed(result);
        }
        return SdmResponse.success(result);
    }

    /**
     * Builds the base storage paths.
     * basePath: directory of the file currently being uploaded;
     * basePathParent: root of the folder the user selected (empty for single-file uploads).
     * Layout: {baseStoragePath}/{userId}/{jobName}/{uuid}[/selectFilePath...]
     * <p>All user-controlled path pieces are checked against path traversal
     * ("..", absolute separators) before being appended — the raw concatenation
     * previously allowed escaping the per-user storage root.
     */
    private Pair<String, String> buildBasePath(PbsChunkUploadLocalFileReq req, Long userId) {
        assertSafePathSegment(req.getJobName(), "jobName");
        assertSafePathSegment(req.getUuid(), "uuid");
        StringBuilder basePath = new StringBuilder(baseStoragePath);
        basePath.append(File.separator).append(userId)
                .append(File.separator).append(req.getJobName())
                .append(File.separator).append(req.getUuid());
        // Root of the uploaded folder, returned to the frontend for folder uploads.
        String basePathParent = "";
        if (StringUtils.isNotBlank(req.getSelectDirName())) {
            assertSafeRelativePath(req.getSelectDirName(), "selectDirName");
            basePathParent = basePath + File.separator + req.getSelectDirName().replace("/", File.separator);
        }
        // Folder upload: append the relative path of this file inside the selected folder.
        if (req.getSelectFilePath() != null && !req.getSelectFilePath().trim().isEmpty()) {
            assertSafeRelativePath(req.getSelectFilePath(), "selectFilePath");
            basePath.append(File.separator).append(req.getSelectFilePath().replace("/", File.separator));
        }
        return Pair.of(basePath.toString(), basePathParent);
    }

    /** Rejects values that must be a single path segment (no separators, no ".."). */
    private static void assertSafePathSegment(String value, String name) {
        if (value.contains("..") || value.contains("/") || value.contains("\\")) {
            throw new IllegalArgumentException(name + " 不能包含路径分隔符或 .. 等非法字符");
        }
    }

    /** Rejects relative paths containing "..", preventing directory traversal. */
    private static void assertSafeRelativePath(String value, String name) {
        if (value.contains("..")) {
            throw new IllegalArgumentException(name + " 不能包含 .. 等非法路径");
        }
    }

}
static final String FLOWABLE_SIMULATION_BASEDIR = "/home/simulation/"; @Autowired private TaskController taskController; + @Autowired + private ISimulationJobService simulationJobService; + + @Value("${hpc.webSubmit.basePath:/home/simulation/hpc}") + private String BASE_STORAGE_PATH ; + @Value("${testEnStr:}") private String enStr; @@ -62,12 +79,50 @@ public class TaskAdapter implements ITaskFeignClient { // spdm 回传路径 // 求解文件获取,可以在这一层分片上传,然后拿到对应的路径 SubmitHpcTaskReq submitHpcTaskReq = new SubmitHpcTaskReq(); - getSimulationFile(req,submitHpcTaskReq); + getSimulationFile(req,submitHpcTaskReq,""); BeanUtils.copyProperties(req,submitHpcTaskReq); return taskController.submitHpcJob(submitHpcTaskReq); } - private void getSimulationFile(SubmitHpcTaskRemoteReq req,SubmitHpcTaskReq submitHpcTaskReq ){ + @PostMapping("/preSingleHpcJobSubmit") + @Operation(summary = "单独一个作业预提交") + public SdmResponse preSingleHpcJobSubmit(@RequestBody SingleHpcJobSubmitReq req) { + return taskController.preSingleHpcJobSubmit(req); + } + + @PostMapping("/webSubmit") + @Operation(summary = "独立网页hpc作业提交") + public SdmResponse webSubmit(@RequestBody OneHpcTaskReq req) { + SubmitHpcTaskRemoteReq remoteReq = buildCommonRemoteReq(req); + // 获取对应主/从文件 + SubmitHpcTaskReq submitHpcTaskReq = new SubmitHpcTaskReq(); + getSimulationFile(remoteReq,submitHpcTaskReq,""); + // 公共提交接口 + BeanUtils.copyProperties(remoteReq,submitHpcTaskReq); + submitHpcTaskReq.setFrom(EXECUTE_MODE_WEB_SINGLE); + return taskController.submitHpcJob(submitHpcTaskReq); + } + + + @PostMapping("/batchWebSubmit") + @Operation(summary = "批量网页hpc作业提交") + public SdmResponse> batchWebSubmit(@RequestBody BatchHpcTaskReq req) { + // 校验并获取基础作业信息 + SimulationJob jobDb = getAndValidateSimulationJob(req.getUuid()); + // 生成文件基础路径 + String fileBasePath = generateTaskBasePath(req, jobDb); + // 获取所有子任务目录 + List allTaskDirs = getAndValidateTaskDirs(fileBasePath); + // 批量提交HPC任务 + List results = batchSubmitHpcTasks(req, jobDb, allTaskDirs); + // 清理预置数据 
+ deleteSimulationJob(jobDb); + // 返回结果 + return SdmResponse.success(results); + } + + + private void getSimulationFile(SubmitHpcTaskRemoteReq req,SubmitHpcTaskReq submitHpcTaskReq,String batchFilePath ) { log.info("提交请求参数:{}", JSONObject.toJSONString(req)); String masterFilepath=""; List inputFilePaths=new ArrayList<>(); @@ -110,6 +165,202 @@ public class TaskAdapter implements ITaskFeignClient { submitHpcTaskReq.setMasterFilePath(masterFilepath); submitHpcTaskReq.setInputFilePaths(inputFilePaths); } + // 网页单一任务 + if (Objects.equals(req.getExecuteMode(),EXECUTE_MODE_WEB_SINGLE)) { + // 文件的基础路径,就是本次任务的总目录 + // /hpc/{userId}/{jobName}/{uuid} + Long userId = ThreadLocalContext.getUserId(); + String fileBasePath = generateTaskBasePath(userId, req.getJobName(), req.getUuid(), "", false); + Pair> pair = FilesUtil.getSingleSubmitFiles(fileBasePath); + // 本地主文件 + submitHpcTaskReq.setMasterFilePath(pair.getLeft()); + // 本地从文件 + submitHpcTaskReq.setInputFilePaths(pair.getRight()); + // 任务回传会用到 + req.setStdoutSpdmNasFilePath(fileBasePath); + } + // 网页批量任务 + if(Objects.equals(req.getExecuteMode(),EXECUTE_MODE_WEB_BATCH)){ + Pair> pair = FilesUtil.getSingleSubmitFiles(batchFilePath); + // 本地主文件 + submitHpcTaskReq.setMasterFilePath(pair.getLeft()); + // 本地从文件 + submitHpcTaskReq.setInputFilePaths(pair.getRight()); + // 任务回传会用到 + req.setStdoutSpdmNasFilePath(batchFilePath); + } + } + /** + * 抽取了查询任务、赋值基础属性、设置执行模式等通用逻辑 + */ + private SubmitHpcTaskRemoteReq buildCommonRemoteReq(OneHpcTaskReq req) { + // 预制入库的数据 + SimulationJob jobDb = getAndValidateSimulationJob(req.getUuid()); + SubmitHpcTaskRemoteReq remoteReq = new SubmitHpcTaskRemoteReq(); + // 基础任务信息 + remoteReq.setJobName(jobDb.getJobName()); + remoteReq.setCoreNum(jobDb.getCoreNum()); + remoteReq.setSoftwareId(jobDb.getSoftwareId()); + // 0=独立任务 1=非独立任务 + remoteReq.setIndependence(jobDb.getIndependence()); + remoteReq.setUuid(req.getUuid()); + // 网页单一任务 + remoteReq.setExecuteMode(EXECUTE_MODE_WEB_SINGLE); + return remoteReq; 
+ } + + /** + * 抽取了查询任务、赋值基础属性、设置执行模式等通用逻辑 + */ + private SubmitHpcTaskRemoteReq batchBuildCommonRemoteReq(BatchHpcTaskReq req,SimulationJob jobDb,String lastDirName) { + SubmitHpcTaskRemoteReq remoteReq = new SubmitHpcTaskRemoteReq(); + // 基础任务信息 + remoteReq.setJobName(jobDb.getJobName()+"_"+lastDirName); + remoteReq.setCoreNum(jobDb.getCoreNum()); + remoteReq.setSoftwareId(jobDb.getSoftwareId()); + // 0=独立任务 1=非独立任务 + remoteReq.setIndependence(jobDb.getIndependence()); + remoteReq.setUuid(req.getUuid()); + // 任务组id + remoteReq.setGroupId(jobDb.getId()); + remoteReq.setJobDesc(jobDb.getJobDesc()); + remoteReq.setSoftware(jobDb.getSoftware()); + remoteReq.setJobType(jobDb.getJobType()); + // 网页批量任务 + remoteReq.setExecuteMode(EXECUTE_MODE_WEB_BATCH); + return remoteReq; + } + + + /** + * 抽取公共方法:生成任务的文件基础存储路径 + * 路径格式:BASE_STORAGE_PATH/{userId}/{jobName}/{uuid} + * + * @param userId 用户ID + * @param jobName 任务名称 + * @param uuid 任务唯一标识 + * @return 拼接完成的基础路径字符串 + */ + private String generateTaskBasePath(Long userId, String jobName, String uuid,String selectDirName,boolean batch) { + // 非空校验 + if (userId == null) { + throw new RuntimeException("提交任务用户id非法"); + } + if (StringUtils.isBlank(jobName) || StringUtils.isBlank(uuid)) { + throw new RuntimeException("任务名称或唯一标识不能为空"); + } + // 拼接路径 + StringBuilder basePath = new StringBuilder(BASE_STORAGE_PATH); + basePath.append(File.separator).append(userId) + .append(File.separator).append(jobName) + .append(File.separator).append(uuid); + if (batch) { + if (StringUtils.isBlank(selectDirName)) { + throw new RuntimeException("批量提交任务时,所选文件夹名称不能为空"); + } + // 路径安全校验:禁止包含路径分隔符,防止路径穿越 + if (selectDirName.contains(File.separator) || selectDirName.contains("/") || selectDirName.contains("\\")) { + throw new RuntimeException("文件夹名称不能包含路径分隔符 / \\ 等非法字符"); + } + basePath.append(File.separator).append(selectDirName); + } + return basePath.toString(); + } + + /** + * 1. 
根据UUID查询并校验SimulationJob + */ + private SimulationJob getAndValidateSimulationJob(String uuid) { + SimulationJob jobDb = simulationJobService.lambdaQuery() + .eq(SimulationJob::getUuid, uuid) + .one(); + if (jobDb == null) { + throw new RuntimeException("未查询到对应的作业信息,UUID:" + uuid); + } + return jobDb; + } + + /** + * 2. 生成任务基础路径 + */ + private String generateTaskBasePath(BatchHpcTaskReq req, SimulationJob jobDb) { + Long userId = ThreadLocalContext.getUserId(); + return generateTaskBasePath( userId, jobDb.getJobName(),req.getUuid(),req.getSelectDirName(),true ); + } + + /** + * 3. 获取并校验任务目录(非空校验) + */ + private List getAndValidateTaskDirs(String fileBasePath) { + List allTaskDirs = FilesUtil.getAllSubFolderAbsolutePaths(fileBasePath); + if (CollectionUtils.isEmpty(allTaskDirs)) { + throw new RuntimeException("批量任务目录文件为空,路径:" + fileBasePath); + } + return allTaskDirs; + } + + /** + * 4. 批量提交HPC任务(核心业务逻辑) + */ + private List batchSubmitHpcTasks(BatchHpcTaskReq req, SimulationJob jobDb, List allTaskDirs) { + List results = new ArrayList<>(); + for (String taskDirAllPath : allTaskDirs) { + BatchWebSubmitResp resp = submitSingleHpcTask(req, jobDb, taskDirAllPath); + if (resp != null) { + results.add(resp); + } + } + return results; + } + + /** + * 4.1 提交单个HPC任务(最小执行单元) + */ + private BatchWebSubmitResp submitSingleHpcTask(BatchHpcTaskReq req,SimulationJob jobDb, String taskDirAllPath) { + String lastDirName = FilesUtil.getLastDirectoryName(taskDirAllPath); + // 构建远程请求参数 + SubmitHpcTaskRemoteReq remoteReq = batchBuildCommonRemoteReq(req, jobDb, lastDirName); + // 构建任务提交请求 + SubmitHpcTaskReq submitHpcTaskReq = buildSubmitTaskReq(remoteReq,taskDirAllPath); + // 设置批量提交来源 + submitHpcTaskReq.setFrom(EXECUTE_MODE_WEB_BATCH); + // 调用提交接口 + SdmResponse response = taskController.submitHpcJob(submitHpcTaskReq); + // 处理成功结果 + if (response.isSuccess() && StringUtils.isNotBlank(response.getData())) { + BatchWebSubmitResp resp = new BatchWebSubmitResp(); + 
resp.setJobId(response.getData()); + resp.setSubmitDirName(lastDirName); + return resp; + } + log.warn("batch submit failed,response:{},submitHpcTaskReq:{}", JSONObject.toJSONString(response),JSONObject.toJSONString(submitHpcTaskReq)); + return null; + } + + /** + * 4.2 构建SubmitHpcTaskReq(封装文件获取+属性拷贝) + */ + private SubmitHpcTaskReq buildSubmitTaskReq(SubmitHpcTaskRemoteReq remoteReq,String taskDirAllPath) { + SubmitHpcTaskReq submitHpcTaskReq = new SubmitHpcTaskReq(); + // 获取主/从文件 + getSimulationFile(remoteReq, submitHpcTaskReq,taskDirAllPath); + // 属性拷贝 + BeanUtils.copyProperties(remoteReq, submitHpcTaskReq); + return submitHpcTaskReq; + } + + /** + * 5. 删除作业数据 + */ + private void deleteSimulationJob(SimulationJob jobDb) { + if (jobDb.getId() == null) { + throw new RuntimeException("作业ID为空,无法删除"); + } + simulationJobService.removeById(jobDb.getId()); + } + + + } diff --git a/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java b/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java index 246eb70f..e1882d91 100644 --- a/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java +++ b/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java @@ -18,6 +18,7 @@ import com.sdm.pbs.model.entity.SimulationHpcCommand; import com.sdm.pbs.model.entity.SimulationJob; import com.sdm.pbs.model.req.JobFileCallBackReq; import com.sdm.pbs.model.req.QueryJobReq; +import com.sdm.pbs.model.req.SingleHpcJobSubmitReq; import com.sdm.pbs.model.req.SubmitHpcTaskReq; import com.sdm.pbs.service.HpcInstructionService; import com.sdm.pbs.service.IPbsService; @@ -279,7 +280,7 @@ public class TaskController { return hpcInstructionService.callHpcUploadToTarget(paramMap.get("jobId").toString(), paramMap.get("jobWorkDir").toString(), paramMap.get("minioBucket").toString(), paramMap.get("callBackMinioDir").toString() ,paramMap.get("callBackNasDir").toString(), Long.valueOf(paramMap.get("dirId").toString()), - 
// ===== new file: pbs/src/main/java/com/sdm/pbs/model/bo/BatchWebSubmitResp.java =====
package com.sdm.pbs.model.bo;

import lombok.Data;

/**
 * Per-directory result of a batch web HPC submission: one entry per
 * successfully submitted sub-directory.
 */
@Data
public class BatchWebSubmitResp {

    // HPC scheduler job id returned for this sub-task
    private String jobId;

    // Name of the sub-directory this job was submitted from
    private String submitDirName;

}

// ===== new file: pbs/src/main/java/com/sdm/pbs/model/bo/PbsChunkUploadLocalFileResp.java =====
package com.sdm.pbs.model.bo;

import lombok.Data;

/**
 * Response of a local chunk upload.
 */
@Data
public class PbsChunkUploadLocalFileResp {

    // Final upload path: the server file path for a single file, or the folder's
    // final path for a folder upload — used as a parameter when launching tasks.
    private String finalFullFilePath;

}
// ===== new file: pbs/src/main/java/com/sdm/pbs/model/req/BatchHpcTaskReq.java =====
package com.sdm.pbs.model.req;

import lombok.Data;

/**
 * Request for a batch web HPC submission.
 * NOTE(review): @Data on a subclass generates equals/hashCode that ignore the
 * superclass fields — consider @EqualsAndHashCode(callSuper = true); confirm
 * whether these requests are ever compared.
 */
@Data
public class BatchHpcTaskReq extends WebSubmitReq {

    /**
     * Name of the folder uploaded in batch mode; the backend uses it to locate
     * the uploaded folder on disk.
     */
    private String selectDirName;

}

// ===== new file: pbs/src/main/java/com/sdm/pbs/model/req/OneHpcTaskReq.java =====
package com.sdm.pbs.model.req;

import lombok.Data;

/**
 * Request for a single (non-batch) web HPC submission.
 * Currently adds nothing beyond the shared {@link WebSubmitReq} fields.
 */
@Data
public class OneHpcTaskReq extends WebSubmitReq {

}
// ===== new file: pbs/src/main/java/com/sdm/pbs/model/req/PbsChunkUploadLocalFileReq.java =====
package com.sdm.pbs.model.req;

import lombok.Data;
import org.springframework.web.multipart.MultipartFile;

/**
 * One chunk of a local file/folder upload for a web-submitted PBS task.
 */
@Data
public class PbsChunkUploadLocalFileReq {
    // Task name
    private String jobName;
    // Frontend-generated timestamp token; identical for every request of one
    // task — distinguishes directories and serves as the submit credential
    private String uuid;
    // Name of the folder the user selected; omitted for single-file uploads
    private String selectDirName;
    // Relative path inside the selected folder (e.g. /a/b/ when folder "a" was
    // selected); omitted for single-file uploads
    private String selectFilePath;
    // Original file name, e.g. cc.txt
    private String sourceFileName;
    // Current chunk index
    private Integer chunk;
    // Total number of chunks
    private Integer chunkTotal;
    // Chunk file payload
    private MultipartFile file;
    // Temp directory (not sent for the first chunk, required afterwards)
//    private String fileTempPath;
}

// ===== new file: pbs/src/main/java/com/sdm/pbs/model/req/SingleHpcJobSubmitReq.java =====
package com.sdm.pbs.model.req;

import lombok.Data;
import java.io.Serializable;

/**
 * Pre-submit request for a single HPC compute job.
 * Receives the job parameters sent by the frontend before actual submission.
 */
@Data
public class SingleHpcJobSubmitReq implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Frontend-generated timestamp token; identical for every request of one
     * task — distinguishes directories and serves as the submit credential.
     * Must match across the upload / pre-submit / submit calls.
     */
    private String uuid;

    /**
     * Solver configuration id.
     */
    private String softwareId;

    /**
     * Solver name.
     */
    private String software;

    /**
     * Compute task name.
     */
    private String jobName;

    /**
     * Number of cores required.
     */
    private int coreNum;

    /**
     * Task description (max 512 characters).
     */
    private String jobDesc;

    /**
     * HPC type — simulation_app_repository.hpcGroup: HPC_PACK, OPEN_PBS.
     */
    private String hpcGroup;

    /**
     * "single" for a standalone task, "batch" for a batch task.
     */
    private String type;

}
// ===== new file: pbs/src/main/java/com/sdm/pbs/model/req/WebSubmitReq.java =====
package com.sdm.pbs.model.req;

import lombok.Data;

import java.io.Serializable;

/**
 * Common fields for web-submitted HPC task requests (single and batch).
 */
@Data
public class WebSubmitReq implements Serializable {

    private static final long serialVersionUID = 2L;

    /**
     * Submission type: "single" or "batch".
     */
    private String type;

    /**
     * Frontend-generated timestamp token; identical for every request of one
     * task — distinguishes directories and serves as the submit credential.
     * Must match across the upload / pre-submit / submit calls.
     */
    private String uuid;

}
getTaskFromByIndependence(independence); + log.info("callHpcUploadToTarget from:{}",from); // 通知工具回传文件 minio 或者 nas SdmResponse callResponse = hpcInstructionService.callHpcUploadToTarget(newDbJob.getJobId(), newDbJob.getStdoutHpcFilePath(),minioBucket, - newDbJob.getStdoutSpdmMinoFilePath(),newDbJob.getStdoutSpdmNasFilePath(),newDbJob.getDirId(),userId,tenantId,outputFormat); + newDbJob.getStdoutSpdmMinoFilePath(),newDbJob.getStdoutSpdmNasFilePath(),newDbJob.getDirId(),userId,tenantId,outputFormat,from); if (!callResponse.isSuccess()||!callResponse.getData()) { CoreLogger.error("callHpcUploadToTarget failed,jobId:{},workDir:{}",newDbJob.getJobId(),newDbJob.getStdoutHpcFilePath()); return; @@ -130,4 +134,20 @@ public class FinishedStatusHandler implements JobStatusHandler { messageFeignClient.sendMessage(req); } + /** + * 根据任务独立性类型获取任务来源标识 + * @param independence 任务独立性类型 1=单一任务 2=批量任务 其他=流程任务 + * @return 任务来源 from: single/batch/flow + */ + private String getTaskFromByIndependence(Integer independence) { + if (independence == 1) { + return "single"; + } + if (independence == 2) { + return "batch"; + } + // 默认值 + return "flow"; + } + } diff --git a/pbs/src/main/java/com/sdm/pbs/service/HpcInstructionService.java b/pbs/src/main/java/com/sdm/pbs/service/HpcInstructionService.java index 4b699ee9..c63eec2a 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/HpcInstructionService.java +++ b/pbs/src/main/java/com/sdm/pbs/service/HpcInstructionService.java @@ -58,7 +58,7 @@ public interface HpcInstructionService { // 通知hpc回传文件 SdmResponse callHpcUploadToTarget(String jobId,String workDir,String minioBucket, String callBackMinioDir,String callBackNasDir,Long dirId, - Long userId,Long tenantId,String outputFormat); + Long userId,Long tenantId,String outputFormat,String from); void streamTaskLog(String filePath, String clientToken, OutputStream outputStream); diff --git a/pbs/src/main/java/com/sdm/pbs/service/IPbsFileUploadService.java 
// ===== new file: pbs/src/main/java/com/sdm/pbs/service/IPbsFileUploadService.java =====
package com.sdm.pbs.service;

import com.sdm.common.common.SdmResponse;
import com.sdm.pbs.model.bo.PbsChunkUploadLocalFileResp;
import com.sdm.pbs.model.req.PbsChunkUploadLocalFileReq;

/**
 * Chunked-upload handling for web-submitted PBS task input files.
 */
public interface IPbsFileUploadService {
    // Handles one uploaded chunk; basePath is the chunk's target directory,
    // basePathParent is the root of the user-selected folder (empty for single files).
    SdmResponse<PbsChunkUploadLocalFileResp> handleChunkUpload(PbsChunkUploadLocalFileReq req, String basePath, String basePathParent);
}
callBackMinioDir,String callBackNasDir,Long dirId, - Long userId,Long tenantId,String outputFormat) { + Long userId,Long tenantId,String outputFormat,String from) { return hpcCommandExcuteUtil.callHpcUploadToTarget(jobId,workDir,minioBucket,callBackMinioDir, - callBackNasDir,dirId,userId,tenantId,outputFormat); + callBackNasDir,dirId,userId,tenantId,outputFormat, from); } @Override diff --git a/pbs/src/main/java/com/sdm/pbs/service/impl/IPbsHpcServiceImpl.java b/pbs/src/main/java/com/sdm/pbs/service/impl/IPbsHpcServiceImpl.java index 7d3a1bb8..a304d37a 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/impl/IPbsHpcServiceImpl.java +++ b/pbs/src/main/java/com/sdm/pbs/service/impl/IPbsHpcServiceImpl.java @@ -15,6 +15,7 @@ import com.sdm.common.log.CoreLogger; import com.sdm.pbs.model.bo.HpcJobStatusInfo; import com.sdm.pbs.model.bo.HpcNodeInfo; import com.sdm.pbs.model.bo.HpcResouceInfo; +import com.sdm.pbs.model.req.SingleHpcJobSubmitReq; import com.sdm.pbs.model.req.SubmitHpcTaskReq; import com.sdm.pbs.service.HpcInstructionService; import com.sdm.pbs.service.IPbsService; @@ -145,6 +146,11 @@ public class IPbsHpcServiceImpl implements IPbsService { return hpcInstructionService.stopStreamTaskLog(clientToken); } + @Override + public SdmResponse preSingleHpcJobSubmit(SingleHpcJobSubmitReq req) { + return SdmResponse.success("HPC任务信息预入库成功"); + } + private HpcResouceInfo buildHpcResourceInfo(List nodes) { HpcResouceInfo result = new HpcResouceInfo(); diff --git a/pbs/src/main/java/com/sdm/pbs/service/impl/PbsFileUploadServiceImpl.java b/pbs/src/main/java/com/sdm/pbs/service/impl/PbsFileUploadServiceImpl.java new file mode 100644 index 00000000..f537554f --- /dev/null +++ b/pbs/src/main/java/com/sdm/pbs/service/impl/PbsFileUploadServiceImpl.java @@ -0,0 +1,135 @@ +package com.sdm.pbs.service.impl; + +import com.sdm.common.common.SdmResponse; +import com.sdm.pbs.model.bo.PbsChunkUploadLocalFileResp; +import com.sdm.pbs.model.req.PbsChunkUploadLocalFileReq; +import 
com.sdm.pbs.service.IPbsFileUploadService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Service; + +import java.io.*; +import java.nio.channels.FileChannel; +import java.nio.file.Files; + +@Slf4j +@Service +public class PbsFileUploadServiceImpl implements IPbsFileUploadService { + + @Override + public SdmResponse handleChunkUpload(PbsChunkUploadLocalFileReq req, String basePath,String basePathParent) { + try { + PbsChunkUploadLocalFileResp pbsChunkUploadLocalFileResp = dealFile(req, basePath,basePathParent); + return SdmResponse.success(pbsChunkUploadLocalFileResp); + } catch (Exception e) { + log.error("pbs handleChunkUpload error", e); + } + return SdmResponse.failed("处理上传文件失败"); + } + + private PbsChunkUploadLocalFileResp dealFile(PbsChunkUploadLocalFileReq req, String basePath,String basePathParent) throws Exception { + PbsChunkUploadLocalFileResp resp = new PbsChunkUploadLocalFileResp(); + // 1. 创建基础目录 + File baseDir = new File(basePath); + if (!baseDir.exists()) { + baseDir.mkdirs(); + } + // 2. 生成临时分片目录(去掉文件后缀) + // 2. 生成临时目录名(兼容无后缀文件) + String sourceFileName = req.getSourceFileName(); + String tempDirName; + int lastDotIndex = sourceFileName.lastIndexOf("."); + if (lastDotIndex > 0) { + tempDirName = sourceFileName.substring(0, lastDotIndex); + } else { + tempDirName = sourceFileName; // 无后缀文件直接用文件名 + } + String tempDirPath = basePath + File.separator + tempDirName; + File tempDir = new File(tempDirPath); + +// // 3. 非首片上传:使用前端传入的临时目录 +// if (req.getFileTempPath() != null && !req.getFileTempPath().trim().isEmpty()) { +// tempDirPath = req.getFileTempPath(); +// tempDir = new File(tempDirPath); +// } + + // 4. 创建临时分片目录 + if (!tempDir.exists()) { + tempDir.mkdirs(); + } + // 5. 写入当前分片文件 + File chunkFile = new File(tempDir, req.getSourceFileName() + "."
+ req.getChunk()); + try (InputStream in = req.getFile().getInputStream(); + OutputStream out = new FileOutputStream(chunkFile)) { + byte[] buffer = new byte[1024 * 1024]; + int len; + while ((len = in.read(buffer)) != -1) { + out.write(buffer, 0, len); + } + } + log.info("分片{}写入完成:{}", req.getChunk(), chunkFile.getAbsolutePath()); + + // 6. 判断是否为最后一片,执行合并 + if (req.getChunk().equals(req.getChunkTotal())) { + String targetFilePath = basePath + File.separator + req.getSourceFileName(); + mergeChunkFiles(tempDir, new File(targetFilePath)); + // 合并完成删除临时目录 + deleteDirectory(tempDir); + log.info("文件合并完成,临时目录已删除:{}", targetFilePath); + // 上传的文件夹的后端磁盘路径 + if(StringUtils.isNotBlank(basePathParent)){ + resp.setFinalFullFilePath(basePathParent); + }else { + // 单一文件的后端路径 + resp.setFinalFullFilePath(targetFilePath); + } + return resp; + } + // 7. 非最后一片:返回临时目录给前端后续使用 + return resp; + } + + + /** + * 合并所有分片文件 + */ + private void mergeChunkFiles(File tempDir, File targetFile) throws IOException { + try (FileChannel outChannel = new FileOutputStream(targetFile).getChannel()) { + File[] chunkFiles = tempDir.listFiles((dir, name) -> name.contains(".")); + if (chunkFiles == null || chunkFiles.length == 0) { + throw new FileNotFoundException("分片文件不存在"); + } + + // sort by chunk index = the text after the LAST '.' (sourceFileName may itself contain dots) + java.util.Arrays.sort(chunkFiles, (f1, f2) -> { + int c1 = Integer.parseInt(f1.getName().substring(f1.getName().lastIndexOf('.') + 1)); + int c2 = Integer.parseInt(f2.getName().substring(f2.getName().lastIndexOf('.') + 1)); + return Integer.compare(c1, c2); + }); + + // 合并分片 + for (File chunk : chunkFiles) { + try (FileChannel inChannel = new FileInputStream(chunk).getChannel()) { + inChannel.transferTo(0, inChannel.size(), outChannel); + } finally { + Files.deleteIfExists(chunk.toPath()); + } + } + } + } + + /** + * 删除目录及所有文件 + */ + private void deleteDirectory(File dir) throws IOException { + if (dir.isDirectory()) { + File[] children = dir.listFiles(); + if (children != null) { + for (File child : children) { + deleteDirectory(child); + } + } + } +
Files.deleteIfExists(dir.toPath()); + } +} diff --git a/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java b/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java index cbe308ae..5d61c269 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java +++ b/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java @@ -30,6 +30,7 @@ import com.sdm.pbs.model.entity.SimulationHpcCommandPlaceholder; import com.sdm.pbs.model.entity.SimulationJob; import com.sdm.pbs.model.req.JobFileCallBackReq; import com.sdm.pbs.model.req.QueryJobReq; +import com.sdm.pbs.model.req.SingleHpcJobSubmitReq; import com.sdm.pbs.model.req.SubmitHpcTaskReq; import com.sdm.pbs.service.*; import lombok.extern.slf4j.Slf4j; @@ -68,6 +69,8 @@ import java.util.stream.Collectors; @ConditionalOnProperty(name = "pbs.task.impl", havingValue = "hpc") public class PbsServiceDecorator implements IPbsServiceDecorator { + private static final String EXECUTE_MODE_WEB_SINGLE = "WEB_SINGLE"; + @Autowired private HpcCommandExcuteUtil hpcCommandExcuteUtil; @@ -189,7 +192,7 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { throw new RuntimeException("Hpc执行失败,返回jobId空"); } // 4. 
保存任务信息到数据库 - saveSimulationJobToDb(req, jobId, hpcOutPutDir); + saveOrUpdateSimulationJobToDb(req, jobId, hpcOutPutDir); return SdmResponse.success(jobId); } @@ -302,7 +305,7 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { * @param hpcOutPutDir 输出目录 * 执行命令 */ - private void saveSimulationJobToDb(SubmitHpcTaskReq req, String jobId, String hpcOutPutDir) { + private void saveOrUpdateSimulationJobToDb(SubmitHpcTaskReq req, String jobId, String hpcOutPutDir) { if (StringUtils.isNotEmpty(jobId)) { // 数据入库 SimulationJob simulationJob = new SimulationJob(); @@ -312,7 +315,7 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { simulationJob.setCoreNum(req.getCoreNum()); simulationJob.setSoftware(req.getSoftware()); simulationJob.setJobType(req.getJobType()); - simulationJob.setIndependence(req.isIndependence()); + simulationJob.setIndependence(req.getIndependence()); // simulationJob.setInputFiles(JSONObject.toJSONString(req.getInputFiles())); // 主文件位置 simulationJob.setMasterFile(req.getMasterFilePath()); @@ -337,7 +340,7 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { // 执行信息 定时任务回传的时候修改 // simulationJob.setStartTime("2025-11-30 10:00:00"); // simulationJob.setEndTime("2025-11-30 12:30:00"); - // 提交成功,队列中 + // 提交成功,队列中 todo 查一下提交任务的状态记录数据 simulationJob.setJobStatus("Queued"); // 求解器名称 simulationJob.setSolverName(req.getSoftware()); @@ -359,14 +362,25 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { simulationJob.setUpdateTime(LocalDateTime.now()); // 当前节点minio的文件夹id simulationJob.setDirId(req.getDirId()); - simulationJob.setInputFileId(req.getInputFileId()); simulationJob.setInputFormat(req.getInputFormat()); simulationJob.setSlaveFormat(req.getSlaveFormat()); simulationJob.setOutputFormat(req.getOutputFormat()); simulationJob.setProcessDefinitionId(req.getProcessDefinitionId()); simulationJob.setProcessInstanceId(req.getProcessInstanceId()); - simulationJobService.save(simulationJob); + 
simulationJob.setGroupId(req.getGroupId()); + if(StringUtils.isNotBlank(req.getJobDesc())){ + simulationJob.setJobDesc(req.getJobDesc()); + } + + // 独立单一任务 + if(Objects.equals(req.getFrom(),EXECUTE_MODE_WEB_SINGLE)){ + SimulationJob jobDb = simulationJobService.lambdaQuery().eq(SimulationJob::getUuid, req.getUuid()).one(); + simulationJob.setId(Objects.requireNonNull(jobDb, "no pre-saved job found for uuid: " + req.getUuid()).getId()); + simulationJobService.updateById(simulationJob); + }else { + simulationJobService.save(simulationJob); + } } } @@ -610,6 +624,44 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { return pbsService.stopStreamTaskLog(clientToken); } + @Override + public SdmResponse preSingleHpcJobSubmit(SingleHpcJobSubmitReq req) { + // 预提交数据入库 + SimulationJob simulationJob = new SimulationJob(); + // 基础字段 + // simulationJob.setId(1L); + simulationJob.setJobName(req.getJobName()); + simulationJob.setCoreNum(req.getCoreNum()); + simulationJob.setSoftware(req.getSoftware()); + // job 的类型 + simulationJob.setJobType(req.getHpcGroup()); + // 0:flow 任务 1:独立提交单一任务,2独立提交批量任务 + int independence = Objects.equals(req.getType(),"single") ?
1:2; + simulationJob.setIndependence(independence); + // 软件及文件关联 + simulationJob.setSoftwareId(req.getSoftwareId()); + // 任务装备状态 + simulationJob.setJobStatus("Preparing"); + // 求解器名称 + simulationJob.setSolverName(req.getSoftware()); + // 文件上行处理中 + simulationJob.setFileStatus("uplinking"); + // 文件详情 + simulationJob.setJobDesc(req.getJobDesc()); + simulationJob.setUuid(req.getUuid()); + // 审计字段 + Long userId = ThreadLocalContext.getUserId(); + Long tenantId = ThreadLocalContext.getTenantId(); + CoreLogger.info("preSingleHpcJobSubmit save db userId:{},tenantId:{}", userId, tenantId); + simulationJob.setCreatorId(userId); + simulationJob.setTenantId(tenantId); + simulationJob.setCreateTime(LocalDateTime.now()); + simulationJob.setUpdaterId(userId); + simulationJob.setUpdateTime(LocalDateTime.now()); + simulationJobService.save(simulationJob); + return SdmResponse.success("HPC任务信息预入库成功"); + } + public SdmResponse delHpcJobs(DelHpcJobsReq req) { if(CollectionUtils.isEmpty(req.getHpcJobIds())){