From 4c4a7f18dfc16e453566dd9970b120ecc95194bb Mon Sep 17 00:00:00 2001 From: yangyang01000846 <15195822163@163.com> Date: Thu, 25 Dec 2025 23:01:26 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9Ahpc=E6=8F=90?= =?UTF-8?q?=E4=BA=A4=E4=BB=BB=E5=8A=A1=E6=B1=82=E8=A7=A3=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=89=8B=E5=8A=A8=E4=B8=8A=E4=BC=A0=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E7=9A=84=E6=A8=A1=E5=BC=8F.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../req/pbs/SubmitHpcTaskRemoteReq.java | 11 +++- .../impl/pbs/TaskClientFeignClientImpl.java | 2 +- .../flowable/delegate/handler/HpcHandler.java | 66 ++++++++++++++----- .../com/sdm/pbs/controller/TaskAdapter.java | 31 ++++++--- 4 files changed, 83 insertions(+), 27 deletions(-) diff --git a/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java b/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java index 450a23b9..cd145345 100644 --- a/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java +++ b/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java @@ -4,6 +4,7 @@ import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; import java.io.Serializable; +import java.util.List; @Data public class SubmitHpcTaskRemoteReq implements Serializable { @@ -51,7 +52,10 @@ public class SubmitHpcTaskRemoteReq implements Serializable { @Schema(description = "任务回传本地Nas的路径") public String stdoutSpdmNasFilePath; - @Schema(description = "任务求解文件本地路径,spdm工作流引擎会传递过来,adapter里使用") + @Schema(description = "获取求解文件的方式AUTO,自动或者手动MANUAL") + public String executeMode; + + @Schema(description = "任务求解文件本地路径,spdm工作流引擎会传递过来,adapter里使用,executeMode=AUTO") public String simulationFileLocalPath; @Schema(description = "任务求解文件主文件正则,spdm工作流引擎会传递过来,adapter里使用") @@ -60,5 +64,10 @@ public class SubmitHpcTaskRemoteReq implements Serializable { @Schema(description = "任务求解文件从文件正则,spdm工作流引擎会传递过来,adapter里使用") public String inputFilesRegularStr; + @Schema(description = "任务求解文件主文件路径,adapter里使用,executeMode=MANUAL") + public List manualMasterFilepaths; + + @Schema(description = "任务求解文件从文件路径,adapter里使用,executeMode=MANUAL") + public List manualInputFilePaths; } diff --git a/common/src/main/java/com/sdm/common/feign/impl/pbs/TaskClientFeignClientImpl.java b/common/src/main/java/com/sdm/common/feign/impl/pbs/TaskClientFeignClientImpl.java index f4c5cf28..df784ed7 100644 --- a/common/src/main/java/com/sdm/common/feign/impl/pbs/TaskClientFeignClientImpl.java +++ b/common/src/main/java/com/sdm/common/feign/impl/pbs/TaskClientFeignClientImpl.java @@ -23,7 +23,7 @@ public class TaskClientFeignClientImpl implements ITaskFeignClient { return response; } catch (Exception e) { CoreLogger.error("SubmitHpcJob Exception:{}", e.getMessage()); - return SdmResponse.failed("Hpc任务提交失败"); + return SdmResponse.failed("Hpc任务提交失败:"+e.getMessage()); } } diff --git a/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java b/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java index 4e7e0df1..6b3e8a96 100644 --- a/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java +++ b/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java @@ -22,10 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; // HPC(executeType=HPC) @@ -68,10 +65,15 @@ public class HpcHandler implements ExecutionHandler,HPCExecu submitHpcTaskRemoteReq.setMasterFileRegularStr(masterFileRegularStr); submitHpcTaskRemoteReq.setInputFilesRegularStr(inputFilesRegularStr); - - // hpc文件路径处理 - dealHpcFile(submitHpcTaskRemoteReq,beforeNodeId,currentNodeId, processDefinitionId,processInstanceId); - + CoreLogger.info("hpc executeMode:{}",params.get("executeMode")); + String executeMode = params.get("executeMode").toString(); + if(StringUtils.isBlank(executeMode)|| + (!Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_AUTO)&& + !Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_MANUAL))){ + throw new RuntimeException("hpc executeMode illegal"); + } + // 处理hpc求解文件路径 + dealHpcFile(submitHpcTaskRemoteReq,beforeNodeId,currentNodeId, processDefinitionId,processInstanceId,executeMode,params); // 实现HPC处理逻辑... // INIT(初始化)/RUNNING(执行中)/SUCCESS(执行成功)/FAIL(执行失败) String status = AsyncTaskStatusEnum.INIT.getCode(); @@ -79,7 +81,7 @@ public class HpcHandler implements ExecutionHandler,HPCExecu SdmResponse submitResp = taskFeignClient.adapterSubmitHpcJob(submitHpcTaskRemoteReq); if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){ log.error("HpcHandler submit failed,jobName:{}",params); - throw new RuntimeException("HpcHandler submit failed"); + throw new RuntimeException("HpcHandler submit failed,"+submitResp.getMessage()); } String hpcTaskId = submitResp.getData(); CoreLogger.info("hpc task submit succ jobId:{}",hpcTaskId); @@ -97,7 +99,7 @@ public class HpcHandler implements ExecutionHandler,HPCExecu } private void dealHpcFile(SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq, String beforeNodeId, String currentNodeId, - String processDefinitionId, String processInstanceId) { + String processDefinitionId, String processInstanceId, String executeMode,Map params) { String simulationBaseDir = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR; // 查询前节点的工作目录---》本地磁盘对应目录 // 查询前节点和当前节点的工作目录---》本地磁盘对应目录 @@ -115,15 +117,49 @@ public class HpcHandler implements ExecutionHandler,HPCExecu if(CollectionUtils.isEmpty(beforeNodeParams) || CollectionUtils.isEmpty(currentNodeParams)){ throw new RuntimeException("未获取到当前节点或者求解文件节点信息"); } - ProcessNodeParam beforeNode = beforeNodeParams.get(0); + // 自动,前一个节点 + submitHpcTaskRemoteReq.setExecuteMode(executeMode); + if(Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_AUTO)){ + ProcessNodeParam beforeNode = beforeNodeParams.get(0); + String beforeNodeJectKey = getNodeObjectKey(beforeNode); + // 本地求解文件路径 taskLocalBaseDir + submitHpcTaskRemoteReq.setSimulationFileLocalPath(simulationBaseDir + beforeNodeJectKey); + CoreLogger.info("simulationFileLocalPath :{} ",simulationBaseDir + beforeNodeJectKey); + } + // 手动上传的 + if (Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_MANUAL)) { + List masterFilePaths = getFileListFromMap(params, "masterFileRegularStr"); + List inPutFilePaths = getFileListFromMap(params, "inputFilesRegularStr"); + if(CollectionUtils.isEmpty(masterFilePaths)||CollectionUtils.isEmpty(inPutFilePaths)){ + CoreLogger.warn("hpc executeMode manual,filepath illegal"); + throw new RuntimeException("hpc executeMode manual,filepath illegal"); + } + submitHpcTaskRemoteReq.setManualMasterFilepaths(masterFilePaths); + submitHpcTaskRemoteReq.setManualInputFilePaths(inPutFilePaths); + } + // hpc 节点回传路径 ProcessNodeParam currentNode = currentNodeParams.get(0); - String beforeNodeJectKey = getNodeObjectKey(beforeNode); String currentNodeJectKey = getNodeObjectKey(currentNode); - // 本地求解文件路径 taskLocalBaseDir - submitHpcTaskRemoteReq.setSimulationFileLocalPath(simulationBaseDir + beforeNodeJectKey); // hpc 回传文件路径 submitHpcTaskRemoteReq.setStdoutSpdmNasFilePath(simulationBaseDir + currentNodeJectKey); - CoreLogger.info("localSaveDir :{} ,{}",simulationBaseDir + beforeNodeJectKey,simulationBaseDir + currentNodeJectKey); + CoreLogger.info("stdoutSpdmNasFilePath :{} ",simulationBaseDir + currentNodeJectKey); + } + + /** + * 通用方法:从 Map 中提取 explicitInputFiles 下的指定文件列表 + * @param dataMap 根 Map + * @param key 要提取的列表名称(masterFileRegularStr/inputFilesRegularStr) + * @return 字符串列表,不存在则返回空列表(避免 NPE) + */ + @SuppressWarnings("unchecked") + private static List getFileListFromMap(Map dataMap, String key) { + return Optional.ofNullable(dataMap) + // 提取 explicitInputFiles 子 Map + .map(map -> (Map) map.get("explicitInputFiles")) + // 提取指定 key 的列表 + .map(explicitMap -> (List) explicitMap.get(key)) + // 为空则返回空列表,避免后续遍历 NPE + .orElse(List.of()); } diff --git a/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java b/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java index 1898d0bb..52a61a66 100644 --- a/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java +++ b/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java @@ -20,6 +20,7 @@ import org.springframework.web.multipart.MultipartFile; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; @Slf4j @@ -28,6 +29,9 @@ import java.util.concurrent.atomic.AtomicReference; @Tag(name = "HPC调度", description = "与hpc交互的前置增强接口") public class TaskAdapter implements ITaskFeignClient { + private static final String EXECUTE_MODE_AUTO = "AUTO"; + private static final String EXECUTE_MODE_MANUAL = "MANUAL"; + @Autowired private TaskController taskController; @@ -44,17 +48,24 @@ public class TaskAdapter implements ITaskFeignClient { private void getSimulationFile(SubmitHpcTaskRemoteReq req,SubmitHpcTaskReq submitHpcTaskReq ){ log.info("提交请求参数:{}", JSONObject.toJSONString(req)); - String simulationFileLocalPath = req.getSimulationFileLocalPath(); - String masterFileRegularStr = req.getMasterFileRegularStr(); - String inputFilesRegularStr = req.getInputFilesRegularStr(); - // 获取所有文件名字 - // 符合正则的从文件 - AtomicReference masterFilePath = new AtomicReference<>(); - // 符合正则的主文件 + String masterFilepath=""; List inputFilePaths=new ArrayList<>(); - FilesUtil.collectFiles(simulationFileLocalPath,masterFileRegularStr,inputFilesRegularStr,masterFilePath,inputFilePaths); + // 手动 + if (Objects.equals(req.getExecuteMode(),EXECUTE_MODE_MANUAL)) { + masterFilepath = req.getManualMasterFilepaths().get(0); + inputFilePaths= req.getManualInputFilePaths(); + } + // 自动 + if (Objects.equals(req.getExecuteMode(),EXECUTE_MODE_AUTO)) { + String simulationFileLocalPath = req.getSimulationFileLocalPath(); + String masterFileRegularStr = req.getMasterFileRegularStr(); + String inputFilesRegularStr = req.getInputFilesRegularStr(); + AtomicReference masterFilePathAtomic = new AtomicReference<>(); + FilesUtil.collectFiles(simulationFileLocalPath,masterFileRegularStr,inputFilesRegularStr,masterFilePathAtomic,inputFilePaths); + masterFilepath=masterFilePathAtomic.get(); + } try { - MultipartFile masterMultipartFile = FilesUtil.toMultipartFile(masterFilePath.get()); + MultipartFile masterMultipartFile = FilesUtil.toMultipartFile(masterFilepath); submitHpcTaskReq.setMasterFile(masterMultipartFile); if(CollectionUtils.isNotEmpty(inputFilePaths)){ List inputFiles = new ArrayList<>(); @@ -66,7 +77,7 @@ public class TaskAdapter implements ITaskFeignClient { } } catch (Exception e) { log.error("getSimulationFile error",e); - throw new RuntimeException("求解文件处理失败"); + throw new RuntimeException("求解文件处理失败"+e.getMessage()); } }