修改:hpc提交任务接口新增一个一次提交任务接口;pbs端口号修改7105;Hpc接口响应对象自定义对象;通用失败返回新增重载带响应体data方法。
This commit is contained in:
@@ -116,6 +116,15 @@ public class SdmResponse<T> implements Serializable {
|
||||
return new SdmResponse<T>(ResultCode.FAILED.getCode(), message, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* 失败返回结果,同时返回message
|
||||
*
|
||||
* @param message 提示信息
|
||||
*/
|
||||
public static <T> SdmResponse<T> failed(String message,T data) {
|
||||
return new SdmResponse<T>(ResultCode.FAILED.getCode(), message, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* 失败返回结果
|
||||
*
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
package com.sdm.common.entity.req.pbs.hpc;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
@Data
|
||||
public class HpcChunkUploadFileReq {
|
||||
|
||||
private String taskName;
|
||||
|
||||
// 原始文件的名称
|
||||
private String sourceFileName;
|
||||
|
||||
// 当前为第几分片
|
||||
private Integer chunk;
|
||||
|
||||
// DigestUtils.md5Hex(chunkData[])
|
||||
private String chunkMd5;
|
||||
|
||||
// // 每个分块的大小--就是正常固定分片大小
|
||||
// private Long size;
|
||||
|
||||
// 分片总数
|
||||
private Integer chunkTotal;
|
||||
|
||||
// 分块文件传输对象
|
||||
private MultipartFile file;
|
||||
|
||||
// // 是否为第一次请求,1 是,0 不是
|
||||
// private Integer isFirstReq;
|
||||
|
||||
private Integer type;
|
||||
|
||||
// 第一片请求不传,后面的请求必传,第一次请求成功后后端会返回,本次文件的父目录
|
||||
private String fileDirPath;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package com.sdm.common.entity.req.pbs.hpc;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
public class HpcDownloadFilesReq {
|
||||
|
||||
// @NotBlank(message = "任务名不能为空")
|
||||
private String taskName;
|
||||
|
||||
// @NotNull(message = "任务类型不能为空")
|
||||
private Integer type;
|
||||
|
||||
private List<String> filesPaths;
|
||||
|
||||
// private String dirName;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package com.sdm.common.entity.req.pbs.hpc;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@Schema(description = "合并创建HPC JOB请求参数")
|
||||
public class MergeSubmitHpcJobReq {
|
||||
|
||||
@Schema(description = "创建HPC JOB请求参数,对应的new指令")
|
||||
private NewJobReq newJobReq;
|
||||
|
||||
@Schema(description = "HPC作业添加任务请求参数(对应 job add 命令)")
|
||||
private AddJobReq addJobReq;
|
||||
|
||||
@Schema(description = "HPC作业提交请求参数(对应 job submit 命令)")
|
||||
private SubmitHpcJobReq submitHpcJobReq;
|
||||
|
||||
}
|
||||
@@ -3,6 +3,6 @@ package com.sdm.common.entity.resp.pbs.hpc;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class AddJobResp {
|
||||
public class AddJobResp extends HpcBaseResp {
|
||||
private String tsakId;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
package com.sdm.common.entity.resp.pbs.hpc;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@Schema(description = "创建HPC指令响应基类")
|
||||
public class HpcBaseResp {
|
||||
|
||||
@Schema(description = "当前请求对应的完整指令", example = "job submit /id:3007")
|
||||
private String hpcCommand;
|
||||
|
||||
@Schema(description = "当前请求指令异常信息", example = "'11' 不是内部或外部命令,也不是可运行的程序或批处理文件。")
|
||||
private String errMsg;
|
||||
|
||||
}
|
||||
|
||||
@@ -1,8 +1,13 @@
|
||||
package com.sdm.common.entity.resp.pbs.hpc;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class NewJobResp {
|
||||
@Schema(description = "job new 返回")
|
||||
public class NewJobResp extends HpcBaseResp {
|
||||
|
||||
@Schema(description = "job new 返回的任务id", example = "3001")
|
||||
private String jobId;
|
||||
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ package com.sdm.common.entity.resp.pbs.hpc;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class SubmitHpcJobResp {
|
||||
public class SubmitHpcJobResp extends HpcBaseResp {
|
||||
// true 成功 false 失败
|
||||
private Boolean submit;
|
||||
|
||||
|
||||
@@ -2,14 +2,20 @@ package com.sdm.pbs.controller;
|
||||
|
||||
import com.sdm.common.common.SdmResponse;
|
||||
import com.sdm.common.entity.req.pbs.hpc.*;
|
||||
import com.sdm.common.entity.resp.pbs.hpc.AddJobResp;
|
||||
import com.sdm.common.entity.resp.pbs.hpc.NewJobResp;
|
||||
import com.sdm.common.entity.resp.pbs.hpc.SubmitHpcJobResp;
|
||||
import com.sdm.pbs.service.TaskService;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/pbs")
|
||||
@@ -39,22 +45,28 @@ public class TaskController {
|
||||
|
||||
@PostMapping("/jobNew")
|
||||
@Operation(summary = "创建job任务")
|
||||
public SdmResponse jobNew(@RequestBody NewJobReq req) {
|
||||
public SdmResponse<NewJobResp> jobNew(@RequestBody NewJobReq req) {
|
||||
return taskService.jobNew(req);
|
||||
}
|
||||
|
||||
@PostMapping("/jobAdd")
|
||||
@Operation(summary = "添加job任务")
|
||||
public SdmResponse jobAdd(@RequestBody AddJobReq req) {
|
||||
public SdmResponse<AddJobResp> jobAdd(@RequestBody AddJobReq req) {
|
||||
return taskService.jobAdd(req);
|
||||
}
|
||||
|
||||
@PostMapping("/jobSubmit")
|
||||
@Operation(summary = "提交job任务")
|
||||
public SdmResponse jobSubmit(@RequestBody SubmitHpcJobReq req) {
|
||||
public SdmResponse<SubmitHpcJobResp> jobSubmit(@RequestBody SubmitHpcJobReq req) {
|
||||
return taskService.jobSubmit(req);
|
||||
}
|
||||
|
||||
@PostMapping("/jobMergeSubmit")
|
||||
@Operation(summary = "合并提交Hpc job任务")
|
||||
public SdmResponse<SubmitHpcJobResp> jobMergeSubmit(@RequestBody MergeSubmitHpcJobReq req) {
|
||||
return taskService.jobMergeSubmit(req);
|
||||
}
|
||||
|
||||
@PostMapping("/jobCancel")
|
||||
@Operation(summary = "取消job任务")
|
||||
public SdmResponse jobCancel(@RequestBody CancelJobReq req) {
|
||||
@@ -103,7 +115,28 @@ public class TaskController {
|
||||
return taskService.jobView(req);
|
||||
}
|
||||
|
||||
/**
|
||||
* 分片上传文件,不支持批量
|
||||
*
|
||||
* @param req
|
||||
* @return SdmResponse
|
||||
*/
|
||||
@PostMapping("/hpcChunkUploadFile")
|
||||
@Operation(summary = "文件分片上传")
|
||||
public SdmResponse hpcChunkUploadFile(HpcChunkUploadFileReq req) {
|
||||
return taskService.hpcChunkUploadFile(req);
|
||||
}
|
||||
|
||||
/**
|
||||
* 下载文件
|
||||
*
|
||||
* @param downloadFilesReq downloadFilesReq
|
||||
* @return SdmResponse 流式下载
|
||||
*/
|
||||
@PostMapping("/hpcDownloadFiles")
|
||||
public ResponseEntity<StreamingResponseBody> hpcDownloadFiles(@RequestBody @Validated HpcDownloadFilesReq downloadFilesReq) {
|
||||
return taskService.hpcDownloadFiles(downloadFilesReq);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -2,7 +2,12 @@ package com.sdm.pbs.service;
|
||||
|
||||
import com.sdm.common.common.SdmResponse;
|
||||
import com.sdm.common.entity.req.pbs.hpc.*;
|
||||
import com.sdm.common.entity.resp.pbs.hpc.AddJobResp;
|
||||
import com.sdm.common.entity.resp.pbs.hpc.NewJobResp;
|
||||
import com.sdm.common.entity.resp.pbs.hpc.SubmitHpcJobResp;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
|
||||
|
||||
@Service
|
||||
public interface TaskService {
|
||||
@@ -13,11 +18,13 @@ public interface TaskService {
|
||||
|
||||
SdmResponse nodeView(NodeViewReq req);
|
||||
|
||||
SdmResponse jobNew(NewJobReq req);
|
||||
SdmResponse<NewJobResp> jobNew(NewJobReq req);
|
||||
|
||||
SdmResponse jobAdd(AddJobReq req);
|
||||
SdmResponse<AddJobResp> jobAdd(AddJobReq req);
|
||||
|
||||
SdmResponse jobSubmit(SubmitHpcJobReq req);
|
||||
SdmResponse<SubmitHpcJobResp> jobSubmit(SubmitHpcJobReq req);
|
||||
|
||||
SdmResponse<SubmitHpcJobResp> jobMergeSubmit(MergeSubmitHpcJobReq req);
|
||||
|
||||
SdmResponse jobCancel(CancelJobReq req);
|
||||
|
||||
@@ -35,4 +42,9 @@ public interface TaskService {
|
||||
|
||||
SdmResponse jobView(JobViewReq req);
|
||||
|
||||
SdmResponse hpcChunkUploadFile(HpcChunkUploadFileReq req);
|
||||
|
||||
ResponseEntity<StreamingResponseBody> hpcDownloadFiles(HpcDownloadFilesReq downloadFilesReq);
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
package com.sdm.pbs.service.impl;
|
||||
|
||||
import com.sdm.common.common.SdmResponse;
|
||||
import com.sdm.common.entity.constants.NumberConstants;
|
||||
import com.sdm.common.entity.constants.PermConstants;
|
||||
import com.sdm.common.entity.pojo.pbs.hpc.*;
|
||||
import com.sdm.common.entity.req.pbs.hpc.*;
|
||||
import com.sdm.common.entity.resp.pbs.hpc.*;
|
||||
import com.sdm.common.log.CoreLogger;
|
||||
import com.sdm.common.utils.HpcCommandBuilderUtil;
|
||||
import com.sdm.common.utils.HpcCommandExcuteUtil;
|
||||
import com.sdm.common.utils.HpcCommandResulParseUtil;
|
||||
@@ -13,12 +16,24 @@ import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.http.*;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.MappedByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.zip.ZipEntry;
|
||||
import java.util.zip.ZipOutputStream;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@@ -27,9 +42,15 @@ public class TaskServiceImpl implements TaskService {
|
||||
@Value("${hpc.excuteWay:}")
|
||||
private String hpcExcuteWay;
|
||||
|
||||
@Value("${hpc.file.basePath:F:\\hpc}")
|
||||
private String basePath;
|
||||
|
||||
@Autowired
|
||||
private HpcCommandExcuteUtil hpcCommandExcuteUtil;
|
||||
|
||||
// 8MB 每次映射
|
||||
private static final long MAP_SIZE = 10 * 1024 * 1024;
|
||||
|
||||
@Override
|
||||
public SdmResponse nodeList(NodeListReq req) {
|
||||
String prefixStr = HpcCommandBuilderUtil.initNodeListPrefixStr(req.getActiveheadnode());
|
||||
@@ -73,43 +94,39 @@ public class TaskServiceImpl implements TaskService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public SdmResponse jobNew(NewJobReq req) {
|
||||
public SdmResponse<NewJobResp> jobNew(NewJobReq req) {
|
||||
String prefixStr = HpcCommandBuilderUtil.initNewJobPrefixStr();
|
||||
NewJobParam newJobParam = new NewJobParam();
|
||||
BeanUtils.copyProperties(req, newJobParam);
|
||||
String newJobCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, newJobParam, "");
|
||||
String result = hpcCommandExcuteUtil.excuteCmd(newJobCommand,hpcExcuteWay);
|
||||
NewJobResp newJobResp = HpcCommandResulParseUtil.parseJobNewResult(result);
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put("hpcCommand", newJobCommand);
|
||||
newJobResp.setHpcCommand(newJobCommand);
|
||||
if(Objects.isNull(newJobResp)|| StringUtils.isBlank(newJobResp.getJobId())){
|
||||
map.put("errMsg", result);
|
||||
return SdmResponse.failed(map);
|
||||
newJobResp.setErrMsg(result);
|
||||
return SdmResponse.failed(result,newJobResp);
|
||||
}
|
||||
map.put("result", newJobResp);
|
||||
return SdmResponse.success(map);
|
||||
return SdmResponse.success(newJobResp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SdmResponse jobAdd(AddJobReq req) {
|
||||
public SdmResponse<AddJobResp> jobAdd(AddJobReq req) {
|
||||
String prefixStr = HpcCommandBuilderUtil.initAddJobPrefixStr(req.getJobId());
|
||||
AddJobParam addJobParam = new AddJobParam();
|
||||
BeanUtils.copyProperties(req, addJobParam);
|
||||
String addJobCommand = HpcCommandBuilderUtil.buildHpcCommandStr(prefixStr, addJobParam, req.getCommand());
|
||||
String result = hpcCommandExcuteUtil.excuteCmd(addJobCommand,hpcExcuteWay);
|
||||
AddJobResp addJobResp = HpcCommandResulParseUtil.parseJoAddResult(result);
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put("hpcCommand", addJobCommand);
|
||||
addJobResp.setHpcCommand(addJobCommand);
|
||||
if(Objects.isNull(addJobResp)||StringUtils.isBlank(addJobResp.getTsakId())){
|
||||
map.put("errMsg", result);
|
||||
return SdmResponse.failed(map);
|
||||
addJobResp.setErrMsg(result);
|
||||
return SdmResponse.failed(result,addJobResp);
|
||||
}
|
||||
map.put("result", addJobResp);
|
||||
return SdmResponse.success(map);
|
||||
return SdmResponse.success(addJobResp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SdmResponse jobSubmit(SubmitHpcJobReq req) {
|
||||
public SdmResponse<SubmitHpcJobResp> jobSubmit(SubmitHpcJobReq req) {
|
||||
String prefixStr = HpcCommandBuilderUtil.initSubmitJobPrefixStr(req.getId());
|
||||
SubmitHpcJobParam submitHpcJobParam = new SubmitHpcJobParam();
|
||||
BeanUtils.copyProperties(req, submitHpcJobParam);
|
||||
@@ -117,14 +134,56 @@ public class TaskServiceImpl implements TaskService {
|
||||
String result = hpcCommandExcuteUtil.excuteCmd(submitJobCommand,hpcExcuteWay);
|
||||
SubmitHpcJobResp submitHpcJobResp = HpcCommandResulParseUtil.parseJobSubmitResult(result);
|
||||
submitHpcJobResp.setJobId(req.getId());
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put("hpcCommand", submitJobCommand);
|
||||
submitHpcJobResp.setHpcCommand(submitJobCommand);
|
||||
if(Objects.isNull(submitHpcJobResp)||Objects.equals(false,submitHpcJobResp.getSubmit())){
|
||||
map.put("errMsg", result);
|
||||
return SdmResponse.failed(map);
|
||||
submitHpcJobResp.setErrMsg(result);
|
||||
return SdmResponse.failed(result,submitHpcJobResp);
|
||||
}
|
||||
map.put("result", submitHpcJobResp);
|
||||
return SdmResponse.success(map);
|
||||
return SdmResponse.success(submitHpcJobResp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SdmResponse jobMergeSubmit(MergeSubmitHpcJobReq req) {
|
||||
SubmitHpcJobResp mergeSubmitResp = new SubmitHpcJobResp();
|
||||
NewJobReq newJobReq = req.getNewJobReq();
|
||||
AddJobReq addJobReq = req.getAddJobReq();
|
||||
SubmitHpcJobReq submitHpcJobReq = req.getSubmitHpcJobReq();
|
||||
// 1.job new
|
||||
SdmResponse<NewJobResp> newResponse = jobNew(newJobReq);
|
||||
String jobId = "";
|
||||
if(newResponse.isSuccess()&&!Objects.isNull(newResponse.getData())&&
|
||||
StringUtils.isNotBlank(newResponse.getData().getJobId())){
|
||||
jobId = newResponse.getData().getJobId();
|
||||
}
|
||||
if(StringUtils.isBlank(jobId)){
|
||||
mergeSubmitResp.setSubmit(false);
|
||||
String errMsg = "job new 指令执行失败";
|
||||
mergeSubmitResp.setErrMsg(errMsg);
|
||||
// 错误的指令返回
|
||||
mergeSubmitResp.setHpcCommand(newResponse.getData().getHpcCommand());
|
||||
return SdmResponse.failed(errMsg,mergeSubmitResp);
|
||||
}
|
||||
// 2.job add
|
||||
addJobReq.setJobId(jobId);
|
||||
SdmResponse<AddJobResp> addResponse = jobAdd(addJobReq);
|
||||
String taskId = "";
|
||||
if(addResponse.isSuccess()&&!Objects.isNull(addResponse.getData())&&
|
||||
StringUtils.isNotBlank(addResponse.getData().getTsakId())){
|
||||
taskId = addResponse.getData().getTsakId();
|
||||
}
|
||||
if(StringUtils.isBlank(taskId)){
|
||||
mergeSubmitResp.setSubmit(false);
|
||||
String errMsg = "job add 指令执行失败";
|
||||
mergeSubmitResp.setErrMsg(errMsg);
|
||||
// 错误的指令返回
|
||||
mergeSubmitResp.setHpcCommand(addResponse.getData().getHpcCommand());
|
||||
return SdmResponse.failed(errMsg,mergeSubmitResp);
|
||||
}
|
||||
// 3.job submit
|
||||
submitHpcJobReq.setId(jobId);
|
||||
// jobSubmit 方法内部已经包装成功或者失败及原因
|
||||
SdmResponse<SubmitHpcJobResp> submitResp = jobSubmit(submitHpcJobReq);
|
||||
return submitResp;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -259,4 +318,219 @@ public class TaskServiceImpl implements TaskService {
|
||||
return SdmResponse.success(map);
|
||||
}
|
||||
|
||||
/**
|
||||
* 分片上传核心方法
|
||||
* @param req 分片上传请求参数
|
||||
* 文件存储根路径(如:/data/upload/)
|
||||
* @return 响应结果 resultMap
|
||||
* fileDirPath:合并文件夹的绝对路径
|
||||
* filePath:合并文件的绝对路径
|
||||
* errMsg:失败原因
|
||||
*/
|
||||
@Override
|
||||
public SdmResponse hpcChunkUploadFile(HpcChunkUploadFileReq req) {
|
||||
Map<String, Object> resultMap = new HashMap<>();
|
||||
// 基础路径配置
|
||||
// Long tenantId = ThreadLocalContext.getTenantId();
|
||||
Long tenantId = 123456l;
|
||||
Long userId = 9999l;
|
||||
// Long userId= ThreadLocalContext.getUserId();
|
||||
// 1. 参数校验
|
||||
try {
|
||||
validateRequest(req,tenantId,userId);
|
||||
} catch (Exception e) {
|
||||
CoreLogger.error("validateRequest error:{}", e.getMessage());
|
||||
resultMap.put("errMsg", e.getMessage());
|
||||
return SdmResponse.failed(resultMap);
|
||||
}
|
||||
// 2.确定文件夹
|
||||
String timestamp = String.valueOf(System.currentTimeMillis());
|
||||
// 合并目录
|
||||
String filePath =StringUtils.isNotBlank(req.getFileDirPath())?
|
||||
req.getFileDirPath():basePath + File.separator + tenantId + File.separator + userId +
|
||||
File.separator + timestamp + File.separator;
|
||||
// 分片目录
|
||||
String tempDirPath = filePath+File.separator+"temp"+File.separator;
|
||||
File tempDir = new File(tempDirPath);
|
||||
if (!tempDir.exists()) tempDir.mkdirs();
|
||||
// 1. 保存当前分片到临时目录 1 2 3 4 ....temp
|
||||
File chunkFile = new File(tempDir, req.getChunk() + PermConstants.CHUNK_TEMPFILE_SUFFIX);
|
||||
try {
|
||||
req.getFile().transferTo(chunkFile);
|
||||
} catch (Exception e) {
|
||||
CoreLogger.error("transferTo tempFile error:{}",e.getMessage());
|
||||
resultMap.put("errMsg", "transferTo tempFile error");
|
||||
resultMap.put("fileDirPath", filePath);
|
||||
resultMap.put("filePath", "");
|
||||
// todo delete temp file
|
||||
return SdmResponse.failed(resultMap);
|
||||
}
|
||||
|
||||
// 2. 判断分片是否已全部上传完毕
|
||||
int uploaded = Objects.requireNonNull(tempDir.list()).length;
|
||||
if (uploaded < req.getChunkTotal()) {
|
||||
resultMap.put("errMsg", "");
|
||||
resultMap.put("fileDirPath", filePath);
|
||||
resultMap.put("filePath", "");
|
||||
return SdmResponse.success(resultMap);
|
||||
}
|
||||
|
||||
// 3. 全部分片已经上传 => 自动合并
|
||||
File targetFile = new File(filePath +File.separator + req.getSourceFileName());
|
||||
try {
|
||||
mergeChunksWithFileChannel(tempDir, targetFile, req.getChunkTotal());
|
||||
} catch (Exception e) {
|
||||
CoreLogger.error("merge tempFile error:{}",e.getMessage());
|
||||
resultMap.put("errMsg", "merge tempFile error");
|
||||
resultMap.put("fileDirPath", filePath);
|
||||
resultMap.put("filePath", "");
|
||||
// todo 删除文件
|
||||
return SdmResponse.failed(resultMap);
|
||||
}
|
||||
// 4. 合并完成后删除临时目录
|
||||
for (File f : tempDir.listFiles()) f.delete();
|
||||
tempDir.delete();
|
||||
resultMap.put("errMsg", "");
|
||||
resultMap.put("fileDirPath", filePath);
|
||||
// 最终文件的绝对路径
|
||||
resultMap.put("filePath", filePath +File.separator + req.getSourceFileName());
|
||||
return SdmResponse.success(resultMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResponseEntity<StreamingResponseBody> hpcDownloadFiles(HpcDownloadFilesReq downloadFilesReq) {
|
||||
List<String> files = downloadFilesReq.getFilesPaths();
|
||||
// files.add("F:\\hpc\\123456\\9999\\1763643241130\\spdm-profile .pptx");
|
||||
// files.add("F:\\hpc\\123456\\9999\\1763643241130\\spdm-profile - 副本.pptx");
|
||||
if (files == null || files.isEmpty()) {
|
||||
return ResponseEntity.badRequest().build();
|
||||
}
|
||||
// 单文件(直接下载)
|
||||
if (files.size() == 1) {
|
||||
String filePath = files.get(0);
|
||||
File file = new File(filePath);
|
||||
if (!file.exists()) {
|
||||
return ResponseEntity.notFound().build();
|
||||
}
|
||||
StreamingResponseBody stream = outputStream ->
|
||||
{
|
||||
try {
|
||||
transferBigFile(file, outputStream);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
|
||||
headers.setContentDisposition(ContentDisposition.attachment()
|
||||
.filename(file.getName())
|
||||
.build());
|
||||
|
||||
return new ResponseEntity<>(stream, headers, HttpStatus.OK);
|
||||
}
|
||||
// 多文件(ZIP 流式下载
|
||||
StreamingResponseBody stream = outputStream -> {
|
||||
try (ZipOutputStream zipOut = new ZipOutputStream(outputStream)) {
|
||||
for (String filePath : files) {
|
||||
File file = new File(filePath);
|
||||
if (!file.exists()) continue;
|
||||
zipOut.putNextEntry(new ZipEntry(file.getName()));
|
||||
try {
|
||||
transferBigFile(file, zipOut);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
zipOut.closeEntry();
|
||||
}
|
||||
zipOut.finish();
|
||||
}
|
||||
};
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
|
||||
headers.setContentDisposition(ContentDisposition.attachment()
|
||||
// 下载文件的名字 todo
|
||||
.filename("download.zip")
|
||||
.build());
|
||||
ResponseEntity<StreamingResponseBody> fileResponse = new ResponseEntity<>(stream, headers, HttpStatus.OK);
|
||||
return fileResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用 MappedByteBuffer + 分块映射实现超大文件下载(高性能、低内存)
|
||||
*/
|
||||
private void transferBigFile(File file, OutputStream out) throws Exception {
|
||||
|
||||
try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
|
||||
long fileSize = channel.size();
|
||||
long position = 0;
|
||||
|
||||
final long mapSize = 128 * 1024 * 1024; // 128MB 分块映射
|
||||
byte[] buffer = new byte[8192];
|
||||
|
||||
while (position < fileSize) {
|
||||
long size = Math.min(mapSize, fileSize - position);
|
||||
|
||||
MappedByteBuffer mbb =
|
||||
channel.map(FileChannel.MapMode.READ_ONLY, position, size);
|
||||
|
||||
while (mbb.hasRemaining()) {
|
||||
int readLen = Math.min(mbb.remaining(), buffer.length);
|
||||
mbb.get(buffer, 0, readLen);
|
||||
out.write(buffer, 0, readLen);
|
||||
}
|
||||
|
||||
position += size;
|
||||
}
|
||||
out.flush();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 合并文件
|
||||
*/
|
||||
private void mergeChunksWithFileChannel(File tempDir, File targetFile, int totalChunks) throws Exception {
|
||||
try (FileChannel outChannel = new FileOutputStream(targetFile, true).getChannel()) {
|
||||
for (int i = 1; i <= totalChunks; i++) {
|
||||
File partFile = new File(tempDir, i + PermConstants.CHUNK_TEMPFILE_SUFFIX);
|
||||
try (FileChannel inChannel = new FileInputStream(partFile).getChannel()) {
|
||||
long size = inChannel.size();
|
||||
long position = 0;
|
||||
// transferTo 可能不会一次传完,循环确保写满
|
||||
while (position < size) {
|
||||
long transferred = inChannel.transferTo(position, size - position, outChannel);
|
||||
position += transferred;
|
||||
}
|
||||
}
|
||||
// 删除分片文件
|
||||
partFile.delete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 参数校验
|
||||
*/
|
||||
private void validateRequest(HpcChunkUploadFileReq req, Long tenantId, Long userId) {
|
||||
// 基础参数校验
|
||||
Assert.notNull(tenantId, "租户ID不能为空");
|
||||
Assert.notNull(userId, "用户ID不能为空");
|
||||
Assert.hasText(req.getTaskName(), "任务名称不能为空");
|
||||
Assert.hasText(req.getSourceFileName(), "原始文件名称不能为空");
|
||||
Assert.notNull(req.getChunk(), "分片编号不能为空");
|
||||
// Assert.notNull(req.getSize(), "分片大小不能为空");
|
||||
Assert.notNull(req.getChunkTotal(), "分片总数不能为空");
|
||||
Assert.notNull(req.getFile(), "分片文件不能为空");
|
||||
Assert.isTrue(!req.getFile().isEmpty(), "分片文件不能为空");
|
||||
// Assert.notNull(req.getIsFirstReq(), "是否第一次请求不能为空");
|
||||
// 业务逻辑校验
|
||||
Assert.isTrue(req.getChunk() >= 1 && req.getChunk() <= req.getChunkTotal(),
|
||||
"分片编号非法:当前" + req.getChunk() + ",总分片" + req.getChunkTotal());
|
||||
// Assert.isTrue(req.getSize() > 0, "分片大小必须大于0");
|
||||
Assert.isTrue(req.getChunkTotal() >= 1, "分片总数必须大于等于1");
|
||||
if(req.getChunk() > NumberConstants.ONE) {
|
||||
Assert.hasText(req.getFileDirPath(), "分片父级目录不允许为空");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
server:
|
||||
port: 7103
|
||||
port: 7105
|
||||
|
||||
spring:
|
||||
application:
|
||||
@@ -24,7 +24,7 @@ spring:
|
||||
nacos:
|
||||
discovery:
|
||||
server-addr: 192.168.65.161:8848
|
||||
roup: DEV_GROUP
|
||||
roup: YANG_GROUP
|
||||
enabled: true
|
||||
namespace: 3
|
||||
# username: nacos
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
server:
|
||||
port: 7103
|
||||
port: 7105
|
||||
|
||||
spring:
|
||||
application:
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
server:
|
||||
port: 7107
|
||||
port: 7105
|
||||
|
||||
spring:
|
||||
application:
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
server:
|
||||
port: 7007
|
||||
port: 7105
|
||||
|
||||
|
||||
spring:
|
||||
|
||||
Reference in New Issue
Block a user