From f97a6dfee81871c9934f0a1300a8ab2a698a1ae2 Mon Sep 17 00:00:00 2001 From: yangyang01000846 <15195822163@163.com> Date: Thu, 4 Dec 2025 09:41:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9Ahpc=E6=8F=90?= =?UTF-8?q?=E4=BA=A4=EF=BC=8C=E4=B8=8B=E8=BD=BD=EF=BC=8C=E4=B8=8A=E4=BC=A0?= =?UTF-8?q?=E6=B1=82=E8=A7=A3=E6=96=87=E4=BB=B6=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../entity/req/pbs/HpcTaskFileDownReq.java | 18 +++ .../req/pbs/SubmitHpcTaskRemoteReq.java | 33 +++-- .../feign/inter/pbs/ITaskFeignClient.java | 6 +- .../common/utils/HpcCommandExcuteUtil.java | 50 +++++-- .../flowable/delegate/handler/HpcHandler.java | 132 +++++++++++++++--- flowable/src/main/resources/application.yml | 2 +- .../sdm/pbs/controller/TaskController.java | 12 +- .../sdm/pbs/model/req/SubmitHpcTaskReq.java | 20 ++- .../impl/HpcInstructionServiceImpl.java | 3 +- .../pbs/service/impl/IPbsHpcServiceImpl.java | 7 +- .../pbs/service/impl/PbsServiceDecorator.java | 100 ++++++++++--- pbs/src/main/resources/application-dev.yml | 2 + 12 files changed, 313 insertions(+), 72 deletions(-) create mode 100644 common/src/main/java/com/sdm/common/entity/req/pbs/HpcTaskFileDownReq.java diff --git a/common/src/main/java/com/sdm/common/entity/req/pbs/HpcTaskFileDownReq.java b/common/src/main/java/com/sdm/common/entity/req/pbs/HpcTaskFileDownReq.java new file mode 100644 index 00000000..7b543cf8 --- /dev/null +++ b/common/src/main/java/com/sdm/common/entity/req/pbs/HpcTaskFileDownReq.java @@ -0,0 +1,18 @@ +package com.sdm.common.entity.req.pbs; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +@Data +public class HpcTaskFileDownReq { + + @Schema(description = "任务ID") + public String jobId; + + @Schema(description = "文件名称") + public String fileName; + + @Schema(description = "文件大小") + public Long fileSize; + +} diff --git a/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java b/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java index 2d38c02c..b14ca782 100644 --- a/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java +++ b/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java @@ -1,17 +1,15 @@ package com.sdm.common.entity.req.pbs; -import com.baomidou.mybatisplus.annotation.FieldStrategy; -import com.baomidou.mybatisplus.annotation.TableField; -import com.sdm.common.entity.flowable.executeConfig.BaseExecuteConfig; +import com.alibaba.fastjson2.annotation.JSONField; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; +import org.springframework.web.multipart.MultipartFile; import java.util.ArrayList; import java.util.List; -import java.util.Map; @Data -public class SubmitHpcTaskRemoteReq extends BaseExecuteConfig { +public class SubmitHpcTaskRemoteReq { @Schema(description = "配置时的mm时间戳") public String timesmap; @@ -31,11 +29,13 @@ public class SubmitHpcTaskRemoteReq extends BaseExecuteConfig { @Schema(description = "计算任务是否独立存在 0:非独立任务 1:独立任务") public int independence; - @Schema(description = "求解文件,featchFileType =beforeNode 时传递") - public List inputFiles = new ArrayList<>(); + @Schema(description = "求解文件") + @JSONField(serialize = false) + public List inputFiles = new ArrayList<>(); @Schema(description = "计算主文件") - public String masterFile; + @JSONField(serialize = false) + public MultipartFile masterFile; @Schema(description = "计算任务所属任务ID") public String taskId; @@ -52,16 +52,19 @@ public class SubmitHpcTaskRemoteReq extends BaseExecuteConfig { @Schema(description = "执行的命令") public String command; + @Schema(description = "命令执行输出文件名xx.out") + public String stdout; + @Schema(description = "任务所属项目") public String projectname; - @Schema(description = "获取文件的方式:上一节点:beforeNode,hpc节点(文件提前上传工作目录):hpcNode") - public String featchFileType; +// @Schema(description = "获取文件的方式:上一节点:beforeNode,hpc节点(文件提前上传工作目录):hpcNode") +// public String featchFileType; +// +// @Schema(description = "上一节点Id,featchFileType:beforeNode时传递 ") +// public String beforeNodeId; - @Schema(description = "上一节点Id,featchFileType:beforeNode时传递 ") - public String beforeNodeId; - - @Schema(description= "自定义占位符,只有列表展示使用,key 就是占位符") - private Map commandExpand; +// @Schema(description= "自定义占位符,只有列表展示使用,key 就是占位符") +// private Map commandExpand; } diff --git a/common/src/main/java/com/sdm/common/feign/inter/pbs/ITaskFeignClient.java b/common/src/main/java/com/sdm/common/feign/inter/pbs/ITaskFeignClient.java index dead8672..60a89026 100644 --- a/common/src/main/java/com/sdm/common/feign/inter/pbs/ITaskFeignClient.java +++ b/common/src/main/java/com/sdm/common/feign/inter/pbs/ITaskFeignClient.java @@ -4,8 +4,8 @@ import com.sdm.common.common.SdmResponse; import com.sdm.common.config.LongTimeRespFeignConfig; import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq; import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; @FeignClient( @@ -15,7 +15,7 @@ import org.springframework.web.bind.annotation.RequestBody; public interface ITaskFeignClient { // "作业提交" - @PostMapping("/pbs/submitHpcJob") - SdmResponse submitHpcJob(@RequestBody SubmitHpcTaskRemoteReq req); + @PostMapping(value = "/pbs/submitHpcJob", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) + SdmResponse submitHpcJob( SubmitHpcTaskRemoteReq req); } diff --git a/common/src/main/java/com/sdm/common/utils/HpcCommandExcuteUtil.java b/common/src/main/java/com/sdm/common/utils/HpcCommandExcuteUtil.java index 405c9956..f1738a30 100644 --- a/common/src/main/java/com/sdm/common/utils/HpcCommandExcuteUtil.java +++ b/common/src/main/java/com/sdm/common/utils/HpcCommandExcuteUtil.java @@ -9,12 +9,16 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.io.ByteArrayResource; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; +import org.springframework.http.client.MultipartBodyBuilder; import org.springframework.stereotype.Component; +import org.springframework.web.multipart.MultipartFile; +import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; @@ -44,6 +48,9 @@ public class HpcCommandExcuteUtil { @Value("${hpc.remoteDownLoadFileUrl:}") private String remoteDownLoadFileUrl; + @Value("${hpc.remoteUploadFileUrl:}") + private String remoteUploadFileUrl; + @Value("${hpc.callHpcUpload:}") private String callHpcUpload; @@ -137,29 +144,31 @@ public class HpcCommandExcuteUtil { } public ResponseEntity hpcDownloadFile(String path, Long fileSize) { - // 从 path 中提取文件名 String fileName = extractFileName(path); String encodedFileName = URLEncoder.encode(fileName, StandardCharsets.UTF_8); + StreamingResponseBody body = outputStream -> { + // 构建完整 URL,并安全编码 path + String url = remoteDownLoadFileUrl + "?path=" + URLEncoder.encode(path, StandardCharsets.UTF_8); + + // 调用 B 服务并流式写出 DataBufferUtils.write( webClient.get() - .uri(remoteDownLoadFileUrl, path) + .uri(url) .retrieve() .bodyToFlux(DataBuffer.class), Channels.newChannel(outputStream) - ).blockLast(); // 阻塞直到写完 + ).blockLast(); }; - // 构建 ResponseEntity ResponseEntity.BodyBuilder builder = ResponseEntity.ok() - .header(HttpHeaders.CONTENT_DISPOSITION, - "attachment; filename*=UTF-8''" + encodedFileName) + .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename*=UTF-8''" + encodedFileName) .contentType(MediaType.APPLICATION_OCTET_STREAM); - // 只有在 fileSize 合法时才设置 Content-Length if (fileSize != null && fileSize > 0) { builder.contentLength(fileSize); } + return builder.body(body); } @@ -195,6 +204,31 @@ public class HpcCommandExcuteUtil { return lastSlash >= 0 ? path.substring(lastSlash + 1) : path; } - + // 调用工具上传hpc文件 + public String uploaHpcFile(MultipartFile file, String subDir) { + try { + // 3. Multipart body 构建 + MultipartBodyBuilder builder = new MultipartBodyBuilder(); + builder.part("file", new ByteArrayResource(file.getBytes()) { + @Override + public String getFilename() { + return file.getOriginalFilename(); + } + }); + builder.part("subDir", subDir); + // 4. 调用 B 服务上传接口 + String uploadResult = webClient.post() + .uri(remoteUploadFileUrl) + .contentType(MediaType.MULTIPART_FORM_DATA) + .body(BodyInserters.fromMultipartData(builder.build())) + .retrieve() + .bodyToMono(String.class) + .block(); + return uploadResult; + } catch (Exception e) { + System.out.println("上传失败"); + return ""; + } + } } diff --git a/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java b/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java index 17268ec1..8b5c8e2d 100644 --- a/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java +++ b/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java @@ -1,9 +1,14 @@ package com.sdm.flowable.delegate.handler; +import com.alibaba.fastjson2.JSONObject; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.sdm.common.common.SdmResponse; import com.sdm.common.entity.flowable.executeConfig.HPCExecuteConfig; +import com.sdm.common.entity.req.pbs.SimulationCommandPlaceholderReq; import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq; import com.sdm.common.feign.inter.pbs.ITaskFeignClient; +import com.sdm.common.log.CoreLogger; import com.sdm.flowable.service.IAsyncTaskRecordService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -12,14 +17,14 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; -import java.util.Arrays; import java.util.Date; import java.util.HashMap; +import java.util.Map; // HPC(executeType=HPC) @Slf4j @Component("HPC") -public class HpcHandler implements ExecutionHandler { +public class HpcHandler implements ExecutionHandler,HPCExecuteConfig> { @Autowired private IAsyncTaskRecordService asyncTaskRecordService; @@ -27,14 +32,19 @@ public class HpcHandler implements ExecutionHandler params, HPCExecuteConfig config) { + SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq = convertParamsToReq(params); +// submitHpcTaskRemoteReq.setBeforeNodeId(config.getBeforeNodeId()); // 实现HPC处理逻辑... // INIT(初始化)/RUNNING(执行中)/SUCCESS(执行成功)/FAIL(执行失败) String status = "INIT"; // 1. 调用 HPC 平台提交任务 - SdmResponse submitResp = taskFeignClient.submitHpcJob(params); + SdmResponse submitResp = taskFeignClient.submitHpcJob(submitHpcTaskRemoteReq); if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){ log.error("HpcHandler submit failed,jobName:{}",params); status = "FAIL"; @@ -53,17 +63,107 @@ public class HpcHandler implements ExecutionHandler params) { + SubmitHpcTaskRemoteReq req = new SubmitHpcTaskRemoteReq(); + if (params == null) { + return req; + } + ObjectMapper objectMapper = new ObjectMapper(); // 需确保ObjectMapper已配置或注入 + // 基础字段映射 + req.setTimesmap(params.get("timesmap").toString()); + req.setJobName(params.get("jobName").toString()); + // 处理int类型字段,包含空值和非数字的异常处理 + try { + req.setCoreNum(params.get("coreNum") != null ? Integer.parseInt(params.get("coreNum").toString()) : 0); + } catch (NumberFormatException e) { + CoreLogger.error("coreNum parse error:{},coreNum:{}",e.getMessage(),params.get("coreNum")); + req.setCoreNum(0); + } + req.setSoftware(params.get("software").toString()); + req.setJobType(params.get("jobType").toString()); + try { + req.setIndependence(params.get("independence") != null ? Integer.parseInt(params.get("independence").toString()) : 0); + } catch (NumberFormatException e) { + req.setIndependence(0); + } + req.setTaskId(params.get("taskId").toString()); + req.setTaskName(params.get("taskName").toString()); + req.setRunId(params.get("runId").toString()); + req.setRunName(params.get("runName").toString()); + req.setCommand(params.get("command").toString()); + req.setProjectname(params.get("projectname").toString()); +// req.setFeatchFileType(params.get("featchFileType").toString()); + // req.setBeforeNodeId(params.get("beforeNodeId").toString()); + // 处理commandExpand字段(JSON字符串转Map) + String commandExpandJson = params.get("commandExpand").toString(); + if (StringUtils.isNotBlank(commandExpandJson)) { + try { + // 将JSON字符串转换为Map + Map commandExpand = objectMapper.readValue( + commandExpandJson, + new TypeReference>() {} + ); +// req.setCommandExpand(commandExpand); + } catch (Exception e) { + CoreLogger.error("convertParamsToReq error:{},params:{}",e.getMessage(), JSONObject.toJSONString(params)); + // 如设为null或空Map +// req.setCommandExpand(new HashMap<>()); + } + } + return req; + } + public String mockinit(){ - SubmitHpcTaskRemoteReq mockReq = mockSubmitHpcTaskReq(); - SdmResponse submitResp = taskFeignClient.submitHpcJob(mockReq); - if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){ - log.error("HpcHandler submit failed,jobName:{}",mockReq.getJobName()); - System.out.println("失败"); - return "失败"; - } - String hpcTaskId = submitResp.getData(); - return hpcTaskId; +// SubmitHpcTaskRemoteReq mockReq = mockSubmitHpcTaskReq(); +// SdmResponse submitResp = taskFeignClient.submitHpcJob(mockReq); +// if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){ +// log.error("HpcHandler submit failed,jobName:{}",mockReq.getJobName()); +// System.out.println("失败"); +// return "失败"; +// } +// String hpcTaskId = submitResp.getData(); + Map params = getParams(); + HPCExecuteConfig hpcExecuteConfig = new HPCExecuteConfig(); + // todo `flowable`.`process_node_param` + hpcExecuteConfig.setBeforeNodeId("uuid-node-8d3e61e7-1374-419c-9e46-210cb88c1113"); + execute(null,params,hpcExecuteConfig); + return "ok"; + } + + private Map getParams() { + Map params = new HashMap<>(); + // 基础字段 + params.put("timesmap", String.valueOf(System.currentTimeMillis())); // 示例时间戳(2025-07-29 00:00:00) + params.put("jobName", "HPC-数据处理作业-"+ System.currentTimeMillis()); + params.put("coreNum", 32); + params.put("software", "reta.exe"); + params.put("jobType", "流体动力学仿真"); + params.put("independence", 1); + params.put("taskId", "123456"); + params.put("taskName", "锂电池热管理系统研发"); + params.put("runId", "55555"); + params.put("runName", "HPC-电池"); +// params.put("command", "\\\\CARSAFE\\share\\solver\\RLithium\\reta.exe -i %retaFile"); + params.put("command", "\\\\CARSAFE\\share\\solver\\RLithium\\reta.exe -i .\\model\\aa.xml"); + params.put("projectname", "新能源汽车锂电池安全性能优化项目"); + params.put("featchFileType", "hpcNode"); // 补充示例值 + params.put("beforeNodeId", null); // 示例空值 + // commandExpand去掉outName后的JSON字符串 + String commandExpandJson = "{\n" + + " \"retaFile\": {\n" + + " \"id\": 1,\n" + + " \"keyEnName\": \"retaFile\",\n" + + " \"keyCnName\": \"电池求解文件\",\n" + + " \"valueType\": \"file_regex_match\",\n" + + " \"inputValue\": \"*.jpg\"\n" + + " }\n" + + "}"; + params.put("commandExpand", commandExpandJson); + return params; } private SubmitHpcTaskRemoteReq mockSubmitHpcTaskReq() { @@ -76,8 +176,8 @@ public class HpcHandler implements ExecutionHandler submitHpcJob(@RequestBody SubmitHpcTaskRemoteReq req) { + public SdmResponse submitHpcJob(SubmitHpcTaskRemoteReq req) { SubmitHpcTaskReq submitHpcTaskReq = new SubmitHpcTaskReq(); BeanUtils.copyProperties(req,submitHpcTaskReq); return pbsService.submitHpcJob(submitHpcTaskReq); @@ -97,10 +99,10 @@ public class TaskController implements ITaskFeignClient { return pbsService.getJobResultFiles(req.getJobId(),req.getTargetDir()); } - @GetMapping("/hpcDownloadFile") + @PostMapping("/hpcDownloadFile") @Operation(summary = "作业下文件下载") - ResponseEntity hpcDownloadFile(@RequestParam String jobId,@RequestParam String fileName,@RequestParam Long fileSize) { - return pbsService.downloadFile(jobId,fileName,fileSize); + ResponseEntity hpcDownloadFile(@RequestBody HpcTaskFileDownReq req) { + return pbsService.downloadFile(req.getJobId(),req.getFileName(),req.getFileSize()); } @PostMapping("/queryJobs") diff --git a/pbs/src/main/java/com/sdm/pbs/model/req/SubmitHpcTaskReq.java b/pbs/src/main/java/com/sdm/pbs/model/req/SubmitHpcTaskReq.java index f90d080d..71af7711 100644 --- a/pbs/src/main/java/com/sdm/pbs/model/req/SubmitHpcTaskReq.java +++ b/pbs/src/main/java/com/sdm/pbs/model/req/SubmitHpcTaskReq.java @@ -1,7 +1,9 @@ package com.sdm.pbs.model.req; +import com.alibaba.fastjson2.annotation.JSONField; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; +import org.springframework.web.multipart.MultipartFile; import java.util.ArrayList; import java.util.List; @@ -24,10 +26,18 @@ public class SubmitHpcTaskReq { public boolean independence; @Schema(description = "求解文件") - public List inputFiles = new ArrayList<>(); + @JSONField(serialize = false) + public List inputFiles = new ArrayList<>(); + + @Schema(description = "求解文件路径") + public List inputFilePaths = new ArrayList<>(); @Schema(description = "计算主文件") - public String masterFile; + @JSONField(serialize = false) + public MultipartFile masterFile; + + @Schema(description = "主文件上传后的路径") + public String masterFilePath; @Schema(description = "计算任务所属任务ID") public String taskId; @@ -44,6 +54,12 @@ public class SubmitHpcTaskReq { @Schema(description = "执行的命令") public String command; + @Schema(description = "命令执行输出文件名xx.out") + public String stdout; + + @Schema(description = "工作目录,代码逻辑生成,和求解主文件平级") + public String workDir; + @Schema(description = "任务所属项目") public String projectname; diff --git a/pbs/src/main/java/com/sdm/pbs/service/impl/HpcInstructionServiceImpl.java b/pbs/src/main/java/com/sdm/pbs/service/impl/HpcInstructionServiceImpl.java index aba194da..9d772ebc 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/impl/HpcInstructionServiceImpl.java +++ b/pbs/src/main/java/com/sdm/pbs/service/impl/HpcInstructionServiceImpl.java @@ -118,7 +118,8 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { String prefixStr = HpcCommandBuilderUtil.initAddJobPrefixStr(req.getJobId()); AddJobParam addJobParam = new AddJobParam(); BeanUtils.copyProperties(req, addJobParam); - String targetWorkDir = addJobParam.getWorkdir() + "\\" + req.getJobId(); +// String targetWorkDir = addJobParam.getWorkdir() + "\\" + req.getJobId(); + String targetWorkDir = addJobParam.getWorkdir(); Pair workDirPair = createDirIfNotExist(targetWorkDir); if(!workDirPair.getLeft()){ AddJobResp addJobResp=new AddJobResp(); diff --git a/pbs/src/main/java/com/sdm/pbs/service/impl/IPbsHpcServiceImpl.java b/pbs/src/main/java/com/sdm/pbs/service/impl/IPbsHpcServiceImpl.java index 00debf06..ca174481 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/impl/IPbsHpcServiceImpl.java +++ b/pbs/src/main/java/com/sdm/pbs/service/impl/IPbsHpcServiceImpl.java @@ -65,11 +65,8 @@ public class IPbsHpcServiceImpl implements IPbsService { newJobReq.setProjectname(req.getProjectname()); AddJobReq addJobReq = new AddJobReq(); addJobReq.setName(req.getRunName()); - // todo - addJobReq.setStdout("1126.out"); - // todo - addJobReq.setWorkdir("\\\\CARSAFE\\share\\spdm"); - // todo + addJobReq.setStdout(req.getStdout()); + addJobReq.setWorkdir(req.getWorkDir()); addJobReq.setCommand(req.getCommand()); SubmitHpcJobReq submitHpcJobReq = new SubmitHpcJobReq(); mergeSubmitHpcJobReq.setNewJobReq(newJobReq); diff --git a/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java b/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java index c912a0c0..f21211ef 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java +++ b/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java @@ -7,6 +7,7 @@ import com.github.pagehelper.PageInfo; import com.sdm.common.common.SdmResponse; import com.sdm.common.entity.resp.PageDataResp; import com.sdm.common.entity.resp.pbs.hpc.FileNodeInfo; +import com.sdm.common.utils.HpcCommandExcuteUtil; import com.sdm.common.utils.PageUtils; import com.sdm.pbs.model.bo.HpcJobStatusInfo; import com.sdm.pbs.model.bo.HpcResouceInfo; @@ -17,12 +18,14 @@ import com.sdm.pbs.model.req.JobFileCallBackReq; import com.sdm.pbs.model.req.QueryJobReq; import com.sdm.pbs.model.req.SubmitHpcTaskReq; import com.sdm.pbs.service.*; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; +import org.springframework.web.multipart.MultipartFile; import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; import java.time.LocalDateTime; @@ -36,6 +39,9 @@ import java.util.stream.Collectors; @ConditionalOnProperty(name = "pbs.task.impl", havingValue = "hpc") public class PbsServiceDecorator implements IPbsServiceDecorator { + @Autowired + private HpcCommandExcuteUtil hpcCommandExcuteUtil; + // 正则匹配%后的单词(\w+ 匹配字母、数字、下划线) private static final Pattern PLACEHOLDER_PATTERN = Pattern.compile("%(\\w+)"); @@ -67,9 +73,24 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { @Override public SdmResponse submitHpcJob(SubmitHpcTaskReq req) { -// SdmResponse response = pbsService.submitHpcJob(req); -// if(response.isSuccess()&&StringUtils.isNotEmpty(response.getData())) { - String jobId = "8848"; + //1. 上传hpc主文件 及 其他文件 + MultipartFile masterFile = req.getMasterFile(); + String subDir = req.getJobName()+"\\"+System.currentTimeMillis(); + // webClient 调用上传,这个是主文件,求解算出的文件,及stdout文件都指定这个文件夹下面 + String masterFilePath = hpcCommandExcuteUtil.uploaHpcFile(masterFile,subDir); + dealInputFiles(req,subDir); + // 任务输出的文件夹 + String hpcOutPutDir = extractDirectory(masterFilePath); + req.setWorkDir(hpcOutPutDir); + // 前置处理 替换求解文件 + String formatCommand = String.format(req.getCommand(), masterFilePath); + req.setCommand(formatCommand); + req.setMasterFilePath(masterFilePath); + SdmResponse response = pbsService.submitHpcJob(req); + String jobId=""; + if(response.isSuccess()&&StringUtils.isNotEmpty(response.getData())) { + jobId = response.getData(); + } if(StringUtils.isNotEmpty(jobId)) { // 数据入库 SimulationJob simulationJob = new SimulationJob(); @@ -80,13 +101,15 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { simulationJob.setSoftware(req.getSoftware()); simulationJob.setJobType(req.getJobType()); simulationJob.setIndependence(req.isIndependence()); - simulationJob.setInputFiles(JSONObject.toJSONString(req.getInputFiles())); - simulationJob.setMasterFile(req.getMasterFile()); +// simulationJob.setInputFiles(JSONObject.toJSONString(req.getInputFiles())); + // 主文件位置 todo + simulationJob.setMasterFile(req.getMasterFilePath()); + // 求解文件集合 + simulationJob.setInputFiles(JSONObject.toJSONString(req.getInputFilePaths())); simulationJob.setTaskId(req.getTaskId()); simulationJob.setTaskName(req.getTaskName()); simulationJob.setRunId(req.getRunId()); simulationJob.setRunName(req.getRunName()); - // 软件及文件关联 simulationJob.setSoftwareId(req.getSoftwareId()); // 下面的待定 todo @@ -94,11 +117,9 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { simulationJob.setJobId(jobId); // 没必要要 simulationJob.setJobDetailId("todo"); - // 文件路径 todo 共享目录+jobName(文件回传)+uuid,下面可能有多个文件 - simulationJob.setStdoutHpcFilePath("/hpc/shared/job001/uuid-123"); + simulationJob.setStdoutHpcFilePath(hpcOutPutDir); simulationJob.setStdoutSpdmFilePath("/minio/base/job001/uuid-123"); - // todo 执行信息 定时任务回传的时候修改 simulationJob.setNodeName("todo"); simulationJob.setExecutCommand("ansys -b -input input.dat -output output.log"); @@ -108,11 +129,10 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { simulationJob.setJobStatus("Configuring"); // ? todo 没比要 simulationJob.setSolverName("LS-DYNA"); - // todo 执行信息 定时任务回传的时候修改 - simulationJob.setTotalKernelTime(3600000L); - simulationJob.setTotalUserTime(7200000L); - simulationJob.setTotalElapsedTime(9000L); + simulationJob.setTotalKernelTime(null); + simulationJob.setTotalUserTime(null); + simulationJob.setTotalElapsedTime(null); // 标识及状态 simulationJob.setUuid("f81d4fae7dec11d0a76500a0c91e6bf6"); @@ -129,6 +149,41 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { return SdmResponse.success(jobId); } + private void dealInputFiles(SubmitHpcTaskReq req, String subDir) { + if(req.getInputFiles()==null|| CollectionUtils.isEmpty(req.getInputFiles())) { + return; + } + List inputFiles = req.getInputFiles(); + List list = new ArrayList<>(); + for (MultipartFile inputFile : inputFiles) { + String inputFilePath = hpcCommandExcuteUtil.uploaHpcFile(inputFile,subDir); + list.add(inputFilePath); + } + req.setInputFilePaths(list); + } + + /** + * 从文件路径中提取目录部分(包含最后一个路径分隔符) + * @param fullPath 完整的文件路径 + * @return 目录路径(包含最后一个反斜杠),若路径为空或无分隔符则返回原路径 + */ + private String extractDirectory(String fullPath) { + // 校验参数 + if (fullPath == null || fullPath.isEmpty()) { + return fullPath; + } + + // 找到最后一个反斜杠的位置 + int lastSeparatorIndex = fullPath.lastIndexOf("\\"); + + // 若没有找到分隔符,返回原路径;否则截取到最后一个分隔符(包含) + if (lastSeparatorIndex == -1) { + return fullPath; + } + + return fullPath.substring(0, lastSeparatorIndex + 1); + } + @Override public SdmResponse stopHpcJob(String jobId) { return pbsService.stopHpcJob(jobId); @@ -142,9 +197,22 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { @Override public SdmResponse> getJobResultFiles(String jobId,String targetDir) { - // todo 根据jobId 获取工作目录,共享目录+jobName(文件回传)+uuid,下面可能有多个文件 - String workDir = StringUtils.isNotBlank(targetDir) ? targetDir :"D:\\需求"; - SdmResponse> nodeInfos = pbsService.getJobResultFiles("", workDir); + // 根据 jobId 获取工作信息 + SimulationJob simulationJob = simulationJobService.lambdaQuery() + .eq(SimulationJob::getJobId, jobId) + .one(); + // 选择 queryPath + String queryPath; + if (targetDir != null && !targetDir.isEmpty()) { + queryPath = targetDir; + } else if (simulationJob!=null&&simulationJob.getStdoutHpcFilePath() != null && !simulationJob.getStdoutHpcFilePath().isEmpty()) { + queryPath = simulationJob.getStdoutHpcFilePath(); + } else { + return SdmResponse.failed("查询路径为空,无法获取文件"); + } + + // 调用 PBS 服务获取文件列表 + SdmResponse> nodeInfos = pbsService.getJobResultFiles("", queryPath); return nodeInfos; } diff --git a/pbs/src/main/resources/application-dev.yml b/pbs/src/main/resources/application-dev.yml index 90f871c8..37e39479 100644 --- a/pbs/src/main/resources/application-dev.yml +++ b/pbs/src/main/resources/application-dev.yml @@ -108,6 +108,8 @@ hpc: remoteCreateDirUrl: http://192.168.65.55:9097/createDir remoteScanDirUrl: http://192.168.65.55:9097/scanDir remoteDownLoadFileUrl: http://192.168.65.55:9097/hpcDownload +# remoteDownLoadFileUrl: http://127.0.0.1:9097/hpcDownload + remoteUploadFileUrl: http://192.168.65.55:9097/uploadHpcFile callHpcUpload: http://192.168.65.55:9097/addJobQueue