新增:data文件分批上传,异步合并逻辑增加

This commit is contained in:
2026-03-24 14:48:08 +08:00
parent 65fbeef457
commit f7ad27f453
3 changed files with 219 additions and 12 deletions

View File

@@ -0,0 +1,54 @@
package com.sdm.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 文件状态服务工具类(优化版)
* 外部直接传入拼接好的key支持并发安全操作
* 通用状态工具类
*/
@Slf4j
@Component
public class CommonStatusUtil {
/* 各种业务key维护 */
// 文件分片上传
public static final String CHUNK_UPLOAD_FILE = "CHUNK_UPLOAD_FILE_";
// 并发安全Map保证多线程环境下操作安全
private final ConcurrentMap<String, Object> statusMap = new ConcurrentHashMap<>();
/**
* 开始状态直接存入外部传入的完整key和value
* @param key 外部已拼接好的完整key
* @param value 存储的状态/业务数据
*/
public void start(String key, Object value) {
statusMap.put(key, value);
log.info("CommonStatusUtil start:{}",key);
}
/**
* 结束状态根据外部传入的完整key删除数据
* @param key 外部已拼接好的完整key
*/
public void end(String key) {
statusMap.remove(key);
log.info("CommonStatusUtil end:{}",key);
}
/**
* 优化新增根据完整key判断是否存在
* @param key 外部已拼接好的完整key
* @return 存在返回true不存在返回false
*/
public boolean exists(String key) {
return statusMap.containsKey(key);
}
}

View File

@@ -1,9 +1,12 @@
package com.sdm.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.http.HttpStatus;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.server.ResponseStatusException;
import java.io.*;
import java.net.HttpURLConnection;
@@ -13,12 +16,10 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Vector;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@@ -332,7 +333,7 @@ public class FilesUtil {
String inputFilesRegularStr,
AtomicReference<String> masterFilePath,
List<String> inputFilePaths) {
log.info("求解文件目录={}", jobWorkDir);
log.info("求解文件目录={}", jobWorkDir);
Objects.requireNonNull(jobWorkDir, "本地求解文件夹不能为空");
boolean hasMasterRule =
masterFileRegularStr != null && !masterFileRegularStr.isBlank();
@@ -695,4 +696,140 @@ public class FilesUtil {
}
}
/**
* 通用分片上传到本地磁盘 + 最后一片自动合并
* @param absoluteFilePath 基础根目录
* @param fileId 分片ID唯一标识
* @param chunkIndex 当前分片下标从0开始
* @param totalChunks 总分片数
* @param filename 合并后的最终文件名
* @param inputStream 文件流
* @return 结果标识CHUNK_UPLOAD_SUCCESS / UPLOAD_AND_MERGE_SUCCESS:最终路径
* @throws IOException IO异常
*/
public static String handleChunkUploadLocalAndAutoMerge(
String absoluteFilePath,
String fileId,
int chunkIndex,
int totalChunks,
String filename,
InputStream inputStream
) throws Exception {
// 1. 标准化路径,防路径穿越
Path rootDir = Paths.get(absoluteFilePath).toAbsolutePath().normalize();
Path chunkDir = rootDir.resolve(fileId);
Path chunkPath = chunkDir.resolve(chunkIndex + ".temp");
// 2. 创建分片目录并写入分片
Files.createDirectories(chunkDir);
try (InputStream in = inputStream;
OutputStream out = Files.newOutputStream(chunkPath,
StandardOpenOption.CREATE,
StandardOpenOption.WRITE)) {
in.transferTo(out);
}
// 3. 不是最后一片,直接返回
if (chunkIndex != totalChunks - 1) {
return "CHUNK_UPLOAD_SUCCESS:" + chunkIndex;
}
// 4. 最后一片:自动合并所有分片
Path targetFile = rootDir.resolve(filename);
try (OutputStream out = Files.newOutputStream(targetFile,
StandardOpenOption.CREATE,
StandardOpenOption.WRITE)) {
for (int i = 0; i < totalChunks; i++) {
Path chunk = chunkDir.resolve(i + ".temp");
if (!Files.exists(chunk)) {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST,
"分片缺失:" + i + ".temp合并失败");
}
try (InputStream in = Files.newInputStream(chunk)) {
in.transferTo(out);
}
}
}
// 5. 合并完成,删除分片
Files.walk(chunkDir)
.sorted(Comparator.reverseOrder())
.forEach(path -> {
try { Files.delete(path); } catch (Exception ignored) {
log.warn("handleChunkUploadLocalAndAutoMerge delete temp error:{}", ignored.getMessage());
}
});
return "UPLOAD_AND_MERGE_SUCCESS:" + targetFile;
}
/**
* 获取主文件和从文件的绝对路径
* @param basePath 基础路径(支持 Linux:/hpc/xxx 和 Windows:D:/hpc/xxx 格式)
* @return Pair<主文件绝对路径, 从文件绝对路径集合>
* @throws IOException 文件读取异常/路径不存在
* @throws IllegalArgumentException 基础路径不存在/无主文件
*/
public static Pair<String, List<String>> getSingleSubmitFiles(String basePath) {
// 1. 标准化路径
Path baseDir = Paths.get(basePath);
// 基础校验
if (!Files.exists(baseDir)) {
throw new IllegalArgumentException("基础路径不存在:" + basePath);
}
if (!Files.isDirectory(baseDir)) {
throw new IllegalArgumentException("基础路径不是文件夹:" + basePath);
}
String mainFileAbsolutePath = null;
List<String> slaveFileList = new ArrayList<>();
// 2. 遍历基础目录下直接子项
try {
try (Stream<Path> children = Files.list(baseDir)) {
for (Path path : children.toList()) {
// 找第一个文件 → 主文件
if (mainFileAbsolutePath == null && Files.isRegularFile(path)) {
mainFileAbsolutePath = path.toAbsolutePath().toString();
}
// 找第一个文件夹 → 递归遍历里面所有文件(所有层级)
if (slaveFileList.isEmpty() && Files.isDirectory(path)) {
listAllFilesRecursively(path, slaveFileList);
}
// 找到主文件 + 第一个文件夹遍历完成 → 提前退出
if (mainFileAbsolutePath != null && !slaveFileList.isEmpty()) {
break;
}
}
}
} catch (Exception e) {
log.error("getSingleSubmitFiles get files error:{}",e.getMessage());
throw new RuntimeException("获取HPC文件异常");
}
// 必须有主文件
if (mainFileAbsolutePath == null) {
throw new IllegalArgumentException("路径下未找到任何主文件:" + basePath);
}
return Pair.of(mainFileAbsolutePath, slaveFileList);
}
/**
* 递归工具方法:遍历文件夹下所有层级的文件
* @param dir 要遍历的目录
* @param fileList 收集文件绝对路径的集合
* @throws IOException IO异常
*/
private static void listAllFilesRecursively(Path dir, List<String> fileList) throws IOException {
try (Stream<Path> stream = Files.list(dir)) {
for (Path path : stream.toList()) {
if (Files.isRegularFile(path)) {
// 是文件 → 加入列表
fileList.add(path.toAbsolutePath().toString());
} else if (Files.isDirectory(path)) {
// 是文件夹 → 递归继续找
listAllFilesRecursively(path, fileList);
}
}
}
}
}