diff --git a/common/src/main/java/com/sdm/common/utils/CommonStatusUtil.java b/common/src/main/java/com/sdm/common/utils/CommonStatusUtil.java new file mode 100644 index 00000000..c0c3a6c1 --- /dev/null +++ b/common/src/main/java/com/sdm/common/utils/CommonStatusUtil.java @@ -0,0 +1,54 @@ +package com.sdm.common.utils; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * 文件状态服务工具类(优化版) + * 外部直接传入拼接好的key,支持并发安全操作 + * 通用状态工具类 + */ +@Slf4j +@Component +public class CommonStatusUtil { + + /* 各种业务key维护 */ + // 文件分片上传 + public static final String CHUNK_UPLOAD_FILE = "CHUNK_UPLOAD_FILE_"; + + + // 并发安全Map,保证多线程环境下操作安全 + private final ConcurrentMap statusMap = new ConcurrentHashMap<>(); + + /** + * 开始状态:直接存入外部传入的完整key和value + * @param key 外部已拼接好的完整key + * @param value 存储的状态/业务数据 + */ + public void start(String key, Object value) { + statusMap.put(key, value); + log.info("CommonStatusUtil start:{}",key); + } + + /** + * 结束状态:根据外部传入的完整key删除数据 + * @param key 外部已拼接好的完整key + */ + public void end(String key) { + statusMap.remove(key); + log.info("CommonStatusUtil end:{}",key); + } + + /** + * 优化新增:根据完整key判断是否存在 + * @param key 外部已拼接好的完整key + * @return 存在返回true,不存在返回false + */ + public boolean exists(String key) { + return statusMap.containsKey(key); + } + +} diff --git a/common/src/main/java/com/sdm/common/utils/FilesUtil.java b/common/src/main/java/com/sdm/common/utils/FilesUtil.java index 303b51ae..a5c4e992 100644 --- a/common/src/main/java/com/sdm/common/utils/FilesUtil.java +++ b/common/src/main/java/com/sdm/common/utils/FilesUtil.java @@ -1,9 +1,12 @@ package com.sdm.common.utils; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; +import org.springframework.http.HttpStatus; import org.springframework.mock.web.MockMultipartFile; import org.springframework.stereotype.Component; import org.springframework.web.multipart.MultipartFile; +import org.springframework.web.server.ResponseStatusException; import java.io.*; import java.net.HttpURLConnection; @@ -13,12 +16,10 @@ import java.nio.charset.StandardCharsets; import java.nio.file.*; import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.FileTime; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.Vector; +import java.util.*; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; +import java.util.stream.Stream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -332,7 +333,7 @@ public class FilesUtil { String inputFilesRegularStr, AtomicReference masterFilePath, List inputFilePaths) { - log.info("求解文件目录={}", jobWorkDir); + log.info("求解文件目录={}", jobWorkDir); Objects.requireNonNull(jobWorkDir, "本地求解文件夹不能为空"); boolean hasMasterRule = masterFileRegularStr != null && !masterFileRegularStr.isBlank(); @@ -695,4 +696,140 @@ public class FilesUtil { } } + /** + * 通用分片上传到本地磁盘 + 最后一片自动合并 + * @param absoluteFilePath 基础根目录 + * @param fileId 分片ID(唯一标识) + * @param chunkIndex 当前分片下标(从0开始) + * @param totalChunks 总分片数 + * @param filename 合并后的最终文件名 + * @param inputStream 文件流 + * @return 结果标识:CHUNK_UPLOAD_SUCCESS / UPLOAD_AND_MERGE_SUCCESS:最终路径 + * @throws IOException IO异常 + */ + public static String handleChunkUploadLocalAndAutoMerge( + String absoluteFilePath, + String fileId, + int chunkIndex, + int totalChunks, + String filename, + InputStream inputStream + ) throws Exception { + // 1. 标准化路径,防路径穿越 + Path rootDir = Paths.get(absoluteFilePath).toAbsolutePath().normalize(); + Path chunkDir = rootDir.resolve(fileId); + Path chunkPath = chunkDir.resolve(chunkIndex + ".temp"); + + // 2. 创建分片目录并写入分片 + Files.createDirectories(chunkDir); + try (InputStream in = inputStream; + OutputStream out = Files.newOutputStream(chunkPath, + StandardOpenOption.CREATE, + StandardOpenOption.WRITE)) { + in.transferTo(out); + } + + // 3. 不是最后一片,直接返回 + if (chunkIndex != totalChunks - 1) { + return "CHUNK_UPLOAD_SUCCESS:" + chunkIndex; + } + + // 4. 最后一片:自动合并所有分片 + Path targetFile = rootDir.resolve(filename); + try (OutputStream out = Files.newOutputStream(targetFile, + StandardOpenOption.CREATE, + StandardOpenOption.WRITE)) { + for (int i = 0; i < totalChunks; i++) { + Path chunk = chunkDir.resolve(i + ".temp"); + if (!Files.exists(chunk)) { + throw new ResponseStatusException(HttpStatus.BAD_REQUEST, + "分片缺失:" + i + ".temp,合并失败"); + } + try (InputStream in = Files.newInputStream(chunk)) { + in.transferTo(out); + } + } + } + + // 5. 合并完成,删除分片 + Files.walk(chunkDir) + .sorted(Comparator.reverseOrder()) + .forEach(path -> { + try { Files.delete(path); } catch (Exception ignored) { + log.warn("handleChunkUploadLocalAndAutoMerge delete temp error:{}", ignored.getMessage()); + } + }); + return "UPLOAD_AND_MERGE_SUCCESS:" + targetFile; + } + + /** + * 获取主文件和从文件的绝对路径 + * @param basePath 基础路径(支持 Linux:/hpc/xxx 和 Windows:D:/hpc/xxx 格式) + * @return Pair<主文件绝对路径, 从文件绝对路径集合> + * @throws IOException 文件读取异常/路径不存在 + * @throws IllegalArgumentException 基础路径不存在/无主文件 + */ + public static Pair> getSingleSubmitFiles(String basePath) { + // 1. 标准化路径 + Path baseDir = Paths.get(basePath); + // 基础校验 + if (!Files.exists(baseDir)) { + throw new IllegalArgumentException("基础路径不存在:" + basePath); + } + if (!Files.isDirectory(baseDir)) { + throw new IllegalArgumentException("基础路径不是文件夹:" + basePath); + } + String mainFileAbsolutePath = null; + List slaveFileList = new ArrayList<>(); + // 2. 遍历基础目录下直接子项 + try { + try (Stream children = Files.list(baseDir)) { + for (Path path : children.toList()) { + // 找第一个文件 → 主文件 + if (mainFileAbsolutePath == null && Files.isRegularFile(path)) { + mainFileAbsolutePath = path.toAbsolutePath().toString(); + } + // 找第一个文件夹 → 递归遍历里面所有文件(所有层级) + if (slaveFileList.isEmpty() && Files.isDirectory(path)) { + listAllFilesRecursively(path, slaveFileList); + } + // 找到主文件 + 第一个文件夹遍历完成 → 提前退出 + if (mainFileAbsolutePath != null && !slaveFileList.isEmpty()) { + break; + } + } + } + } catch (Exception e) { + log.error("getSingleSubmitFiles get files error:{}",e.getMessage()); + throw new RuntimeException("获取HPC文件异常"); + } + // 必须有主文件 + if (mainFileAbsolutePath == null) { + throw new IllegalArgumentException("路径下未找到任何主文件:" + basePath); + } + return Pair.of(mainFileAbsolutePath, slaveFileList); + } + + /** + * 递归工具方法:遍历文件夹下所有层级的文件 + * @param dir 要遍历的目录 + * @param fileList 收集文件绝对路径的集合 + * @throws IOException IO异常 + */ + private static void listAllFilesRecursively(Path dir, List fileList) throws IOException { + try (Stream stream = Files.list(dir)) { + for (Path path : stream.toList()) { + if (Files.isRegularFile(path)) { + // 是文件 → 加入列表 + fileList.add(path.toAbsolutePath().toString()); + } else if (Files.isDirectory(path)) { + // 是文件夹 → 递归继续找 + listAllFilesRecursively(path, fileList); + } + } + } + } + + + } diff --git a/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java b/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java index dfa8d3f4..fec48281 100644 --- a/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java +++ b/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java @@ -200,7 +200,8 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { @Autowired private ExportOperate exportOperate; - + @Autowired + private CommonStatusUtil commonStatusUtil; private static final String TEMP_FILE_PATH = "/usr/local/nginx/html/storage/"; @@ -1995,6 +1996,10 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { } return buildSuccessResponse(resp,req,""); } + // 分片的存储的状态标识 + if(!commonStatusUtil.exists(CommonStatusUtil.CHUNK_UPLOAD_FILE+req.getBusinessId())){ + commonStatusUtil.start(CommonStatusUtil.CHUNK_UPLOAD_FILE+req.getBusinessId(),"doing"); + } // 碎片目录 String tempDirPath = org.apache.commons.lang3.StringUtils.isBlank(req.getFileTempPath())? filePath +"temp_"+timestamp+"/":req.getFileTempPath(); @@ -2016,6 +2021,8 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { } if(!b){ + log.warn("chunkUpload第:{}次失败,分片文件数据开始删除",req.getChunk()); + commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+req.getBusinessId()); deleteTempFileAfterFailed(tempDirPath,chunkBucket); return buildFailedResponse(resp,"chunkUpload第"+req.getChunk()+"次失败",req); } @@ -2023,10 +2030,7 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { if (req.getChunk() < req.getChunkTotal()) { return buildSuccessResponse(resp,req,tempDirPath); } - // 3. 全部分片已经上传 => 自动合并 - String finalFileName = req.getObjectKey(); - Boolean merge = minioService.merge(chunkBucket, tempDirPath, chunkBucket, finalFileName); - + // 文件已经上传完成 // 是否本地保存 if(Objects.equals("Y",req.getIsSaveLocal())){ String finalLocalFilePath = absoluteLocalFileDirPath+req.getSourceFileName(); @@ -2039,14 +2043,26 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { log.info("saveChunkFileToLocal 合并本地文件finalLocalFilePath:{},localTempDirPath:{},getChunkTotal:{}结果:{}", finalLocalFilePath,localTempDirPath,req.getChunkTotal(),localMergeSuccess); } + // 分片文件已经上传完成,提前返回,异步合并数据 + CompletableFuture.runAsync(() -> { + asyncMerge(req.getObjectKey(),chunkBucket,tempDirPath,req.getBusinessId()); + }); + return buildSuccessResponse(resp,req,tempDirPath); + } + + // 异步合并 + private void asyncMerge (String finalFileName,String chunkBucket,String tempDirPath,Long businessId){ + Boolean merge = minioService.merge(chunkBucket, tempDirPath, chunkBucket, finalFileName); if(!merge){ deleteTempFileAfterFailed(tempDirPath,chunkBucket); - return buildFailedResponse(resp,req.getSourceFileName()+"合并分片失败",req); + log.error("asyncMerge 合并分片失败:{}",tempDirPath); +// return buildFailedResponse(resp,req.getSourceFileName()+"合并分片失败",req); } // 4. 合并完成后删除临时目录 deleteTempFileAfterFailed(tempDirPath,chunkBucket); - return buildSuccessResponse(resp,req,tempDirPath); + // 删除分片上传的处理中状态 + commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+businessId); }