修改:hpc提交任务求解文件增加手动上传文件的模式.

This commit is contained in:
yangyang01000846
2025-12-25 23:01:26 +08:00
parent 9e85144f1e
commit 4c4a7f18df
4 changed files with 83 additions and 27 deletions

View File

@@ -4,6 +4,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
@Data
public class SubmitHpcTaskRemoteReq implements Serializable {
@@ -51,7 +52,10 @@ public class SubmitHpcTaskRemoteReq implements Serializable {
@Schema(description = "任务回传本地Nas的路径")
public String stdoutSpdmNasFilePath;
@Schema(description = "任务求解文件本地路径,spdm工作流引擎会传递过来adapter里使用")
@Schema(description = "获取求解文件的方式AUTO,自动或者手动MANUAL")
public String executeMode;
@Schema(description = "任务求解文件本地路径,spdm工作流引擎会传递过来adapter里使用,executeMode=AUTO")
public String simulationFileLocalPath;
@Schema(description = "任务求解文件主文件正则,spdm工作流引擎会传递过来adapter里使用")
@@ -60,5 +64,10 @@ public class SubmitHpcTaskRemoteReq implements Serializable {
@Schema(description = "任务求解文件从文件正则,spdm工作流引擎会传递过来adapter里使用")
public String inputFilesRegularStr;
@Schema(description = "任务求解文件主文件路径adapter里使用,executeMode=MANUAL")
public List<String> manualMasterFilepaths;
@Schema(description = "任务求解文件从文件路径adapter里使用,executeMode=MANUAL")
public List<String> manualInputFilePaths;
}

View File

@@ -23,7 +23,7 @@ public class TaskClientFeignClientImpl implements ITaskFeignClient {
return response;
} catch (Exception e) {
CoreLogger.error("SubmitHpcJob Exception:{}", e.getMessage());
return SdmResponse.failed("Hpc任务提交失败");
return SdmResponse.failed("Hpc任务提交失败:"+e.getMessage());
}
}

View File

@@ -22,10 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
// HPC(executeType=HPC)
@@ -68,10 +65,15 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
submitHpcTaskRemoteReq.setMasterFileRegularStr(masterFileRegularStr);
submitHpcTaskRemoteReq.setInputFilesRegularStr(inputFilesRegularStr);
// hpc文件路径处理
dealHpcFile(submitHpcTaskRemoteReq,beforeNodeId,currentNodeId, processDefinitionId,processInstanceId);
CoreLogger.info("hpc executeMode:{}",params.get("executeMode"));
String executeMode = params.get("executeMode").toString();
if(StringUtils.isBlank(executeMode)||
(!Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_AUTO)&&
!Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_MANUAL))){
throw new RuntimeException("hpc executeMode illegal");
}
// 处理hpc求解文件路径
dealHpcFile(submitHpcTaskRemoteReq,beforeNodeId,currentNodeId, processDefinitionId,processInstanceId,executeMode,params);
// 实现HPC处理逻辑...
// INIT(初始化)/RUNNING(执行中)/SUCCESS(执行成功)/FAIL(执行失败)
String status = AsyncTaskStatusEnum.INIT.getCode();
@@ -79,7 +81,7 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
SdmResponse<String> submitResp = taskFeignClient.adapterSubmitHpcJob(submitHpcTaskRemoteReq);
if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){
log.error("HpcHandler submit failed,jobName:{}",params);
throw new RuntimeException("HpcHandler submit failed");
throw new RuntimeException("HpcHandler submit failed,"+submitResp.getMessage());
}
String hpcTaskId = submitResp.getData();
CoreLogger.info("hpc task submit succ jobId:{}",hpcTaskId);
@@ -97,7 +99,7 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
}
private void dealHpcFile(SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq, String beforeNodeId, String currentNodeId,
String processDefinitionId, String processInstanceId) {
String processDefinitionId, String processInstanceId, String executeMode,Map<String, Object> params) {
String simulationBaseDir = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR;
// 查询前节点的工作目录---》本地磁盘对应目录
// 查询前节点和当前节点的工作目录---》本地磁盘对应目录
@@ -115,15 +117,49 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
if(CollectionUtils.isEmpty(beforeNodeParams) || CollectionUtils.isEmpty(currentNodeParams)){
throw new RuntimeException("未获取到当前节点或者求解文件节点信息");
}
ProcessNodeParam beforeNode = beforeNodeParams.get(0);
// 自动,前一个节点
submitHpcTaskRemoteReq.setExecuteMode(executeMode);
if(Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_AUTO)){
ProcessNodeParam beforeNode = beforeNodeParams.get(0);
String beforeNodeJectKey = getNodeObjectKey(beforeNode);
// 本地求解文件路径 taskLocalBaseDir
submitHpcTaskRemoteReq.setSimulationFileLocalPath(simulationBaseDir + beforeNodeJectKey);
CoreLogger.info("simulationFileLocalPath :{} ",simulationBaseDir + beforeNodeJectKey);
}
// 手动上传的
if (Objects.equals(executeMode,FlowableConfig.EXECUTE_MODE_MANUAL)) {
List<String> masterFilePaths = getFileListFromMap(params, "masterFileRegularStr");
List<String> inPutFilePaths = getFileListFromMap(params, "inputFilesRegularStr");
if(CollectionUtils.isEmpty(masterFilePaths)||CollectionUtils.isEmpty(inPutFilePaths)){
CoreLogger.warn("hpc executeMode manual,filepath illegal");
throw new RuntimeException("hpc executeMode manual,filepath illegal");
}
submitHpcTaskRemoteReq.setManualMasterFilepaths(masterFilePaths);
submitHpcTaskRemoteReq.setManualInputFilePaths(inPutFilePaths);
}
// hpc 节点回传路径
ProcessNodeParam currentNode = currentNodeParams.get(0);
String beforeNodeJectKey = getNodeObjectKey(beforeNode);
String currentNodeJectKey = getNodeObjectKey(currentNode);
// 本地求解文件路径 taskLocalBaseDir
submitHpcTaskRemoteReq.setSimulationFileLocalPath(simulationBaseDir + beforeNodeJectKey);
// hpc 回传文件路径
submitHpcTaskRemoteReq.setStdoutSpdmNasFilePath(simulationBaseDir + currentNodeJectKey);
CoreLogger.info("localSaveDir :{} ,{}",simulationBaseDir + beforeNodeJectKey,simulationBaseDir + currentNodeJectKey);
CoreLogger.info("stdoutSpdmNasFilePath :{} ",simulationBaseDir + currentNodeJectKey);
}
/**
* 通用方法:从 Map 中提取 explicitInputFiles 下的指定文件列表
* @param dataMap 根 Map
* @param key 要提取的列表名称masterFileRegularStr/inputFilesRegularStr
* @return 字符串列表,不存在则返回空列表(避免 NPE
*/
@SuppressWarnings("unchecked")
private static List<String> getFileListFromMap(Map<String, Object> dataMap, String key) {
return Optional.ofNullable(dataMap)
// 提取 explicitInputFiles 子 Map
.map(map -> (Map<String, Object>) map.get("explicitInputFiles"))
// 提取指定 key 的列表
.map(explicitMap -> (List<String>) explicitMap.get(key))
// 为空则返回空列表,避免后续遍历 NPE
.orElse(List.of());
}

View File

@@ -20,6 +20,7 @@ import org.springframework.web.multipart.MultipartFile;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
@@ -28,6 +29,9 @@ import java.util.concurrent.atomic.AtomicReference;
@Tag(name = "HPC调度", description = "与hpc交互的前置增强接口")
public class TaskAdapter implements ITaskFeignClient {
private static final String EXECUTE_MODE_AUTO = "AUTO";
private static final String EXECUTE_MODE_MANUAL = "MANUAL";
@Autowired
private TaskController taskController;
@@ -44,17 +48,24 @@ public class TaskAdapter implements ITaskFeignClient {
private void getSimulationFile(SubmitHpcTaskRemoteReq req,SubmitHpcTaskReq submitHpcTaskReq ){
log.info("提交请求参数:{}", JSONObject.toJSONString(req));
String simulationFileLocalPath = req.getSimulationFileLocalPath();
String masterFileRegularStr = req.getMasterFileRegularStr();
String inputFilesRegularStr = req.getInputFilesRegularStr();
// 获取所有文件名字
// 符合正则的从文件
AtomicReference<String> masterFilePath = new AtomicReference<>();
// 符合正则的主文件
String masterFilepath="";
List<String> inputFilePaths=new ArrayList<>();
FilesUtil.collectFiles(simulationFileLocalPath,masterFileRegularStr,inputFilesRegularStr,masterFilePath,inputFilePaths);
// 手动
if (Objects.equals(req.getExecuteMode(),EXECUTE_MODE_MANUAL)) {
masterFilepath = req.getManualMasterFilepaths().get(0);
inputFilePaths= req.getManualInputFilePaths();
}
// 自动
if (Objects.equals(req.getExecuteMode(),EXECUTE_MODE_AUTO)) {
String simulationFileLocalPath = req.getSimulationFileLocalPath();
String masterFileRegularStr = req.getMasterFileRegularStr();
String inputFilesRegularStr = req.getInputFilesRegularStr();
AtomicReference<String> masterFilePathAtomic = new AtomicReference<>();
FilesUtil.collectFiles(simulationFileLocalPath,masterFileRegularStr,inputFilesRegularStr,masterFilePathAtomic,inputFilePaths);
masterFilepath=masterFilePathAtomic.get();
}
try {
MultipartFile masterMultipartFile = FilesUtil.toMultipartFile(masterFilePath.get());
MultipartFile masterMultipartFile = FilesUtil.toMultipartFile(masterFilepath);
submitHpcTaskReq.setMasterFile(masterMultipartFile);
if(CollectionUtils.isNotEmpty(inputFilePaths)){
List<MultipartFile> inputFiles = new ArrayList<>();
@@ -66,7 +77,7 @@ public class TaskAdapter implements ITaskFeignClient {
}
} catch (Exception e) {
log.error("getSimulationFile error",e);
throw new RuntimeException("求解文件处理失败");
throw new RuntimeException("求解文件处理失败"+e.getMessage());
}
}