修改:pbs单独提交,分片上传文件到本地磁盘,异步通知结果场景优化

This commit is contained in:
2026-04-17 17:55:16 +08:00
parent 213463458c
commit eddf5ac177
5 changed files with 231 additions and 90 deletions

View File

@@ -50,6 +50,15 @@ spring:
- StripPrefix=2
metadata:
group: LYRIC_GROUP # 指定目标服务的分组
- id: pbs-service-chunkUploadToLocal
uri: lb://pbs
predicates:
- Path=/simulation/pbs/pbs/chunkUploadToLocal
filters:
- StripPrefix=2
metadata:
response-timeout: 60000
connect-timeout: 10000
- id: pbs-service-queryHpcResource
uri: lb://pbs
predicates:

View File

@@ -45,7 +45,7 @@ public class PbsFileController {
public SdmResponse<PbsChunkUploadLocalFileResp> chunkUpload(PbsChunkUploadLocalFileReq req) {
// 1. 基础参数校验
Long userId = ThreadLocalContext.getUserId();
if (userId== null || req.getJobName() == null || req.getUuid() == null ||
if (req.getBusinessId() == null ||userId== null || req.getJobName() == null || req.getUuid() == null ||
req.getSourceFileName() == null || req.getChunk() == null || req.getChunkTotal() == null || req.getFile() == null) {
return SdmResponse.failed("必填参数不能为空");
}

View File

@@ -5,7 +5,16 @@ import lombok.Data;
@Data
public class PbsChunkUploadLocalFileResp {
// 本次分片上传的结果 true 成功false 失败
private Boolean result=true;
// 文件关联的业务数据Id
private Long businessId;
// 上传的最终路径,如果是文件则是服务器文件路径,如果是文件夹则是文件夹的最终路径--用于发起任务的参数
private String finalFullFilePath;
// 是否是异步返回,默认N 同步返回; 只有最后一片超级大文件合并超时的时候走异步字段返回Y返回Y的时候前端就要监听webscoket的消息数据后端通过webscoket通知已经合并上传完成。
private String asyncBack="N";
}

View File

@@ -7,6 +7,8 @@ import org.springframework.web.multipart.MultipartFile;
public class PbsChunkUploadLocalFileReq {
// 任务的名称
private String jobName;
// 文件的业务id,不同文件不同
private Long businessId;
// 前端生成时间戳,单次任务的请求相同,用于区分目录,及后端提交的凭证
private String uuid;
// 用户选择文件夹的名字,单文件不传

View File

@@ -1,113 +1,198 @@
package com.sdm.pbs.service.impl;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.common.ThreadLocalContext;
import com.sdm.common.common.WsMessage;
import com.sdm.common.common.WsSceneEnum;
import com.sdm.common.feign.inter.system.IWsPushToolFeignClient;
import com.sdm.common.utils.MdcUtil;
import com.sdm.pbs.model.bo.PbsChunkUploadLocalFileResp;
import com.sdm.pbs.model.req.PbsChunkUploadLocalFileReq;
import com.sdm.pbs.service.IPbsFileUploadService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import java.io.*;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
@Slf4j
@Service
@RequiredArgsConstructor
public class PbsFileUploadServiceImpl implements IPbsFileUploadService {
private static final int BUFFER_SIZE = 1024 * 1024;
private static final String ASYNC_BACK_YES = "Y";
private static final String CHUNK_FILE_SPLIT = "\\.";
private static final Pattern NUMBER_PATTERN = Pattern.compile("^\\d+$");
@Value("${localMerge.wait:55}")
private Long localMergerWait;
private final IWsPushToolFeignClient wsPushToolFeignClient;
@Override
public SdmResponse<PbsChunkUploadLocalFileResp> handleChunkUpload(PbsChunkUploadLocalFileReq req, String basePath,String basePathParent) {
try {
PbsChunkUploadLocalFileResp pbsChunkUploadLocalFileResp = dealFile(req, basePath,basePathParent);
return SdmResponse.success(pbsChunkUploadLocalFileResp);
} catch (Exception e) {
log.error("pbs handleChunkUpload error:{}",e.getMessage());
public SdmResponse<PbsChunkUploadLocalFileResp> handleChunkUpload(PbsChunkUploadLocalFileReq req, String basePath, String basePathParent) {
if (Objects.isNull(req) || Objects.isNull(req.getFile()) || req.getFile().isEmpty()) {
log.error("分片上传参数异常文件为空businessId:{}", req.getBusinessId());
return buildFailedResponse(new PbsChunkUploadLocalFileResp(), req.getBusinessId());
}
try {
// ==============================================
// 所有分片写入 都在 主线程 执行(绝对安全)
// ==============================================
PbsChunkUploadLocalFileResp resp = dealFile(req, basePath);
resp.setBusinessId(req.getBusinessId());
// 不是最后一片 → 直接返回
if (!req.getChunk().equals(req.getChunkTotal())) {
resp.setResult(true);
return SdmResponse.success(resp);
}
// ==============================================
// 最后一片:开始合并(主线程先写入完成,再异步合并)
// ==============================================
long startTime = System.currentTimeMillis();
String traceId = MdcUtil.getTraceId();
Long userId = ThreadLocalContext.getUserId();
Long tenantId = ThreadLocalContext.getTenantId();
CompletableFuture<PbsChunkUploadLocalFileResp> future = CompletableFuture.supplyAsync(
() -> doMerge(req, basePath, basePathParent, startTime, traceId, userId, tenantId));
try {
// 等待合并(不超时 → 同步返回)
resp = future.get(localMergerWait, TimeUnit.SECONDS);
log.info("[同步合并完成] businessId:{}", req.getBusinessId());
return SdmResponse.success(resp);
} catch (TimeoutException e) {
// 超时 → 立即返回,后台继续合并,最后 WS 通知
log.warn("[同步超时,后台异步合并] businessId:{}", req.getBusinessId());
resp.setAsyncBack(ASYNC_BACK_YES);
resp.setResult(true);
return SdmResponse.success(resp);
} catch (Exception e) {
log.error("[合并失败] businessId:{}", req.getBusinessId(), e);
return buildFailedResponse(resp, req.getBusinessId());
}
} catch (Exception e) {
log.error("[分片处理异常] businessId:{}", req.getBusinessId(), e);
return buildFailedResponse(new PbsChunkUploadLocalFileResp(), req.getBusinessId());
}
return SdmResponse.failed("处理上传文件失败");
}
private PbsChunkUploadLocalFileResp dealFile(PbsChunkUploadLocalFileReq req, String basePath,String basePathParent) throws Exception {
/**
* 真正合并逻辑(异步)
*/
private PbsChunkUploadLocalFileResp doMerge(PbsChunkUploadLocalFileReq req, String basePath, String basePathParent,
long startTime, String traceId, Long userId, Long tenantId) {
MdcUtil.putTraceId(traceId);
PbsChunkUploadLocalFileResp resp = new PbsChunkUploadLocalFileResp();
// 1. 创建基础目录
File baseDir = new File(basePath);
if (!baseDir.exists()) {
baseDir.mkdirs();
}
// 2. 生成临时分片目录(去掉文件后缀)
// 2. 生成临时目录名(兼容无后缀文件)
String sourceFileName = req.getSourceFileName();
String tempDirName;
int lastDotIndex = sourceFileName.lastIndexOf(".");
if (lastDotIndex > 0) {
tempDirName = sourceFileName.substring(0, lastDotIndex);
} else {
tempDirName = sourceFileName; // 无后缀文件直接用文件名
}
String tempDirPath = basePath + File.separator + tempDirName;
File tempDir = new File(tempDirPath);
// // 3. 非首片上传:使用前端传入的临时目录
// if (req.getFileTempPath() != null && !req.getFileTempPath().trim().isEmpty()) {
// tempDirPath = req.getFileTempPath();
// tempDir = new File(tempDirPath);
// }
// 4. 创建临时分片目录
if (!tempDir.exists()) {
tempDir.mkdirs();
}
// 5. 写入当前分片文件
File chunkFile = new File(tempDir, req.getSourceFileName() + "." + req.getChunk());
try (InputStream in = req.getFile().getInputStream();
OutputStream out = new FileOutputStream(chunkFile)) {
byte[] buffer = new byte[1024 * 1024];
int len;
while ((len = in.read(buffer)) != -1) {
out.write(buffer, 0, len);
resp.setBusinessId(req.getBusinessId());
try {
// 执行最终合并
resp = mergeFinalFile(req, basePath, basePathParent);
resp.setResult(true);
} catch (Exception e) {
log.error("[异步合并异常] businessId:{}", req.getBusinessId(), e);
resp.setResult(false);
} finally {
long cost = (System.currentTimeMillis() - startTime) / 1000;
if (cost >= localMergerWait) {
resp.setAsyncBack(ASYNC_BACK_YES);
resp.setBusinessId(req.getBusinessId());
sendWsMessage(userId, tenantId, resp);
}
MdcUtil.removeTraceId();
}
log.info("分片{}写入完成:{}", req.getChunk(), chunkFile.getAbsolutePath());
// 6. 判断是否为最后一片,执行合并
if (req.getChunk().equals(req.getChunkTotal())) {
String targetFilePath = basePath + File.separator + req.getSourceFileName();
mergeChunkFiles(tempDir, new File(targetFilePath));
// 合并完成删除临时目录
deleteDirectory(tempDir);
log.info("文件合并完成,临时目录已删除:{}", targetFilePath);
// 上传的文件夹的后端磁盘路径
if(StringUtils.isNotBlank(basePathParent)){
resp.setFinalFullFilePath(basePathParent);
}else {
// 单一文件的后端路径
resp.setFinalFullFilePath(targetFilePath);
}
return resp;
}
// 7. 非最后一片:返回临时目录给前端后续使用
return resp;
}
/**
* 主线程执行:分片写入(所有分片都走这里,绝对安全)
*/
private PbsChunkUploadLocalFileResp dealFile(PbsChunkUploadLocalFileReq req, String basePath) throws Exception {
PbsChunkUploadLocalFileResp resp = new PbsChunkUploadLocalFileResp();
MultipartFile file = req.getFile();
String sourceFileName = req.getSourceFileName();
File baseDir = new File(basePath);
if (!baseDir.exists() && !baseDir.mkdirs()) throw new IOException("创建基础目录失败");
String tempDirName = getTempDirName(sourceFileName);
File tempDir = new File(baseDir, tempDirName);
if (!tempDir.exists() && !tempDir.mkdirs()) throw new IOException("创建临时目录失败");
// 写入分片(主线程)
File chunkFile = new File(tempDir, sourceFileName + "." + req.getChunk());
writeChunkToFile(file, chunkFile);
log.info("[分片写入] chunk:{} 完成", req.getChunk());
return resp;
}
/**
* 合并所有分片文件
* 最后一片:合并文件
*/
private void mergeChunkFiles(File tempDir, File targetFile) throws IOException {
try (FileChannel outChannel = new FileOutputStream(targetFile).getChannel()) {
File[] chunkFiles = tempDir.listFiles((dir, name) -> name.contains("."));
if (chunkFiles == null || chunkFiles.length == 0) {
throw new FileNotFoundException("分片文件不存在");
private PbsChunkUploadLocalFileResp mergeFinalFile(PbsChunkUploadLocalFileReq req, String basePath, String basePathParent) throws Exception {
PbsChunkUploadLocalFileResp resp = new PbsChunkUploadLocalFileResp();
String sourceFileName = req.getSourceFileName();
File baseDir = new File(basePath);
String tempDirName = getTempDirName(sourceFileName);
File tempDir = new File(baseDir, tempDirName);
String targetFilePath = new File(baseDir, sourceFileName).getAbsolutePath();
// 合并分片
mergeChunkFiles(tempDir, new File(targetFilePath));
deleteDirectoryQuietly(tempDir);
resp.setFinalFullFilePath(StringUtils.isNotBlank(basePathParent) ? basePathParent : targetFilePath);
return resp;
}
// ==================== 工具方法 ====================
private String getTempDirName(String fileName) {
int lastDotIndex = fileName.lastIndexOf(".");
return lastDotIndex > 0 ? fileName.substring(0, lastDotIndex) : fileName;
}
private void writeChunkToFile(MultipartFile file, File chunkFile) throws IOException {
try (InputStream in = file.getInputStream();
BufferedInputStream bin = new BufferedInputStream(in, BUFFER_SIZE);
BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(chunkFile), BUFFER_SIZE)) {
byte[] buffer = new byte[BUFFER_SIZE];
int len;
while ((len = bin.read(buffer)) != -1) {
out.write(buffer, 0, len);
}
out.flush();
}
}
// 按分片序号排序
java.util.Arrays.sort(chunkFiles, (f1, f2) -> {
int c1 = Integer.parseInt(f1.getName().split("\\.")[1]);
int c2 = Integer.parseInt(f2.getName().split("\\.")[1]);
return Integer.compare(c1, c2);
});
private void mergeChunkFiles(File tempDir, File targetFile) throws IOException {
// 不过滤,直接拿目录下所有文件(最安全)
File[] chunkFiles = tempDir.listFiles();
if (chunkFiles == null || chunkFiles.length == 0) {
throw new FileNotFoundException("无分片文件:" + tempDir.getAbsolutePath());
}
Arrays.sort(chunkFiles, Comparator.comparingInt(this::extractChunkNumber));
// 合并分片
try (FileChannel outChannel = new FileOutputStream(targetFile).getChannel()) {
for (File chunk : chunkFiles) {
try (FileChannel inChannel = new FileInputStream(chunk).getChannel()) {
inChannel.transferTo(0, inChannel.size(), outChannel);
@@ -118,18 +203,54 @@ public class PbsFileUploadServiceImpl implements IPbsFileUploadService {
}
}
/**
* 删除目录及所有文件
*/
private void deleteDirectory(File dir) throws IOException {
if (dir.isDirectory()) {
File[] children = dir.listFiles();
if (children != null) {
for (File child : children) {
deleteDirectory(child);
}
}
private int extractChunkNumber(File file) {
try {
String[] parts = file.getName().split(CHUNK_FILE_SPLIT);
String numStr = parts[parts.length - 1];
return NUMBER_PATTERN.matcher(numStr).matches() ? Integer.parseInt(numStr) : 0;
} catch (Exception e) {
return 0;
}
Files.deleteIfExists(dir.toPath());
}
}
private void deleteDirectoryQuietly(File dir) {
try {
if (dir.isDirectory()) {
File[] files = dir.listFiles();
if (files != null) for (File f : files) deleteDirectoryQuietly(f);
}
Files.deleteIfExists(dir.toPath());
} catch (Exception ignored) {}
}
private void sendWsMessage(Long userId, Long tenantId, PbsChunkUploadLocalFileResp resp) {
try {
ThreadLocalContext.setUserId(userId);
ThreadLocalContext.setTenantId(tenantId);
WsMessage<PbsChunkUploadLocalFileResp> message = new WsMessage<>();
message.setScene(WsSceneEnum.BIG_FILE_CHUNK.getScene());
message.setUserId(userId);
message.setTimestamp(System.currentTimeMillis());
message.setData(resp);
wsPushToolFeignClient.wsPushOne(message);
log.info("[WS通知完成] businessId:{}", resp.getBusinessId());
} catch (Exception e) {
log.error("[WS通知失败] businessId:{}", resp.getBusinessId(), e);
}
}
private SdmResponse<PbsChunkUploadLocalFileResp> buildSuccessResponse(PbsChunkUploadLocalFileResp resp, Long businessId, String finalPath) {
resp.setBusinessId(businessId);
resp.setResult(true);
resp.setFinalFullFilePath(finalPath);
return SdmResponse.success(resp);
}
private SdmResponse<PbsChunkUploadLocalFileResp> buildFailedResponse(PbsChunkUploadLocalFileResp resp, Long businessId) {
resp.setBusinessId(businessId);
resp.setResult(false);
return SdmResponse.failed(resp);
}
}