diff --git a/common/src/main/java/com/sdm/common/entity/req/pbs/HpcTaskFileReq.java b/common/src/main/java/com/sdm/common/entity/req/pbs/HpcTaskFileReq.java new file mode 100644 index 00000000..a5927616 --- /dev/null +++ b/common/src/main/java/com/sdm/common/entity/req/pbs/HpcTaskFileReq.java @@ -0,0 +1,15 @@ +package com.sdm.common.entity.req.pbs; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +@Data +public class HpcTaskFileReq { + + @Schema(description = "任务ID,和targetDir互斥") + public String jobId; + + @Schema(description = "指定目录,和jobId互斥") + public String targetDir; + +} diff --git a/common/src/main/java/com/sdm/common/entity/resp/pbs/hpc/FileNodeInfo.java b/common/src/main/java/com/sdm/common/entity/resp/pbs/hpc/FileNodeInfo.java new file mode 100644 index 00000000..ae0dce86 --- /dev/null +++ b/common/src/main/java/com/sdm/common/entity/resp/pbs/hpc/FileNodeInfo.java @@ -0,0 +1,22 @@ +package com.sdm.common.entity.resp.pbs.hpc; + +import lombok.Data; + +@Data +public class FileNodeInfo { + + /** 名称 */ + private String name; + + /** 完整路径 */ + private String fullPath; + + /** 是否文件夹 1是文件夹 0 文件 */ + private String directory; + + /** 文件大小(字节,文件夹为 0) */ + private long size; + + /** 最后修改时间 */ + private String lastModifiedTime; +} diff --git a/common/src/main/java/com/sdm/common/utils/HpcCommandExcuteUtil.java b/common/src/main/java/com/sdm/common/utils/HpcCommandExcuteUtil.java index 8ca5787f..acb71ecb 100644 --- a/common/src/main/java/com/sdm/common/utils/HpcCommandExcuteUtil.java +++ b/common/src/main/java/com/sdm/common/utils/HpcCommandExcuteUtil.java @@ -2,29 +2,54 @@ package com.sdm.common.utils; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.TypeReference; +import com.sdm.common.entity.resp.pbs.hpc.FileNodeInfo; import com.sdm.common.log.CoreLogger; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; +import java.net.URLEncoder; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; @Slf4j @Component public class HpcCommandExcuteUtil { + @Value("${hpc.excuteWay:}") + private String hpcExcuteWay; + @Value("${hpc.remoteCmdUrl:}") private String remoteCmdUrl; @Value("${hpc.remoteCreateDirUrl:}") private String remoteCreateDirUrl; + @Value("${hpc.remoteScanDirUrl:}") + private String remoteScanDirUrl; + + @Value("${hpc.remoteDownLoadFileUrl:}") + private String remoteDownLoadFileUrl; + @Autowired private HttpClientUtil httpClientUtil; - public String excuteCmd(String command,String hpcExcuteWay) { + @Autowired + private WebClient webClient; + + public String excuteCmd(String command) { if(Objects.equals(hpcExcuteWay,"remote")){ return remoteExcuteCmd(command); } @@ -83,4 +108,67 @@ public class HpcCommandExcuteUtil { } } + public List scanDir(String targetDir) { + List nodeInfos = new ArrayList<>(); + if(Objects.equals(hpcExcuteWay,"remote")){ + com.alibaba.fastjson2.JSONObject paramJson = new com.alibaba.fastjson2.JSONObject(); + paramJson.put("folderPath", targetDir); + String resultString = ""; + try { + resultString = httpClientUtil.doPostJson(remoteScanDirUrl, paramJson.toJSONString()); + CoreLogger.info("scanDir back:{}", resultString); + nodeInfos = JSON.parseObject( + resultString, + new TypeReference>() { + } + ); + return nodeInfos; + } catch (Exception e) { + CoreLogger.error("scanDir error,targetDir:{},errMsg:{}", targetDir, e.getMessage()); + return nodeInfos; + } + } + // 非remote,本地的暂时用不到 + return nodeInfos; + } + + public ResponseEntity hpcDownloadFile(String path, Long fileSize) { + // 从 path 中提取文件名 + String fileName = extractFileName(path); + String encodedFileName = URLEncoder.encode(fileName, StandardCharsets.UTF_8); + StreamingResponseBody body = outputStream -> { + DataBufferUtils.write( + webClient.get() + .uri(remoteDownLoadFileUrl, path) + .retrieve() + .bodyToFlux(DataBuffer.class), + Channels.newChannel(outputStream) + ).blockLast(); // 阻塞直到写完 + }; + + // 构建 ResponseEntity + ResponseEntity.BodyBuilder builder = ResponseEntity.ok() + .header(HttpHeaders.CONTENT_DISPOSITION, + "attachment; filename*=UTF-8''" + encodedFileName) + .contentType(MediaType.APPLICATION_OCTET_STREAM); + + // 只有在 fileSize 合法时才设置 Content-Length + if (fileSize != null && fileSize > 0) { + builder.contentLength(fileSize); + } + return builder.body(body); + } + + private String extractFileName(String path) { + if (path == null || path.isBlank()) { + return "unknown"; + } + // 同时兼容 / 和 \ + path = path.replace("\\", "/"); + + int lastSlash = path.lastIndexOf('/'); + return lastSlash >= 0 ? path.substring(lastSlash + 1) : path; + } + + } diff --git a/pbs/src/main/java/com/sdm/pbs/config/webclient/WebClientConfig.java b/pbs/src/main/java/com/sdm/pbs/config/webclient/WebClientConfig.java new file mode 100644 index 00000000..ab05fce2 --- /dev/null +++ b/pbs/src/main/java/com/sdm/pbs/config/webclient/WebClientConfig.java @@ -0,0 +1,89 @@ +package com.sdm.pbs.config.webclient; + +import io.netty.channel.ChannelOption; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.web.reactive.function.client.ExchangeStrategies; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.netty.http.client.HttpClient; +import reactor.netty.resources.ConnectionProvider; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +@Configuration +public class WebClientConfig { + + @Value("${webClient.maxConnections:50}") + private int maxConnections; + + @Value("${webClient.pendingAcquireMaxCount:200}") + private int pendingAcquireMaxCount; + + @Value("${webClient.pendingAcquireTimeout:30}") + private int pendingAcquireTimeout; + + @Value("${webClient.maxIdleTime:60}") + private int maxIdleTime; + + @Value("${webClient.maxLifeTime:5}") + private int maxLifeTime; + + @Value("${webClient.responseTimeoutMinutes:30}") + private int responseTimeoutMinutes; + + @Value("${webClient.readTimeoutMinutes:30}") + private int readTimeoutMinutes; + + @Value("${webClient.writeTimeoutMinutes:30}") + private int writeTimeoutMinutes; + + @Value("${webClient.maxInMemorySize:16384}") + private int maxInMemorySize; + + @Value("${webClient.connectTimeout:10000}") + private int connectTimeout; + + + + @Bean + public WebClient webClient() { + + // 1. 连接池配置 + ConnectionProvider connectionProvider = ConnectionProvider.builder("webclient-pool") + .maxConnections(maxConnections) // 最大连接 + .pendingAcquireMaxCount(pendingAcquireMaxCount) // 等待队列 + .pendingAcquireTimeout(Duration.ofSeconds(pendingAcquireTimeout)) + .maxIdleTime(Duration.ofSeconds(maxIdleTime)) // 空闲连接 + .maxLifeTime(Duration.ofMinutes(maxLifeTime)) // 连接最大生命周期 + .build(); + + // 2. HttpClient 配置 + HttpClient httpClient = HttpClient.create(connectionProvider) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout) + .responseTimeout(Duration.ofMinutes(responseTimeoutMinutes)) // 可能是大文件下载 + .doOnConnected(conn -> + conn.addHandlerLast(new ReadTimeoutHandler(readTimeoutMinutes, TimeUnit.MINUTES)) + .addHandlerLast(new WriteTimeoutHandler(writeTimeoutMinutes, TimeUnit.MINUTES)) + ) + .compress(true); // gzip + + // 3. WebClient 构建 + return WebClient.builder() + .clientConnector(new ReactorClientHttpConnector(httpClient)) + // 禁用大内存聚合(否则大文件会 OOM) + .exchangeStrategies(ExchangeStrategies.builder() + .codecs(configurer -> + configurer.defaultCodecs() + .maxInMemorySize(maxInMemorySize) // 16KB + ) + .build() + ) + .build(); + } + +} diff --git a/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java b/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java index 2963f04c..66cead53 100644 --- a/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java +++ b/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java @@ -1,6 +1,7 @@ package com.sdm.pbs.controller; import com.sdm.common.common.SdmResponse; +import com.sdm.common.entity.req.pbs.HpcTaskFileReq; import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq; import com.sdm.common.entity.req.pbs.hpc.*; import com.sdm.common.entity.resp.pbs.hpc.*; @@ -26,6 +27,7 @@ import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; +import java.util.List; import java.util.Map; @RestController @@ -51,7 +53,7 @@ public class TaskController implements ITaskFeignClient { if(StringUtils.isBlank(command)){ return "无command"; } - return hpcCommandExcuteUtil.excuteCmd(command,"remote"); + return hpcCommandExcuteUtil.excuteCmd(command); } @GetMapping ("/queryHpcResource") @@ -80,6 +82,19 @@ public class TaskController implements ITaskFeignClient { return pbsService.getJobStatus(jobId); } + @PostMapping("/getJobResultFiles") + @Operation(summary = "作业下文件查询") + SdmResponse> getJobResultFiles(@RequestBody HpcTaskFileReq req) { + return pbsService.getJobResultFiles(req.getJobId(),req.getTargetDir()); + } + + @GetMapping("/hpcDownloadFile") + @Operation(summary = "作业下文件下载") + ResponseEntity hpcDownloadFile(@RequestParam String jobId,@RequestParam String fileName,@RequestParam Long fileSize) { + return pbsService.downloadFile(jobId,fileName,fileSize); + } + + /*====================================================================*/ @PostMapping("/nodeList") diff --git a/pbs/src/main/java/com/sdm/pbs/service/HpcInstructionService.java b/pbs/src/main/java/com/sdm/pbs/service/HpcInstructionService.java index dc67eeed..c1f14f77 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/HpcInstructionService.java +++ b/pbs/src/main/java/com/sdm/pbs/service/HpcInstructionService.java @@ -11,6 +11,8 @@ import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; +import java.util.List; + @Service public interface HpcInstructionService { @@ -48,5 +50,9 @@ public interface HpcInstructionService { ResponseEntity hpcDownloadFiles(HpcDownloadFilesReq downloadFilesReq); + SdmResponse> scanDir(String targetDir); + + ResponseEntity hpcDownloadFile(String fileName,Long fileSize); + } diff --git a/pbs/src/main/java/com/sdm/pbs/service/IPbsService.java b/pbs/src/main/java/com/sdm/pbs/service/IPbsService.java index 7998e5b9..a81e4d3d 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/IPbsService.java +++ b/pbs/src/main/java/com/sdm/pbs/service/IPbsService.java @@ -1,10 +1,13 @@ package com.sdm.pbs.service; import com.sdm.common.common.SdmResponse; -import com.sdm.pbs.model.bo.FileBaseInfo; +import com.sdm.common.entity.resp.pbs.hpc.FileNodeInfo; import com.sdm.pbs.model.bo.HpcJobStatusInfo; import com.sdm.pbs.model.bo.HpcResouceInfo; import com.sdm.pbs.model.req.SubmitHpcTaskReq; +import jakarta.servlet.http.HttpServletResponse; +import org.springframework.http.ResponseEntity; +import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; import java.util.List; @@ -43,7 +46,7 @@ public interface IPbsService { * @param jobId * @return */ - SdmResponse> getJobResultFiles(String jobId); + SdmResponse> getJobResultFiles(String jobId,String targetDir); /** * 下载计算结果文件 @@ -51,5 +54,6 @@ public interface IPbsService { * @param fileName * @return */ - SdmResponse downloadFile(String jobId, String fileName); + ResponseEntity downloadFile(String jobId, String fileName,Long fileSize); + } diff --git a/pbs/src/main/java/com/sdm/pbs/service/impl/HpcInstructionServiceImpl.java b/pbs/src/main/java/com/sdm/pbs/service/impl/HpcInstructionServiceImpl.java index 955a244a..677f5bb4 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/impl/HpcInstructionServiceImpl.java +++ b/pbs/src/main/java/com/sdm/pbs/service/impl/HpcInstructionServiceImpl.java @@ -48,9 +48,6 @@ import java.util.zip.ZipOutputStream; @Service public class HpcInstructionServiceImpl implements HpcInstructionService { - @Value("${hpc.excuteWay:}") - private String hpcExcuteWay; - @Value("${hpc.file.basePath:F:\\hpc}") private String basePath; @@ -67,7 +64,7 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { NodeListParam nodeListParam = new NodeListParam(); BeanUtils.copyProperties(req, nodeListParam); String nodeListCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, nodeListParam, ""); - String result = hpcCommandExcuteUtil.excuteCmd(nodeListCommand,hpcExcuteWay); + String result = hpcCommandExcuteUtil.excuteCmd(nodeListCommand); List nodeList = HpcCommandResulParseUtil.parseNodList(result); nodeListResp.setNodes(nodeList); nodeListResp.setHpcCommand(nodeListCommand); @@ -80,7 +77,7 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { NodeListCoreParam nodeListCoreParam = new NodeListCoreParam(); BeanUtils.copyProperties(req, nodeListCoreParam); String nodeListCoreCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, nodeListCoreParam, ""); - String result = hpcCommandExcuteUtil.excuteCmd(nodeListCoreCommand,hpcExcuteWay); + String result = hpcCommandExcuteUtil.excuteCmd(nodeListCoreCommand); List nodeListCore = HpcCommandResulParseUtil.parseNodeCoreList(result); NodeListCoreResp nodeListCoreResp = new NodeListCoreResp(); nodeListCoreResp.setNodeCores(nodeListCore); @@ -94,7 +91,7 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { NodeViewParam nodeViewParam = new NodeViewParam(); BeanUtils.copyProperties(req, nodeViewParam); String nodeViewCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, nodeViewParam, ""); - String result = hpcCommandExcuteUtil.excuteCmd(nodeViewCommand,hpcExcuteWay); + String result = hpcCommandExcuteUtil.excuteCmd(nodeViewCommand); NodeViewResp nodeViewResp = HpcCommandResulParseUtil.parseNodeView(result); nodeViewResp.setHpcCommand(nodeViewCommand); return SdmResponse.success(nodeViewResp); @@ -106,7 +103,7 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { NewJobParam newJobParam = new NewJobParam(); BeanUtils.copyProperties(req, newJobParam); String newJobCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, newJobParam, ""); - String result = hpcCommandExcuteUtil.excuteCmd(newJobCommand,hpcExcuteWay); + String result = hpcCommandExcuteUtil.excuteCmd(newJobCommand); NewJobResp newJobResp = HpcCommandResulParseUtil.parseJobNewResult(result); newJobResp.setHpcCommand(newJobCommand); if(Objects.isNull(newJobResp)|| StringUtils.isBlank(newJobResp.getJobId())){ @@ -130,7 +127,7 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { } addJobParam.setWorkdir(targetWorkDir); String addJobCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, addJobParam, req.getCommand()); - String result = hpcCommandExcuteUtil.excuteCmd(addJobCommand,hpcExcuteWay); + String result = hpcCommandExcuteUtil.excuteCmd(addJobCommand); AddJobResp addJobResp = HpcCommandResulParseUtil.parseJoAddResult(result); addJobResp.setHpcCommand(addJobCommand); if(Objects.isNull(addJobResp)||StringUtils.isBlank(addJobResp.getTsakId())){ @@ -146,7 +143,7 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { SubmitHpcJobParam submitHpcJobParam = new SubmitHpcJobParam(); BeanUtils.copyProperties(req, submitHpcJobParam); String submitJobCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, submitHpcJobParam, req.getCommand()); - String result = hpcCommandExcuteUtil.excuteCmd(submitJobCommand,hpcExcuteWay); + String result = hpcCommandExcuteUtil.excuteCmd(submitJobCommand); SubmitHpcJobResp submitHpcJobResp = HpcCommandResulParseUtil.parseJobSubmitResult(result); submitHpcJobResp.setJobId(req.getId()); submitHpcJobResp.setHpcCommand(submitJobCommand); @@ -220,7 +217,7 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { CancelJobParam cancelJobParam = new CancelJobParam(); BeanUtils.copyProperties(req, cancelJobParam); String cancelJobCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, cancelJobParam, ""); - String result = hpcCommandExcuteUtil.excuteCmd(cancelJobCommand,hpcExcuteWay); + String result = hpcCommandExcuteUtil.excuteCmd(cancelJobCommand); JobCancelResp jobCancelResp = HpcCommandResulParseUtil.parseJobCancel(result); jobCancelResp.setHpcCommand(cancelJobCommand); if(Objects.isNull(jobCancelResp)||Objects.equals(false,jobCancelResp.getCanceled())){ @@ -236,7 +233,7 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { CloneJobParam cloneJobParam = new CloneJobParam(); BeanUtils.copyProperties(req, cloneJobParam); String cloneJobCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, cloneJobParam, ""); - String result = hpcCommandExcuteUtil.excuteCmd(cloneJobCommand,hpcExcuteWay); + String result = hpcCommandExcuteUtil.excuteCmd(cloneJobCommand); CloneJobResp cloneJobResp = HpcCommandResulParseUtil.parseJobCloneResult(result); cloneJobResp.setHpcCommand(cloneJobCommand); if(Objects.isNull(cloneJobResp)||StringUtils.isBlank(cloneJobResp.getJobId())){ @@ -252,7 +249,7 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { FinishJobParam finishJobParam = new FinishJobParam(); BeanUtils.copyProperties(req, finishJobParam); String finishJobCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, finishJobParam, ""); - String result = hpcCommandExcuteUtil.excuteCmd(finishJobCommand,hpcExcuteWay); + String result = hpcCommandExcuteUtil.excuteCmd(finishJobCommand); JobFinishResp jobFinishResp = HpcCommandResulParseUtil.parseJobFinish(result); jobFinishResp.setHpcCommand(finishJobCommand); if(Objects.isNull(jobFinishResp)||Objects.equals(false,jobFinishResp.getFinished())){ @@ -268,7 +265,7 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { ListJobParam listJobParam = new ListJobParam(); BeanUtils.copyProperties(req, listJobParam); String listJobCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, listJobParam, ""); - String result = hpcCommandExcuteUtil.excuteCmd(listJobCommand,hpcExcuteWay); + String result = hpcCommandExcuteUtil.excuteCmd(listJobCommand); List jobLists = HpcCommandResulParseUtil.parseJobLists(result); ListJobResp listJobResp = new ListJobResp(); listJobResp.setListJobs(jobLists); @@ -282,7 +279,7 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { ListTasksParam listTasksParam = new ListTasksParam(); BeanUtils.copyProperties(req, listTasksParam); String listTasksJobCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, listTasksParam, ""); - String result = hpcCommandExcuteUtil.excuteCmd(listTasksJobCommand,hpcExcuteWay); + String result = hpcCommandExcuteUtil.excuteCmd(listTasksJobCommand); List jobTaskResp = HpcCommandResulParseUtil.parseJobTasks(result); ListTasksResp listTasksResp = new ListTasksResp(); listTasksResp.setListTasks(jobTaskResp); @@ -296,7 +293,7 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { JobModifyParam jobModifyParam = new JobModifyParam(); BeanUtils.copyProperties(req, jobModifyParam); String modifyJobCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, jobModifyParam, ""); - String result = hpcCommandExcuteUtil.excuteCmd(modifyJobCommand,hpcExcuteWay); + String result = hpcCommandExcuteUtil.excuteCmd(modifyJobCommand); JobModifyResp jobModifyResp = HpcCommandResulParseUtil.parseJobModify(result); jobModifyResp.setHpcCommand(modifyJobCommand); if(Objects.isNull(jobModifyResp)||Objects.equals(false,jobModifyResp.getModified())){ @@ -312,7 +309,7 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { JobRequeueParam jobRequeueParam = new JobRequeueParam(); BeanUtils.copyProperties(req, jobRequeueParam); String requeueJobCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, jobRequeueParam, ""); - String result = hpcCommandExcuteUtil.excuteCmd(requeueJobCommand,hpcExcuteWay); + String result = hpcCommandExcuteUtil.excuteCmd(requeueJobCommand); JobRequeueResp jobRequeueResp = HpcCommandResulParseUtil.parseJobRequeue(result); jobRequeueResp.setHpcCommand(requeueJobCommand); if(Objects.isNull(jobRequeueResp)||Objects.equals(false,jobRequeueResp.getRequeued())){ @@ -328,7 +325,7 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { JobViewParam jobViewParam = new JobViewParam(); BeanUtils.copyProperties(req, jobViewParam); String viewJobCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, jobViewParam, ""); - String result = hpcCommandExcuteUtil.excuteCmd(viewJobCommand,hpcExcuteWay); + String result = hpcCommandExcuteUtil.excuteCmd(viewJobCommand); JobViewResp jobViewResp = HpcCommandResulParseUtil.parseJobView(result); jobViewResp.setHpcCommand(viewJobCommand); return SdmResponse.success(jobViewResp); @@ -471,6 +468,17 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { return fileResponse; } + @Override + public SdmResponse> scanDir(String targetDir) { + List fileNodeInfos = hpcCommandExcuteUtil.scanDir(targetDir); + return SdmResponse.success(fileNodeInfos); + } + + @Override + public ResponseEntity hpcDownloadFile(String fileName,Long fileSize) { + return hpcCommandExcuteUtil.hpcDownloadFile(fileName,fileSize); + } + /** * 使用 MappedByteBuffer + 分块映射实现超大文件下载(高性能、低内存) */ diff --git a/pbs/src/main/java/com/sdm/pbs/service/impl/IPbsHpcServiceImpl.java b/pbs/src/main/java/com/sdm/pbs/service/impl/IPbsHpcServiceImpl.java index ff0addcb..00debf06 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/impl/IPbsHpcServiceImpl.java +++ b/pbs/src/main/java/com/sdm/pbs/service/impl/IPbsHpcServiceImpl.java @@ -6,28 +6,28 @@ import com.sdm.common.entity.constants.NumberConstants; import com.sdm.common.entity.req.pbs.hpc.*; import com.sdm.common.entity.resp.pbs.hpc.JobCancelResp; import com.sdm.common.entity.resp.pbs.hpc.SubmitHpcJobResp; -import com.sdm.common.entity.resp.pbs.hpc.listjobs.ListJobResp; -import com.sdm.common.entity.resp.pbs.hpc.listjobs.ListJobs; import com.sdm.common.entity.resp.pbs.hpc.listtasks.ListTasks; import com.sdm.common.entity.resp.pbs.hpc.listtasks.ListTasksResp; import com.sdm.common.entity.resp.pbs.hpc.nodelist.NodeList; import com.sdm.common.entity.resp.pbs.hpc.nodelist.NodeListResp; import com.sdm.common.log.CoreLogger; -import com.sdm.pbs.model.bo.FileBaseInfo; +import com.sdm.common.entity.resp.pbs.hpc.FileNodeInfo; import com.sdm.pbs.model.bo.HpcJobStatusInfo; import com.sdm.pbs.model.bo.HpcNodeInfo; import com.sdm.pbs.model.bo.HpcResouceInfo; import com.sdm.pbs.model.req.SubmitHpcTaskReq; import com.sdm.pbs.service.HpcInstructionService; import com.sdm.pbs.service.IPbsService; -import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.servlet.http.HttpServletResponse; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; +import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; import java.util.ArrayList; import java.util.Collections; @@ -127,13 +127,13 @@ public class IPbsHpcServiceImpl implements IPbsService { @Override - public SdmResponse> getJobResultFiles(String jobId) { - return null; + public SdmResponse> getJobResultFiles(String jobId,String targetDir) { + return hpcInstructionService.scanDir(targetDir); } @Override - public SdmResponse downloadFile(String jobId, String fileName) { - return null; + public ResponseEntity downloadFile(String jobId, String fileName,Long fileSize) { + return hpcInstructionService.hpcDownloadFile(fileName,fileSize); } diff --git a/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java b/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java index 94df2a24..09f42702 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java +++ b/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java @@ -1,16 +1,19 @@ package com.sdm.pbs.service.impl; import com.sdm.common.common.SdmResponse; -import com.sdm.pbs.model.bo.FileBaseInfo; +import com.sdm.common.entity.resp.pbs.hpc.FileNodeInfo; import com.sdm.pbs.model.bo.HpcJobStatusInfo; import com.sdm.pbs.model.bo.HpcResouceInfo; import com.sdm.pbs.model.req.SubmitHpcTaskReq; import com.sdm.pbs.service.IPbsService; import com.sdm.pbs.service.IPbsServiceDecorator; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; +import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; import java.util.List; @@ -55,12 +58,21 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { } @Override - public SdmResponse> getJobResultFiles(String jobId) { - return null; + public SdmResponse> getJobResultFiles(String jobId,String targetDir) { + // todo 根据jobId 获取工作目录,共享目录+jobName(文件回传)+uuid,下面可能有多个文件 + String workDir = StringUtils.isNotBlank(targetDir) ? targetDir :"D:\\需求"; + SdmResponse> nodeInfos = pbsService.getJobResultFiles("", workDir); + return nodeInfos; } @Override - public SdmResponse downloadFile(String jobId, String fileName) { - return null; + public ResponseEntity downloadFile(String jobId, String fileName,Long fileSize) { + // todo 预留根据jobId 获取工作目录,做权限控制,越权 + if(StringUtils.isBlank(fileName)){ + throw new RuntimeException("下载文件路径为空"); + } + return pbsService.downloadFile("", fileName,fileSize); } + + } diff --git a/pom.xml b/pom.xml index 05cb792b..177ad680 100644 --- a/pom.xml +++ b/pom.xml @@ -78,6 +78,12 @@ org.springframework.boot spring-boot-starter + + + org.springframework.boot + spring-boot-starter-webflux + +