修改:hpc求解文件分片上传;文件按照用户id区分;求解器文件路径空格问题;输出文件out绑定jobId;comsol结果文件处理;abaqus求解文件处理

This commit is contained in:
yangyang01000846
2026-01-13 16:14:27 +08:00
parent 195cd9abf3
commit 8a66b74a7b
9 changed files with 264 additions and 50 deletions

View File

@@ -58,8 +58,7 @@ public class HpcCommandExcuteUtil {
@Autowired
private HttpClientUtil httpClientUtil;
@Autowired
private WebClient webClient;
public String excuteCmd(String command) {
if(Objects.equals(hpcExcuteWay,"remote")){
@@ -144,7 +143,7 @@ public class HpcCommandExcuteUtil {
return nodeInfos;
}
public ResponseEntity<StreamingResponseBody> hpcDownloadFile(String path, Long fileSize) {
public ResponseEntity<StreamingResponseBody> hpcDownloadFile(String path, Long fileSize,WebClient pbsWebClient) {
String fileName = extractFileName(path);
String encodedFileName = URLEncoder.encode(fileName, StandardCharsets.UTF_8);
@@ -154,7 +153,7 @@ public class HpcCommandExcuteUtil {
// 调用 B 服务并流式写出
DataBufferUtils.write(
webClient.get()
pbsWebClient.get()
.uri(url)
.retrieve()
.bodyToFlux(DataBuffer.class),
@@ -209,7 +208,7 @@ public class HpcCommandExcuteUtil {
}
// 调用工具上传hpc文件
public String uploaHpcFile(MultipartFile file, String subDir) {
public String uploaHpcFile(MultipartFile file, String subDir,WebClient pbsWebClient) {
try {
// 3. Multipart body 构建
MultipartBodyBuilder builder = new MultipartBodyBuilder();
@@ -221,7 +220,7 @@ public class HpcCommandExcuteUtil {
});
builder.part("subDir", subDir);
// 4. 调用 B 服务上传接口
String uploadResult = webClient.post()
String uploadResult = pbsWebClient.post()
.uri(remoteUploadFileUrl)
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(builder.build()))
@@ -230,7 +229,7 @@ public class HpcCommandExcuteUtil {
.block();
return uploadResult;
} catch (Exception e) {
System.out.println("上传失败");
log.error("Hpc求解文件上传失败{}",e.getMessage());
return "";
}
}

View File

@@ -1,4 +1,4 @@
package com.sdm.common.config;
package com.sdm.pbs.config.webclient;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
@@ -51,8 +51,7 @@ public class WebClientConfig {
@Bean
public WebClient webClient() {
public WebClient pbsWebClient() {
// 1. 连接池配置
ConnectionProvider connectionProvider = ConnectionProvider.builder("webclient-pool")
.maxConnections(maxConnections) // 最大连接

View File

@@ -72,7 +72,11 @@ public class TaskAdapter implements ITaskFeignClient {
// 手动
if (Objects.equals(req.getExecuteMode(),EXECUTE_MODE_MANUAL)) {
masterFilepath = req.getManualMasterFilepaths().get(0);
// 本地主文件
submitHpcTaskReq.setMasterFilePath(masterFilepath);
inputFilePaths= req.getManualInputFilePaths();
// 本地从文件
submitHpcTaskReq.setInputFilePaths(inputFilePaths);
}
// 自动
if (Objects.equals(req.getExecuteMode(),EXECUTE_MODE_AUTO)) {
@@ -82,17 +86,22 @@ public class TaskAdapter implements ITaskFeignClient {
AtomicReference<String> masterFilePathAtomic = new AtomicReference<>();
FilesUtil.collectFiles(simulationFileLocalPath,masterFileRegularStr,inputFilesRegularStr,masterFilePathAtomic,inputFilePaths);
masterFilepath=masterFilePathAtomic.get();
// 本地主文件
submitHpcTaskReq.setMasterFilePath(masterFilepath);
}
try {
MultipartFile masterMultipartFile = FilesUtil.toMultipartFile(masterFilepath);
submitHpcTaskReq.setMasterFile(masterMultipartFile);
// MultipartFile masterMultipartFile = FilesUtil.toMultipartFile(masterFilepath);
// submitHpcTaskReq.setMasterFile(masterMultipartFile);
if(CollectionUtils.isNotEmpty(inputFilePaths)){
List<MultipartFile> inputFiles = new ArrayList<>();
for (String inputFilePath : inputFilePaths) {
MultipartFile inputFile = FilesUtil.toMultipartFile(inputFilePath);
inputFiles.add(inputFile);
}
submitHpcTaskReq.setInputFiles(inputFiles);
// 本地从文件
submitHpcTaskReq.setInputFilePaths(inputFilePaths);
// List<MultipartFile> inputFiles = new ArrayList<>();
// for (String inputFilePath : inputFilePaths) {
// MultipartFile inputFile = FilesUtil.toMultipartFile(inputFilePath);
// inputFiles.add(inputFile);
// }
// submitHpcTaskReq.setInputFiles(inputFiles);
}
} catch (Exception e) {
log.error("getSimulationFile error",e);

View File

@@ -83,4 +83,13 @@ public class SubmitHpcTaskReq {
@Schema(description = "任务流用户传递的参数,用于动态替换命令")
private Map<String,Object> params;
@Schema(description = "hpc文件名有格式用于动态替换命令")
private String masterFileName;
@Schema(description = "hpc文件名没有格式用于动态替换命令")
private String masterFileNameNoFormat;
@Schema(description = "hpc结果输出文件comsol文件定制如原来是123_dianxin.mph --》123_dianxin_result.mph用于动态替换命令")
private String comsolResultName;
}

View File

@@ -24,10 +24,12 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
import java.io.File;
@@ -54,6 +56,10 @@ public class HpcInstructionServiceImpl implements HpcInstructionService {
@Autowired
private HpcCommandExcuteUtil hpcCommandExcuteUtil;
@Autowired
@Qualifier("pbsWebClient")
private WebClient pbsWebClient;
// 8MB 每次映射
private static final long MAP_SIZE = 10 * 1024 * 1024;
@@ -127,6 +133,8 @@ public class HpcInstructionServiceImpl implements HpcInstructionService {
return SdmResponse.failed(workDirPair.getRight(),addJobResp);
}
addJobParam.setWorkdir(targetWorkDir);
// 标准输出文件重命名 jobId_stdout.out
addJobParam.setStdout(req.getJobId()+"_"+req.getStdout());
String addJobCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, addJobParam, req.getCommand());
String result = hpcCommandExcuteUtil.excuteCmd(addJobCommand);
AddJobResp addJobResp = HpcCommandResulParseUtil.parseJoAddResult(result);
@@ -477,7 +485,7 @@ public class HpcInstructionServiceImpl implements HpcInstructionService {
@Override
public ResponseEntity<StreamingResponseBody> hpcDownloadFile(String fileName,Long fileSize) {
return hpcCommandExcuteUtil.hpcDownloadFile(fileName,fileSize);
return hpcCommandExcuteUtil.hpcDownloadFile(fileName,fileSize,pbsWebClient);
}
@Override

View File

@@ -32,19 +32,26 @@ import com.sdm.pbs.service.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -87,6 +94,26 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
@Autowired
private MessageFeignClientImpl messageFeignClient;
@Autowired
@Qualifier("pbsWebClient")
private WebClient pbsWebClient;
// 1. 使用@Value注入配置项对应配置文件中的key
@Value("${hpc.fileToHpc.http.scheme:}")
private String fileToHpcScheme; // 协议
@Value("${hpc.fileToHpc.http.host:}")
private String fileToHpcHost; // 主机
@Value("${hpc.fileToHpc.http.port:}")
private Integer fileToHpcPort; // 端口
@Value("${hpc.fileToHpc.http.path:}")
private String fileToHpcPath; // 接口路径
@Value("${hpc.fileToHpc.http.mergePath:}")
private String fileToHpcMergePath;
@Override
public SdmResponse<HpcResouceInfo> queryHpcResource() {
return pbsService.queryHpcResource();
@@ -94,18 +121,24 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
@Override
public SdmResponse<String> submitHpcJob(SubmitHpcTaskReq req) {
//1. 上传hpc主文件 及 其他文件
MultipartFile masterFile = req.getMasterFile();
// 获取hpc的工作目录
String subDir = generateHpcSubDir(req);
// webClient 调用上传这个是主文件求解算出的文件及stdout文件都指定这个文件夹下面
String masterFilePath = hpcCommandExcuteUtil.uploaHpcFile(masterFile, subDir);
// 0.处理hpc文件
String masterFilePath = handleHpcFileUpload(req, subDir);
// 重新赋值用于command拼接
req.setMasterFilePath(masterFilePath);
// 上传从文件
dealInputFiles(req, subDir);
// 任务输出的文件夹
String hpcOutPutDir = extractDirectory(masterFilePath);
Pair<String, String> pair = extractDirectoryAndFileName(masterFilePath);
String hpcOutPutDir = pair.getLeft();
req.setWorkDir(hpcOutPutDir);
// 包含格式的文件名
req.setMasterFileName(pair.getRight());
int lastDotIndex = pair.getRight().lastIndexOf(".");
String substring = pair.getRight().substring(0, lastDotIndex);
// 不包含格式的文件名
req.setMasterFileNameNoFormat(substring);
// comsol 结果文件定制
req.setComsolResultName(substring+"_result.mph");
// 2.处理命令拼接和参数替换
CommandResult commandResult = buildAndReplaceHpcCommand(req, masterFilePath);
if (StringUtils.isBlank(commandResult.getCommand())) {
@@ -141,6 +174,7 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
// 拼接逻辑
String subDirPrefix = Objects.isNull(userId) ? "" : (String.valueOf(userId) + "\\");
String subDir = subDirPrefix + req.getJobName() + "\\" + System.currentTimeMillis();
log.info("Hpc任务执行开始结果输出目录:{}", subDir);
return subDir;
}
@@ -153,12 +187,12 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
private CommandResult buildAndReplaceHpcCommand(SubmitHpcTaskReq req, String masterFilePath) {
String command = "";
String formatCommand = "";
// 优先使用传递的command,然后替换求解文件即可
if (StringUtils.isNotBlank(req.getCommand())) {
command = req.getCommand();
// 处理 拼接命令 \\CARSAFE\share\solver\RLithium\reta.exe -i %s
formatCommand = String.format(command, masterFilePath);
// 关键修改:给文件路径包裹双引号后再格式化
formatCommand = String.format(command, "\"" + masterFilePath + "\"");
} else {
// 命令
SimulationHpcCommand simulationHpcCommand = simulationSoftConfigService.lambdaQuery()
@@ -190,6 +224,11 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
// 入参
else if (Objects.equals(placeholder.getFeatchType(), "param")) {
replaceValue = CommandReplaceUtil.getFieldValue(req, keyEnName);
// 当是文件的时候,且不为空的时候,双引号包裹,避免空格,求解文件comsol结果文件
if(placeholder.getValueType().equals("file")&&!ObjectUtil.isNull(replaceValue)){
replaceValue= "\"" + replaceValue + "\"";
log.warn("Hpc命令处理文件名处理后值{}",replaceValue);
}
if (ObjectUtil.isNull(replaceValue)) { // 修复原代码的判断逻辑错误
log.warn("Hpc命令动态参数替换入参反射是空参数名{}", keyEnName);
replaceValue = placeholder.getDefaultValue();
@@ -423,29 +462,45 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
List<MultipartFile> inputFiles = req.getInputFiles();
List<String> list = new ArrayList<>();
for (MultipartFile inputFile : inputFiles) {
String inputFilePath = hpcCommandExcuteUtil.uploaHpcFile(inputFile,subDir);
String inputFilePath = hpcCommandExcuteUtil.uploaHpcFile(inputFile,subDir,pbsWebClient);
list.add(inputFilePath);
}
req.setInputFilePaths(list);
}
/**
* 从文件路径中提取目录部分(包含最后一个路径分隔符)
* @param fullPath 完整的文件路径
* @return 目录路径(包含最后一个反斜杠),若路径为空或无分隔符则返回原路径
* 提取Windows文件路径的父级目录和文件名
* @param fullPath 完整的Windows文件路径
* @return Pair(父级目录, 文件名),若路径无效或只有目录没有文件,文件名为空字符串
*/
private String extractDirectory(String fullPath) {
private Pair<String, String> extractDirectoryAndFileName(String fullPath) {
// 校验参数
if (fullPath == null || fullPath.isEmpty()) {
return fullPath;
return Pair.of(fullPath, "");
}
// 统一处理路径分隔符(考虑用户可能输入/的情况)
String normalizedPath = fullPath.replace("/", "\\");
// 找到最后一个反斜杠的位置
int lastSeparatorIndex = fullPath.lastIndexOf("\\");
// 若没有找到分隔符,返回原路径;否则截取到最后一个分隔符(包含)
int lastSeparatorIndex = normalizedPath.lastIndexOf("\\");
String directory;
String fileName;
// 没有找到分隔符,说明是单纯的文件名(无路径)
if (lastSeparatorIndex == -1) {
return fullPath;
directory = ""; // 父级目录为空
fileName = normalizedPath;
}
return fullPath.substring(0, lastSeparatorIndex + 1);
// 路径以反斜杠结尾,说明是目录路径,没有文件名
else if (lastSeparatorIndex == normalizedPath.length() - 1) {
directory = normalizedPath;
fileName = "";
}
// 正常的文件路径(包含目录和文件名)
else {
directory = normalizedPath.substring(0, lastSeparatorIndex + 1);
fileName = normalizedPath.substring(lastSeparatorIndex + 1);
}
// 返回Pair对象first=目录second=文件名
return Pair.of(directory, fileName);
}
@Override
@@ -588,5 +643,112 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
messageFeignClient.sendMessage(req);
}
/**
* 公共方法处理HPC文件上传主文件+从文件返回主文件在HPC的路径
* @param req 提交HPC任务的请求对象
* @param subDir HPC的工作子目录
* @return 主求解文件在HPC上的路径
* @throws RuntimeException 当主文件为空时抛出异常
*/
private String handleHpcFileUpload(SubmitHpcTaskReq req, String subDir) {
// 主求解文件位置
String masterFilePath = "";
// 求解文件是直接调用传递MultipartFile方式
if (!Objects.isNull(req.getMasterFile())) {
// 上传hpc主文件 及 其他文件
MultipartFile masterFile = req.getMasterFile();
// webClient 调用上传,主文件/求解文件/stdout文件都放在该文件夹下
masterFilePath = hpcCommandExcuteUtil.uploaHpcFile(masterFile, subDir, pbsWebClient);
// 上传从文件
dealInputFiles(req, subDir);
} else {
// 从自动化流程获取(本地文件路径,分片上传)
String localHpcMasterFile = req.getMasterFilePath();
List<String> localInputFilePaths = req.getInputFilePaths();
log.info("Hpc上传求解文件本地主文件位置{},从文件位置:{}", localHpcMasterFile, JSONObject.toJSONString(localInputFilePaths));
// 校验主文件不能为空
if (StringUtils.isBlank(localHpcMasterFile)) {
throw new RuntimeException("Hpc求解主文件不能为空");
}
// 分片上传主文件,获取主文件在头节点的路径
masterFilePath = uploadHpcFile(localHpcMasterFile, subDir);
// 分片上传从文件,更新请求对象中的从文件路径
List<String> totaHpcInputFilePath = new ArrayList<>();
for (String localInputFilePath : localInputFilePaths) {
String hpcInputFilePath = uploadHpcFile(localInputFilePath, subDir);
totaHpcInputFilePath.add(hpcInputFilePath);
}
req.setInputFilePaths(totaHpcInputFilePath);
}
return masterFilePath;
}
private String uploadHpcFile(String filePath,String subDir){
Path path = Paths.get(filePath);
try {
String hpcFilePath = uploadFileInChunks(path, subDir);
return hpcFilePath;
} catch (Exception e) {
log.error("pbs上传求解文件异常{}",e.getMessage());
throw new RuntimeException("Hpc上传求解文件异常");
}
}
public String uploadFileInChunks(Path filePath, String subDir) throws IOException {
final String fileId = UUID.randomUUID().toString(); // 每次上传唯一文件 ID
final long chunkSize = 50 * 1024 * 1024; // 每片 50MB
final long fileSize = Files.size(filePath);
final long totalChunks = (fileSize + chunkSize - 1) / chunkSize;
// 随机访问文件
try (FileChannel channel = FileChannel.open(filePath, StandardOpenOption.READ)) {
for (int chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) {
long start = chunkIndex * chunkSize;
long size = Math.min(chunkSize, fileSize - start);
ByteBuffer buffer = ByteBuffer.allocate((int) size);
channel.read(buffer, start);
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
System.out.println("上传分片 " + chunkIndex + " 大小:" + bytes.length);
int finalChunkIndex = chunkIndex;
// 阻塞式上传
pbsWebClient.post()
.uri(uriBuilder -> uriBuilder
.scheme(fileToHpcScheme)
.host(fileToHpcHost)
.port(fileToHpcPort)
.path(fileToHpcPath)
.queryParam("fileId", fileId)
.queryParam("chunkIndex", finalChunkIndex)
.queryParam("subDir", subDir)
.build())
.bodyValue(bytes)
.retrieve()
.bodyToMono(Void.class)
.block(); // 阻塞等待上传完成
}
// 所有分片上传完成 → 调用合并接口
System.out.println("所有分片上传完成,开始合并文件...");
String hpcFilePath = pbsWebClient.post()
.uri(uriBuilder -> uriBuilder
.scheme(fileToHpcScheme)
.host(fileToHpcHost)
.port(fileToHpcPort)
.path(fileToHpcMergePath)
.queryParam("fileId", fileId)
.queryParam("totalChunks", totalChunks)
.queryParam("subDir", subDir)
.queryParam("filename", filePath.getFileName().toString())
.build())
.retrieve()
.bodyToMono(String.class)
.block();// 阻塞等待合并完成
return hpcFilePath;
}
}
}

View File

@@ -115,13 +115,20 @@ hpc:
# remoteCmdUrl: http://127.0.0.1:9097/doProcess
# remote hpc借助工具http远程调用local:该服务和hpc部署在同一机器
excuteWay: remote
remoteCmdUrl: http://192.168.65.55:9097/doProcess
remoteCreateDirUrl: http://192.168.65.55:9097/createDir
remoteScanDirUrl: http://192.168.65.55:9097/scanDir
remoteDownLoadFileUrl: http://192.168.65.55:9097/hpcDownload
# remoteDownLoadFileUrl: http://127.0.0.1:9097/hpcDownload
remoteUploadFileUrl: http://192.168.65.55:9097/uploadHpcFile
callHpcUpload: http://192.168.65.55:9097/addJobQueue
remoteCmdUrl: http://192.168.190.164:9097/doProcess
remoteCreateDirUrl: http://192.168.190.164:9098/createDir
remoteScanDirUrl: http://192.168.190.164:9098/scanDir
remoteDownLoadFileUrl: http://192.168.190.164:9098/hpcDownload
remoteUploadFileUrl: http://192.168.190.164:9098/uploadHpcFile
callHpcUpload: http://192.168.190.164:9098/addJobQueue
# 上传头节点文件相关的配置
fileToHpc:
http:
scheme: http # 协议http/https
host: 192.168.190.164 # 目标主机
port: 9098 # 目标端口
path: /upFileToHpcBatch # 接口路径
mergePath: /upFileToHpcMerge # 合并的接口路径
#logging:
@@ -138,4 +145,9 @@ security:
paths:
- /pbs/jobFileCallback
- /pbs/netTest
- /pbs/adapterSubmitHpcJob
- /pbs/adapterSubmitHpcJob
- /pbs/testEn
testEnStr: ENC(095i92PAFyJQ5kEnkiaCYReMEtw+Dwc8qnS1i7Vx0Y8=)
testEnStr1: ENC(AtQcdulLNvaSvboZuWsXIxuCwrHyUoG3oEGtmKfDSbs=)
testEnStr2: ENC(+QKYnI6gAYu1SbLaZQTkZA==)

View File

@@ -122,6 +122,14 @@ hpc:
# remoteDownLoadFileUrl: http://127.0.0.1:9097/hpcDownload
remoteUploadFileUrl: http://192.168.65.55:9097/uploadHpcFile
callHpcUpload: http://192.168.65.55:9097/addJobQueue
# 上传头节点文件相关的配置
fileToHpc:
http:
scheme: http # 协议http/https
host: 192.168.190.164 # 目标主机
port: 9098 # 目标端口
path: /upFileToHpcBatch # 接口路径
mergePath: /upFileToHpcMerge # 合并的接口路径
#logging:

View File

@@ -122,6 +122,14 @@ hpc:
# remoteDownLoadFileUrl: http://127.0.0.1:9097/hpcDownload
remoteUploadFileUrl: http://10.122.38.200:9098/uploadHpcFile
callHpcUpload: http://10.122.38.200:9098/addJobQueue
# 上传头节点文件相关的配置
fileToHpc:
http:
scheme: http # 协议http/https
host: 10.122.38.200 # 目标主机
port: 9098 # 目标端口
path: /upFileToHpcBatch # 接口路径
mergePath: /upFileToHpcMerge # 合并的接口路径
#logging: