修改:HPC优化

This commit is contained in:
yangyang01000846
2025-12-05 18:09:41 +08:00
parent 9b48988af8
commit 220a126a7b
10 changed files with 160 additions and 59 deletions

View File

@@ -9,7 +9,6 @@ import com.sdm.common.entity.resp.data.FileMetadataInfoResp;
import com.sdm.common.feign.inter.data.IDataFeignClient;
import com.sdm.common.feign.inter.pbs.ITaskFeignClient;
import com.sdm.common.log.CoreLogger;
import com.sdm.common.utils.FilesUtil;
import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.flowable.entity.ProcessNodeParam;
import com.sdm.flowable.service.IAsyncTaskRecordService;
@@ -19,13 +18,14 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.flowable.engine.delegate.DelegateExecution;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
// HPC(executeType=HPC)
@Slf4j
@@ -53,8 +53,11 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
CoreLogger.info("hpc process excute,params:{},config:{}",JSONObject.toJSONString(params),JSONObject.toJSONString(config));
SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq = convertParamsToReq(params);
String beforeNodeId = config.getBeforeNodeId();
String currentNodeId =execution.getCurrentActivityId();
String masterFileRegularStr = config.getMasterFileRegularStr();
String inputFilesRegularStr = config.getInputFilesRegularStr();
CoreLogger.info("beforeNodeId:{},currentNodeId:{},masterFileRegularStr:{},inputFilesRegularStr:{}",beforeNodeId,currentNodeId,masterFileRegularStr,inputFilesRegularStr);
// params 取只是测试使用
String processDefinitionId = (execution==null||StringUtils.isBlank(execution.getProcessDefinitionId()))?
params.get("processDefinitionId").toString():execution.getProcessDefinitionId();
@@ -62,15 +65,17 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
String processInstanceId = (execution==null||StringUtils.isBlank(execution.getProcessInstanceId()))?
params.get("processInstanceId").toString():execution.getProcessInstanceId();
// hpc文件处理
dealHpcFile(submitHpcTaskRemoteReq,beforeNodeId,masterFileRegularStr,inputFilesRegularStr,
processDefinitionId,processInstanceId);
submitHpcTaskRemoteReq.setMasterFileRegularStr(masterFileRegularStr);
submitHpcTaskRemoteReq.setInputFilesRegularStr(inputFilesRegularStr);
// hpc文件路径处理
dealHpcFile(submitHpcTaskRemoteReq,beforeNodeId,currentNodeId, processDefinitionId,processInstanceId);
// 实现HPC处理逻辑...
// INIT(初始化)/RUNNING(执行中)/SUCCESS(执行成功)/FAIL(执行失败)
String status = "INIT";
// 1. 调用 HPC 平台提交任务
SdmResponse<String> submitResp = taskFeignClient.submitHpcJob(submitHpcTaskRemoteReq);
SdmResponse<String> submitResp = taskFeignClient.adapterSubmitHpcJob(submitHpcTaskRemoteReq);
if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){
log.error("HpcHandler submit failed,jobName:{}",params);
status = "FAIL";
@@ -90,15 +95,38 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
log.info("HPC 任务 {} 已提交", "hpcTaskId");
}
private void dealHpcFile(SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq, String beforeNodeId, String masterFileRegularStr,
String inputFilesRegularStr,String processDefinitionId,String processInstanceId) {
private void dealHpcFile(SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq, String beforeNodeId, String currentNodeId,
String processDefinitionId, String processInstanceId) {
String simulationBaseDir = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR;
// 查询前节点的工作目录---》本地磁盘对应目录
ProcessNodeParam processNodeParam = processNodeParamService.lambdaQuery().
eq(ProcessNodeParam::getNodeId, beforeNodeId).
eq(ProcessNodeParam::getProcessDefinitionId,processDefinitionId).
eq(ProcessNodeParam::getProcessInstanceId,processInstanceId).
one();
// 查询前节点和当前节点的工作目录---》本地磁盘对应目录
List<ProcessNodeParam> processNodeParams = processNodeParamService.lambdaQuery()
.in(ProcessNodeParam::getNodeId, beforeNodeId, currentNodeId) // 使用 in 条件,匹配 beforeNodeId 或 currentNodeId
.eq(ProcessNodeParam::getProcessDefinitionId, processDefinitionId)
.eq(ProcessNodeParam::getProcessInstanceId, processInstanceId)
.orderByDesc(ProcessNodeParam::getUpdateTime)
.list();
Map<String, List<ProcessNodeParam>> nodeParamMap = processNodeParams.stream()
.collect(Collectors.groupingBy(ProcessNodeParam::getNodeId));
List<ProcessNodeParam> beforeNodeParams = nodeParamMap.get(beforeNodeId);
List<ProcessNodeParam> currentNodeParams = nodeParamMap.get(currentNodeId);
if(CollectionUtils.isEmpty(beforeNodeParams) || CollectionUtils.isEmpty(currentNodeParams)){
throw new RuntimeException("未获取到当前节点或者求解文件节点信息");
}
ProcessNodeParam beforeNode = beforeNodeParams.get(0);
ProcessNodeParam currentNode = currentNodeParams.get(0);
String beforeNodeJectKey = getNodeObjectKey(beforeNode);
String currentNodeJectKey = getNodeObjectKey(currentNode);
// 本地求解文件路径 taskLocalBaseDir
submitHpcTaskRemoteReq.setSimulationFileLocalPath(simulationBaseDir + beforeNodeJectKey);
// hpc 回传文件路径
submitHpcTaskRemoteReq.setStdoutSpdmNasFilePath(simulationBaseDir + currentNodeJectKey);
CoreLogger.info("localSaveDir :{} ,{}",simulationBaseDir + beforeNodeJectKey,simulationBaseDir + currentNodeJectKey);
}
private String getNodeObjectKey(ProcessNodeParam processNodeParam){
String paramJson = processNodeParam.getParamJson();
JSONObject paramJsonObject = JSONObject.parseObject(paramJson);
// outputDirId
@@ -108,36 +136,12 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
getFileBaseInfoReq.setFileId(outputDirId);
SdmResponse<FileMetadataInfoResp> fileBaseInfoResp = dataFeignClient.getFileBaseInfo(getFileBaseInfoReq);
if(!fileBaseInfoResp.isSuccess()||fileBaseInfoResp.getData()==null){
CoreLogger.warn("getFileBaseInfo failed,outputDirId:{}",outputDirId);
throw new RuntimeException("上一节点信息查询失败");
}
FileMetadataInfoResp fileMetadataInfoResp = fileBaseInfoResp.getData();
String objectKey = fileMetadataInfoResp.getObjectKey();
// 本地文件路径 taskLocalBaseDir
String localSaveDir = simulationBaseDir + objectKey;
CoreLogger.info("beforeNode localSaveDir:{}",localSaveDir);
// 获取所有文件名字
// 符合正则的从文件
AtomicReference<String> masterFilePath = new AtomicReference<>();
// 符合正则的主文件
List<String> inputFilePaths=new ArrayList<>();
FilesUtil.collectFiles(localSaveDir,masterFileRegularStr,inputFilesRegularStr,masterFilePath,inputFilePaths);
try {
MultipartFile masterMultipartFile = FilesUtil.toMultipartFile(masterFilePath.get());
submitHpcTaskRemoteReq.setMasterFile(masterMultipartFile);
if(CollectionUtils.isNotEmpty(inputFilePaths)){
List<MultipartFile> inputFiles = new ArrayList<>();
for (String inputFilePath : inputFilePaths) {
MultipartFile inputFile = FilesUtil.toMultipartFile(inputFilePath);
inputFiles.add(inputFile);
}
submitHpcTaskRemoteReq.setInputFiles(inputFiles);
}
} catch (Exception e) {
log.error("dealHpcFile error",e);
throw new RuntimeException("求解文件处理失败");
}
return objectKey;
}
/**