This commit is contained in:
2026-03-24 14:59:19 +08:00
3 changed files with 219 additions and 12 deletions

View File

@@ -0,0 +1,54 @@
package com.sdm.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 文件状态服务工具类(优化版)
* 外部直接传入拼接好的key支持并发安全操作
* 通用状态工具类
*/
@Slf4j
@Component
public class CommonStatusUtil {
/* 各种业务key维护 */
// 文件分片上传
public static final String CHUNK_UPLOAD_FILE = "CHUNK_UPLOAD_FILE_";
// 并发安全Map保证多线程环境下操作安全
private final ConcurrentMap<String, Object> statusMap = new ConcurrentHashMap<>();
/**
* 开始状态直接存入外部传入的完整key和value
* @param key 外部已拼接好的完整key
* @param value 存储的状态/业务数据
*/
public void start(String key, Object value) {
statusMap.put(key, value);
log.info("CommonStatusUtil start:{}",key);
}
/**
* 结束状态根据外部传入的完整key删除数据
* @param key 外部已拼接好的完整key
*/
public void end(String key) {
statusMap.remove(key);
log.info("CommonStatusUtil end:{}",key);
}
/**
* 优化新增根据完整key判断是否存在
* @param key 外部已拼接好的完整key
* @return 存在返回true不存在返回false
*/
public boolean exists(String key) {
return statusMap.containsKey(key);
}
}

View File

@@ -1,9 +1,12 @@
package com.sdm.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.http.HttpStatus;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.server.ResponseStatusException;
import java.io.*;
import java.net.HttpURLConnection;
@@ -13,12 +16,10 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Vector;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@@ -332,7 +333,7 @@ public class FilesUtil {
String inputFilesRegularStr,
AtomicReference<String> masterFilePath,
List<String> inputFilePaths) {
log.info("求解文件目录={}", jobWorkDir);
log.info("求解文件目录={}", jobWorkDir);
Objects.requireNonNull(jobWorkDir, "本地求解文件夹不能为空");
boolean hasMasterRule =
masterFileRegularStr != null && !masterFileRegularStr.isBlank();
@@ -695,4 +696,140 @@ public class FilesUtil {
}
}
/**
* 通用分片上传到本地磁盘 + 最后一片自动合并
* @param absoluteFilePath 基础根目录
* @param fileId 分片ID唯一标识
* @param chunkIndex 当前分片下标从0开始
* @param totalChunks 总分片数
* @param filename 合并后的最终文件名
* @param inputStream 文件流
* @return 结果标识CHUNK_UPLOAD_SUCCESS / UPLOAD_AND_MERGE_SUCCESS:最终路径
* @throws IOException IO异常
*/
public static String handleChunkUploadLocalAndAutoMerge(
String absoluteFilePath,
String fileId,
int chunkIndex,
int totalChunks,
String filename,
InputStream inputStream
) throws Exception {
// 1. 标准化路径,防路径穿越
Path rootDir = Paths.get(absoluteFilePath).toAbsolutePath().normalize();
Path chunkDir = rootDir.resolve(fileId);
Path chunkPath = chunkDir.resolve(chunkIndex + ".temp");
// 2. 创建分片目录并写入分片
Files.createDirectories(chunkDir);
try (InputStream in = inputStream;
OutputStream out = Files.newOutputStream(chunkPath,
StandardOpenOption.CREATE,
StandardOpenOption.WRITE)) {
in.transferTo(out);
}
// 3. 不是最后一片,直接返回
if (chunkIndex != totalChunks - 1) {
return "CHUNK_UPLOAD_SUCCESS:" + chunkIndex;
}
// 4. 最后一片:自动合并所有分片
Path targetFile = rootDir.resolve(filename);
try (OutputStream out = Files.newOutputStream(targetFile,
StandardOpenOption.CREATE,
StandardOpenOption.WRITE)) {
for (int i = 0; i < totalChunks; i++) {
Path chunk = chunkDir.resolve(i + ".temp");
if (!Files.exists(chunk)) {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST,
"分片缺失:" + i + ".temp合并失败");
}
try (InputStream in = Files.newInputStream(chunk)) {
in.transferTo(out);
}
}
}
// 5. 合并完成,删除分片
Files.walk(chunkDir)
.sorted(Comparator.reverseOrder())
.forEach(path -> {
try { Files.delete(path); } catch (Exception ignored) {
log.warn("handleChunkUploadLocalAndAutoMerge delete temp error:{}", ignored.getMessage());
}
});
return "UPLOAD_AND_MERGE_SUCCESS:" + targetFile;
}
/**
* 获取主文件和从文件的绝对路径
* @param basePath 基础路径(支持 Linux:/hpc/xxx 和 Windows:D:/hpc/xxx 格式)
* @return Pair<主文件绝对路径, 从文件绝对路径集合>
* @throws IOException 文件读取异常/路径不存在
* @throws IllegalArgumentException 基础路径不存在/无主文件
*/
public static Pair<String, List<String>> getSingleSubmitFiles(String basePath) {
// 1. 标准化路径
Path baseDir = Paths.get(basePath);
// 基础校验
if (!Files.exists(baseDir)) {
throw new IllegalArgumentException("基础路径不存在:" + basePath);
}
if (!Files.isDirectory(baseDir)) {
throw new IllegalArgumentException("基础路径不是文件夹:" + basePath);
}
String mainFileAbsolutePath = null;
List<String> slaveFileList = new ArrayList<>();
// 2. 遍历基础目录下直接子项
try {
try (Stream<Path> children = Files.list(baseDir)) {
for (Path path : children.toList()) {
// 找第一个文件 → 主文件
if (mainFileAbsolutePath == null && Files.isRegularFile(path)) {
mainFileAbsolutePath = path.toAbsolutePath().toString();
}
// 找第一个文件夹 → 递归遍历里面所有文件(所有层级)
if (slaveFileList.isEmpty() && Files.isDirectory(path)) {
listAllFilesRecursively(path, slaveFileList);
}
// 找到主文件 + 第一个文件夹遍历完成 → 提前退出
if (mainFileAbsolutePath != null && !slaveFileList.isEmpty()) {
break;
}
}
}
} catch (Exception e) {
log.error("getSingleSubmitFiles get files error:{}",e.getMessage());
throw new RuntimeException("获取HPC文件异常");
}
// 必须有主文件
if (mainFileAbsolutePath == null) {
throw new IllegalArgumentException("路径下未找到任何主文件:" + basePath);
}
return Pair.of(mainFileAbsolutePath, slaveFileList);
}
/**
* 递归工具方法:遍历文件夹下所有层级的文件
* @param dir 要遍历的目录
* @param fileList 收集文件绝对路径的集合
* @throws IOException IO异常
*/
private static void listAllFilesRecursively(Path dir, List<String> fileList) throws IOException {
try (Stream<Path> stream = Files.list(dir)) {
for (Path path : stream.toList()) {
if (Files.isRegularFile(path)) {
// 是文件 → 加入列表
fileList.add(path.toAbsolutePath().toString());
} else if (Files.isDirectory(path)) {
// 是文件夹 → 递归继续找
listAllFilesRecursively(path, fileList);
}
}
}
}
}

View File

@@ -200,7 +200,8 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
@Autowired
private ExportOperate exportOperate;
@Autowired
private CommonStatusUtil commonStatusUtil;
private static final String TEMP_FILE_PATH = "/usr/local/nginx/html/storage/";
@@ -1995,6 +1996,10 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
}
return buildSuccessResponse(resp,req,"");
}
// 分片的存储的状态标识
if(!commonStatusUtil.exists(CommonStatusUtil.CHUNK_UPLOAD_FILE+req.getBusinessId())){
commonStatusUtil.start(CommonStatusUtil.CHUNK_UPLOAD_FILE+req.getBusinessId(),"doing");
}
// 碎片目录
String tempDirPath = org.apache.commons.lang3.StringUtils.isBlank(req.getFileTempPath())?
filePath +"temp_"+timestamp+"/":req.getFileTempPath();
@@ -2016,6 +2021,8 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
}
if(!b){
log.warn("chunkUpload第:{}次失败,分片文件数据开始删除",req.getChunk());
commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+req.getBusinessId());
deleteTempFileAfterFailed(tempDirPath,chunkBucket);
return buildFailedResponse(resp,"chunkUpload第"+req.getChunk()+"次失败",req);
}
@@ -2023,10 +2030,7 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
if (req.getChunk() < req.getChunkTotal()) {
return buildSuccessResponse(resp,req,tempDirPath);
}
// 3. 全部分片已经上传 => 自动合并
String finalFileName = req.getObjectKey();
Boolean merge = minioService.merge(chunkBucket, tempDirPath, chunkBucket, finalFileName);
// 文件已经上传完成
// 是否本地保存
if(Objects.equals("Y",req.getIsSaveLocal())){
String finalLocalFilePath = absoluteLocalFileDirPath+req.getSourceFileName();
@@ -2039,14 +2043,26 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
log.info("saveChunkFileToLocal 合并本地文件finalLocalFilePath:{},localTempDirPath:{},getChunkTotal:{}结果:{}",
finalLocalFilePath,localTempDirPath,req.getChunkTotal(),localMergeSuccess);
}
// 分片文件已经上传完成,提前返回,异步合并数据
CompletableFuture.runAsync(() -> {
asyncMerge(req.getObjectKey(),chunkBucket,tempDirPath,req.getBusinessId());
});
return buildSuccessResponse(resp,req,tempDirPath);
}
// 异步合并
private void asyncMerge (String finalFileName,String chunkBucket,String tempDirPath,Long businessId){
Boolean merge = minioService.merge(chunkBucket, tempDirPath, chunkBucket, finalFileName);
if(!merge){
deleteTempFileAfterFailed(tempDirPath,chunkBucket);
return buildFailedResponse(resp,req.getSourceFileName()+"合并分片失败",req);
log.error("asyncMerge 合并分片失败:{}",tempDirPath);
// return buildFailedResponse(resp,req.getSourceFileName()+"合并分片失败",req);
}
// 4. 合并完成后删除临时目录
deleteTempFileAfterFailed(tempDirPath,chunkBucket);
return buildSuccessResponse(resp,req,tempDirPath);
// 删除分片上传的处理中状态
commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+businessId);
}