diff --git a/common/src/main/java/com/sdm/common/entity/flowable/executeConfig/HPCExecuteConfig.java b/common/src/main/java/com/sdm/common/entity/flowable/executeConfig/HPCExecuteConfig.java index d61a6551..0e5983c5 100644 --- a/common/src/main/java/com/sdm/common/entity/flowable/executeConfig/HPCExecuteConfig.java +++ b/common/src/main/java/com/sdm/common/entity/flowable/executeConfig/HPCExecuteConfig.java @@ -5,7 +5,6 @@ import lombok.Data; @Data public class HPCExecuteConfig extends BaseExecuteConfig { - private String currentNodeId; private String beforeNodeId; // 先默认写死一个,后面前端配置传递 private String masterFileRegularStr="^aa\\.xml$"; diff --git a/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java b/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java index 82af2692..1ed4214d 100644 --- a/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java +++ b/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java @@ -58,4 +58,20 @@ public class SubmitHpcTaskRemoteReq { @Schema(description = "任务所属项目") public String projectname; + @Schema(description = "计算任务回传minio的路径") + public String stdoutSpdmMinoFilePath; + + @Schema(description = "任务回传本地Nas的路径") + public String stdoutSpdmNasFilePath; + + @Schema(description = "任务求解文件本地路径,spdm工作流引擎会传递过来,adapter里使用") + public String simulationFileLocalPath; + + @Schema(description = "任务求解文件主文件正则,spdm工作流引擎会传递过来,adapter里使用") + public String masterFileRegularStr; + + @Schema(description = "任务求解文件从文件正则,spdm工作流引擎会传递过来,adapter里使用") + public String inputFilesRegularStr; + + } diff --git a/common/src/main/java/com/sdm/common/feign/impl/pbs/TaskClientFeignClientImpl.java b/common/src/main/java/com/sdm/common/feign/impl/pbs/TaskClientFeignClientImpl.java index 93eec920..f4c5cf28 100644 --- a/common/src/main/java/com/sdm/common/feign/impl/pbs/TaskClientFeignClientImpl.java +++ b/common/src/main/java/com/sdm/common/feign/impl/pbs/TaskClientFeignClientImpl.java @@ -16,10 +16,10 @@ public class TaskClientFeignClientImpl implements ITaskFeignClient { private ITaskFeignClient taskFeignClient; @Override - public SdmResponse submitHpcJob(SubmitHpcTaskRemoteReq req) { + public SdmResponse adapterSubmitHpcJob(SubmitHpcTaskRemoteReq req) { SdmResponse response; try { - response = taskFeignClient.submitHpcJob(req); + response = taskFeignClient.adapterSubmitHpcJob(req); return response; } catch (Exception e) { CoreLogger.error("SubmitHpcJob Exception:{}", e.getMessage()); diff --git a/common/src/main/java/com/sdm/common/feign/inter/pbs/ITaskFeignClient.java b/common/src/main/java/com/sdm/common/feign/inter/pbs/ITaskFeignClient.java index 60a89026..c5048433 100644 --- a/common/src/main/java/com/sdm/common/feign/inter/pbs/ITaskFeignClient.java +++ b/common/src/main/java/com/sdm/common/feign/inter/pbs/ITaskFeignClient.java @@ -15,7 +15,7 @@ import org.springframework.web.bind.annotation.PostMapping; public interface ITaskFeignClient { // "作业提交" - @PostMapping(value = "/pbs/submitHpcJob", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) - SdmResponse submitHpcJob( SubmitHpcTaskRemoteReq req); + @PostMapping(value = "/pbs/adapterSubmitHpcJob", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) + SdmResponse adapterSubmitHpcJob( SubmitHpcTaskRemoteReq req); } diff --git a/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java b/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java index d269a9d1..25e6a3a6 100644 --- a/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java +++ b/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java @@ -9,7 +9,6 @@ import com.sdm.common.entity.resp.data.FileMetadataInfoResp; import com.sdm.common.feign.inter.data.IDataFeignClient; import com.sdm.common.feign.inter.pbs.ITaskFeignClient; import com.sdm.common.log.CoreLogger; -import com.sdm.common.utils.FilesUtil; import com.sdm.flowable.constants.FlowableConfig; import com.sdm.flowable.entity.ProcessNodeParam; import com.sdm.flowable.service.IAsyncTaskRecordService; @@ -19,13 +18,14 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.flowable.engine.delegate.DelegateExecution; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import org.springframework.web.multipart.MultipartFile; import java.text.SimpleDateFormat; -import java.util.*; -import java.util.concurrent.atomic.AtomicReference; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; // HPC(executeType=HPC) @Slf4j @@ -53,8 +53,11 @@ public class HpcHandler implements ExecutionHandler,HPCExecu CoreLogger.info("hpc process excute,params:{},config:{}",JSONObject.toJSONString(params),JSONObject.toJSONString(config)); SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq = convertParamsToReq(params); String beforeNodeId = config.getBeforeNodeId(); + String currentNodeId =execution.getCurrentActivityId(); String masterFileRegularStr = config.getMasterFileRegularStr(); String inputFilesRegularStr = config.getInputFilesRegularStr(); + CoreLogger.info("beforeNodeId:{},currentNodeId:{},masterFileRegularStr:{},inputFilesRegularStr:{}",beforeNodeId,currentNodeId,masterFileRegularStr,inputFilesRegularStr); + // params 取只是测试使用 String processDefinitionId = (execution==null||StringUtils.isBlank(execution.getProcessDefinitionId()))? params.get("processDefinitionId").toString():execution.getProcessDefinitionId(); @@ -62,15 +65,17 @@ public class HpcHandler implements ExecutionHandler,HPCExecu String processInstanceId = (execution==null||StringUtils.isBlank(execution.getProcessInstanceId()))? params.get("processInstanceId").toString():execution.getProcessInstanceId(); - // hpc文件处理 - dealHpcFile(submitHpcTaskRemoteReq,beforeNodeId,masterFileRegularStr,inputFilesRegularStr, - processDefinitionId,processInstanceId); + submitHpcTaskRemoteReq.setMasterFileRegularStr(masterFileRegularStr); + submitHpcTaskRemoteReq.setInputFilesRegularStr(inputFilesRegularStr); + + // hpc文件路径处理 + dealHpcFile(submitHpcTaskRemoteReq,beforeNodeId,currentNodeId, processDefinitionId,processInstanceId); // 实现HPC处理逻辑... // INIT(初始化)/RUNNING(执行中)/SUCCESS(执行成功)/FAIL(执行失败) String status = "INIT"; // 1. 调用 HPC 平台提交任务 - SdmResponse submitResp = taskFeignClient.submitHpcJob(submitHpcTaskRemoteReq); + SdmResponse submitResp = taskFeignClient.adapterSubmitHpcJob(submitHpcTaskRemoteReq); if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){ log.error("HpcHandler submit failed,jobName:{}",params); status = "FAIL"; @@ -90,15 +95,38 @@ public class HpcHandler implements ExecutionHandler,HPCExecu log.info("HPC 任务 {} 已提交", "hpcTaskId"); } - private void dealHpcFile(SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq, String beforeNodeId, String masterFileRegularStr, - String inputFilesRegularStr,String processDefinitionId,String processInstanceId) { + private void dealHpcFile(SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq, String beforeNodeId, String currentNodeId, + String processDefinitionId, String processInstanceId) { String simulationBaseDir = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR; // 查询前节点的工作目录---》本地磁盘对应目录 - ProcessNodeParam processNodeParam = processNodeParamService.lambdaQuery(). - eq(ProcessNodeParam::getNodeId, beforeNodeId). - eq(ProcessNodeParam::getProcessDefinitionId,processDefinitionId). - eq(ProcessNodeParam::getProcessInstanceId,processInstanceId). - one(); + // 查询前节点和当前节点的工作目录---》本地磁盘对应目录 + List processNodeParams = processNodeParamService.lambdaQuery() + .in(ProcessNodeParam::getNodeId, beforeNodeId, currentNodeId) // 使用 in 条件,匹配 beforeNodeId 或 currentNodeId + .eq(ProcessNodeParam::getProcessDefinitionId, processDefinitionId) + .eq(ProcessNodeParam::getProcessInstanceId, processInstanceId) + .orderByDesc(ProcessNodeParam::getUpdateTime) + .list(); + + Map> nodeParamMap = processNodeParams.stream() + .collect(Collectors.groupingBy(ProcessNodeParam::getNodeId)); + List beforeNodeParams = nodeParamMap.get(beforeNodeId); + List currentNodeParams = nodeParamMap.get(currentNodeId); + if(CollectionUtils.isEmpty(beforeNodeParams) || CollectionUtils.isEmpty(currentNodeParams)){ + throw new RuntimeException("未获取到当前节点或者求解文件节点信息"); + } + ProcessNodeParam beforeNode = beforeNodeParams.get(0); + ProcessNodeParam currentNode = currentNodeParams.get(0); + String beforeNodeJectKey = getNodeObjectKey(beforeNode); + String currentNodeJectKey = getNodeObjectKey(currentNode); + // 本地求解文件路径 taskLocalBaseDir + submitHpcTaskRemoteReq.setSimulationFileLocalPath(simulationBaseDir + beforeNodeJectKey); + // hpc 回传文件路径 + submitHpcTaskRemoteReq.setStdoutSpdmNasFilePath(simulationBaseDir + currentNodeJectKey); + CoreLogger.info("localSaveDir :{} ,{}",simulationBaseDir + beforeNodeJectKey,simulationBaseDir + currentNodeJectKey); + } + + + private String getNodeObjectKey(ProcessNodeParam processNodeParam){ String paramJson = processNodeParam.getParamJson(); JSONObject paramJsonObject = JSONObject.parseObject(paramJson); // outputDirId @@ -108,36 +136,12 @@ public class HpcHandler implements ExecutionHandler,HPCExecu getFileBaseInfoReq.setFileId(outputDirId); SdmResponse fileBaseInfoResp = dataFeignClient.getFileBaseInfo(getFileBaseInfoReq); if(!fileBaseInfoResp.isSuccess()||fileBaseInfoResp.getData()==null){ + CoreLogger.warn("getFileBaseInfo failed,outputDirId:{}",outputDirId); throw new RuntimeException("上一节点信息查询失败"); } FileMetadataInfoResp fileMetadataInfoResp = fileBaseInfoResp.getData(); String objectKey = fileMetadataInfoResp.getObjectKey(); - // 本地文件路径 taskLocalBaseDir - String localSaveDir = simulationBaseDir + objectKey; - CoreLogger.info("beforeNode localSaveDir:{}",localSaveDir); - // 获取所有文件名字 - // 符合正则的从文件 - AtomicReference masterFilePath = new AtomicReference<>(); - // 符合正则的主文件 - List inputFilePaths=new ArrayList<>(); - FilesUtil.collectFiles(localSaveDir,masterFileRegularStr,inputFilesRegularStr,masterFilePath,inputFilePaths); - try { - MultipartFile masterMultipartFile = FilesUtil.toMultipartFile(masterFilePath.get()); - submitHpcTaskRemoteReq.setMasterFile(masterMultipartFile); - if(CollectionUtils.isNotEmpty(inputFilePaths)){ - List inputFiles = new ArrayList<>(); - for (String inputFilePath : inputFilePaths) { - MultipartFile inputFile = FilesUtil.toMultipartFile(inputFilePath); - inputFiles.add(inputFile); - } - submitHpcTaskRemoteReq.setInputFiles(inputFiles); - } - } catch (Exception e) { - log.error("dealHpcFile error",e); - throw new RuntimeException("求解文件处理失败"); - } - - + return objectKey; } /** diff --git a/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java b/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java new file mode 100644 index 00000000..0501d1d8 --- /dev/null +++ b/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java @@ -0,0 +1,71 @@ +package com.sdm.pbs.controller; + +import com.sdm.common.common.SdmResponse; +import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq; +import com.sdm.common.feign.inter.pbs.ITaskFeignClient; +import com.sdm.common.utils.FilesUtil; +import com.sdm.pbs.model.req.SubmitHpcTaskReq; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.multipart.MultipartFile; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +@Slf4j +@RestController +@RequestMapping("/pbs") +@Tag(name = "HPC调度", description = "与hpc交互的前置增强接口") +public class TaskAdapter implements ITaskFeignClient { + + @Autowired + private TaskController taskController; + + @PostMapping(value = "/adapterSubmitHpcJob", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) + @Operation(summary = "作业提交") + public SdmResponse adapterSubmitHpcJob(SubmitHpcTaskRemoteReq req) { + // spdm 回传路径 + // 求解文件获取,可以在这一层分片上传,然后拿到对应的路径 + getSimulationFile(req); + SubmitHpcTaskReq submitHpcTaskReq = new SubmitHpcTaskReq(); + BeanUtils.copyProperties(req,submitHpcTaskReq); + return taskController.submitHpcJob(submitHpcTaskReq); + } + + private void getSimulationFile(SubmitHpcTaskRemoteReq req){ + String simulationFileLocalPath = req.getSimulationFileLocalPath(); + String masterFileRegularStr = req.getMasterFileRegularStr(); + String inputFilesRegularStr = req.getInputFilesRegularStr(); + // 获取所有文件名字 + // 符合正则的从文件 + AtomicReference masterFilePath = new AtomicReference<>(); + // 符合正则的主文件 + List inputFilePaths=new ArrayList<>(); + FilesUtil.collectFiles(simulationFileLocalPath,masterFileRegularStr,inputFilesRegularStr,masterFilePath,inputFilePaths); + try { + MultipartFile masterMultipartFile = FilesUtil.toMultipartFile(masterFilePath.get()); + req.setMasterFile(masterMultipartFile); + if(CollectionUtils.isNotEmpty(inputFilePaths)){ + List inputFiles = new ArrayList<>(); + for (String inputFilePath : inputFilePaths) { + MultipartFile inputFile = FilesUtil.toMultipartFile(inputFilePath); + inputFiles.add(inputFile); + } + req.setInputFiles(inputFiles); + } + } catch (Exception e) { + log.error("getSimulationFile error",e); + throw new RuntimeException("求解文件处理失败"); + } + } + +} diff --git a/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java b/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java index a6995204..f6bd0a8b 100644 --- a/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java +++ b/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java @@ -3,7 +3,6 @@ package com.sdm.pbs.controller; import com.sdm.common.common.SdmResponse; import com.sdm.common.entity.req.pbs.HpcTaskFileDownReq; import com.sdm.common.entity.req.pbs.HpcTaskFileReq; -import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq; import com.sdm.common.entity.req.pbs.hpc.*; import com.sdm.common.entity.resp.PageDataResp; import com.sdm.common.entity.resp.pbs.hpc.*; @@ -11,7 +10,6 @@ import com.sdm.common.entity.resp.pbs.hpc.listjobs.ListJobResp; import com.sdm.common.entity.resp.pbs.hpc.listtasks.ListTasksResp; import com.sdm.common.entity.resp.pbs.hpc.nodecore.NodeListCoreResp; import com.sdm.common.entity.resp.pbs.hpc.nodelist.NodeListResp; -import com.sdm.common.feign.inter.pbs.ITaskFeignClient; import com.sdm.common.utils.HpcCommandExcuteUtil; import com.sdm.pbs.model.bo.HpcJobStatusInfo; import com.sdm.pbs.model.bo.HpcResouceInfo; @@ -26,7 +24,6 @@ import com.sdm.pbs.service.impl.PbsServiceDecorator; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.http.MediaType; @@ -41,7 +38,7 @@ import java.util.Map; @RestController @RequestMapping("/pbs") @Tag(name = "HPC调度", description = "与hpc交互的接口") -public class TaskController implements ITaskFeignClient { +public class TaskController { @Autowired private HpcInstructionService hpcInstructionService; @@ -75,10 +72,8 @@ public class TaskController implements ITaskFeignClient { @PostMapping(value = "/submitHpcJob", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) @Operation(summary = "作业提交") - public SdmResponse submitHpcJob(SubmitHpcTaskRemoteReq req) { - SubmitHpcTaskReq submitHpcTaskReq = new SubmitHpcTaskReq(); - BeanUtils.copyProperties(req,submitHpcTaskReq); - return pbsService.submitHpcJob(submitHpcTaskReq); + public SdmResponse submitHpcJob(SubmitHpcTaskReq req) { + return pbsService.submitHpcJob(req); } @GetMapping("/stopHpcJob") diff --git a/pbs/src/main/java/com/sdm/pbs/model/entity/SimulationJob.java b/pbs/src/main/java/com/sdm/pbs/model/entity/SimulationJob.java index 148877d5..79074c68 100644 --- a/pbs/src/main/java/com/sdm/pbs/model/entity/SimulationJob.java +++ b/pbs/src/main/java/com/sdm/pbs/model/entity/SimulationJob.java @@ -95,9 +95,13 @@ public class SimulationJob implements Serializable { @TableField("stdoutHpcFilePath") private String stdoutHpcFilePath; - @Schema(description = "记录在minio上?,任务执行输出的文件在Hpc的绝对路径,baseDir+jobName(文件回传)+uuid(文件回传),下面可能有多个文件") - @TableField("stdoutSpdmFilePath") - private String stdoutSpdmFilePath; + @Schema(description = "minio上,任务执行输出的文件在Hpc的绝对路径,baseDir+jobName(文件回传)+uuid(文件回传),下面可能有多个文件") + @TableField("stdoutSpdmMinoFilePath") + private String stdoutSpdmMinoFilePath; + + @Schema(description = "本地nas,任务执行输出的文件在Hpc的绝对路径,baseDir+jobName(文件回传)+uuid(文件回传),下面可能有多个文件") + @TableField("stdoutSpdmNasFilePath") + private String stdoutSpdmNasFilePath; @Schema(description = "节点名称") @TableField("nodeName") diff --git a/pbs/src/main/java/com/sdm/pbs/model/req/SubmitHpcTaskReq.java b/pbs/src/main/java/com/sdm/pbs/model/req/SubmitHpcTaskReq.java index 71af7711..614003a2 100644 --- a/pbs/src/main/java/com/sdm/pbs/model/req/SubmitHpcTaskReq.java +++ b/pbs/src/main/java/com/sdm/pbs/model/req/SubmitHpcTaskReq.java @@ -66,6 +66,17 @@ public class SubmitHpcTaskReq { @Schema(description = "软件的id") public Long softwareId; + @Schema(description = "计算任务回传minio的路径") + public String stdoutSpdmMinoFilePath; + + @Schema(description = "计算任务回传本地Nas的路径") + public String stdoutSpdmNasFilePath; + + @Schema(description = "主文件本地路径,spdm工作流引擎会传递过来") + public String masterFileLocalPath; + + @Schema(description = "求解文件本地路径,spdm工作流引擎会传递过来") + public List inputFileLocalPaths = new ArrayList<>(); } diff --git a/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java b/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java index 4b99c6ed..a07a99bc 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java +++ b/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java @@ -120,7 +120,8 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { simulationJob.setJobDetailId("todo"); // 文件路径 todo 共享目录+jobName(文件回传)+uuid,下面可能有多个文件 simulationJob.setStdoutHpcFilePath(hpcOutPutDir); - simulationJob.setStdoutSpdmFilePath("/minio/base/job001/uuid-123"); + simulationJob.setStdoutSpdmMinoFilePath(req.getStdoutSpdmMinoFilePath()); + simulationJob.setStdoutSpdmNasFilePath(req.getStdoutSpdmNasFilePath()); // todo 执行信息 定时任务回传的时候修改 simulationJob.setNodeName("todo"); simulationJob.setExecutCommand("ansys -b -input input.dat -output output.log");