Batch import project nodes

2026-01-22 15:58:54 +08:00
parent 5399d2c67d
commit fe27aa1e39
13 changed files with 705 additions and 0 deletions

View File

@@ -3,10 +3,14 @@ package com.sdm.data.config;
import io.minio.MinioClient;
import io.minio.admin.MinioAdminClient;
import lombok.Data;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeUnit;
@Configuration
@ConfigurationProperties(prefix = "minio")
@Data
@@ -41,9 +45,14 @@ public class MinioConfig {
@Bean
public MinioClient minioClient() {
// Customize the OkHttpClient connection pool so persistent connections are reused instead of being created and torn down per request, improving performance
OkHttpClient httpClient = new OkHttpClient.Builder()
.connectionPool(new ConnectionPool(100, 5, TimeUnit.MINUTES)) // allow up to 100 idle connections, kept alive for 5 minutes
.build();
return MinioClient.builder()
.endpoint(endpoint, port, secure)
.credentials(accessKey, secretKey)
.httpClient(httpClient)
.build();
}
}

View File

@@ -0,0 +1,37 @@
package com.sdm.data.config.thread;
import com.sdm.common.mdc.MdcTaskDecorator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Thread pool dedicated to MinIO batch IO operations
*/
@Configuration
public class MinioTaskThreadPool {
@Bean(name = "minioBatchExecutor")
public Executor minioBatchExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Core pool size: for IO-bound work, roughly (CPU cores * 2) or higher is recommended; 20-50 is a reasonable range for ~2500 requests
executor.setCorePoolSize(30);
// Maximum pool size: sized according to what the MinIO server can handle
executor.setMaxPoolSize(100);
// Queue capacity: 5000 is plenty for ~2500 tasks
executor.setQueueCapacity(5000);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("minio-batch-");
// Rejection policy: critical! Do not use a Discard policy, otherwise directory-creation tasks would be silently dropped.
// CallerRunsPolicy makes the submitting (main) thread run the task itself when the pool is saturated, which acts as throttling.
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setTaskDecorator(new MdcTaskDecorator());
executor.initialize();
return executor;
}
}
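
The MdcTaskDecorator referenced above (com.sdm.common.mdc) is not part of this commit. A minimal sketch of what such a decorator typically looks like, assuming it simply copies the submitting thread's MDC context onto the worker thread so log correlation IDs survive the thread hop:

package com.sdm.common.mdc;
import org.slf4j.MDC;
import org.springframework.core.task.TaskDecorator;
import java.util.Map;
public class MdcTaskDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        // Capture the MDC of the thread that submits the task
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return () -> {
            try {
                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                }
                runnable.run();
            } finally {
                // Avoid leaking MDC entries into the pooled worker thread
                MDC.clear();
            }
        };
    }
}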

View File

@@ -62,6 +62,18 @@ public class DataFileController implements IDataFeignClient {
return IDataFileService.createDir(req);
}
/**
* Batch-create folders (supports a multi-level tree structure)
*
* @param req batch-create request
* @return IDs of the folders that were created
*/
@PostMapping("/batchCreateDir")
@Operation(summary = "Batch-create folders", description = "Supports batch creation of multi-level tree structures using a BFS layer-by-layer batch-insert strategy")
public SdmResponse<List<Long>> batchCreateDir(@RequestBody @Validated BatchCreateDirReq req) {
return IDataFileService.batchCreateDir(req);
}
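
The definition of BatchCreateDirReq is not shown in this excerpt; the field names below are taken from how the request is read in MinioFileIDataFileServiceImpl, and the setters are assumed to exist (e.g. via Lombok @Data). A hypothetical payload for the endpoint above, mirroring the example tree in BatchCreateDirContext's Javadoc (one project with one task):

// Hypothetical construction of the request body (setter names are assumptions)
BatchCreateDirReq req = new BatchCreateDirReq();
req.setDirType(2); // 2 = project node, per the dirType comment in BatchCreateDirContext

DirNodeInfo project = new DirNodeInfo();
project.setUuId("proj-001");
project.setParentUuId(null);          // first-level node: its parent is the system root directory
project.setUuIdOwnType("project");
project.setDirName("ProjectA");

DirNodeInfo task = new DirNodeInfo();
task.setUuId("task-001");
task.setParentUuId("proj-001");       // child of the project node above
task.setUuIdOwnType("task");
task.setDirName("Task1");

BatchCreateDirItem item = new BatchCreateDirItem();
item.setParentDirNodeInfo(project);
item.setChildDirNodeInfos(List.of(task));
req.setItems(List.of(item));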
/**
* Everything that goes through approval is a file, so this approval callback method can be shared
* @param req

View File

@@ -30,6 +30,16 @@ public interface IDataFileService {
*/
SdmResponse createDir(CreateDirReq req);
/**
* Batch-create directories (supports a multi-level tree structure).
* Uses a BFS layer-by-layer batch-insert strategy: each layer performs only one batch database operation.
* @param req batch directory-creation request parameters
* @return IDs of the directories that were created
*/
default SdmResponse<List<Long>> batchCreateDir(BatchCreateDirReq req) {
return null;
}
/**
* Delete a directory
* @param req directory-deletion request parameters

View File

@@ -33,6 +33,20 @@ public interface IMinioService {
*/
void createDirectoryByObjectKey(String objectKey, String bucketName);
/**
* Batch-create directories
* @param objectKeys objectKeys of the directories to create
* @param bucketName bucket name
*/
void batchCreateDirectories(List<String> objectKeys, String bucketName);
/**
* Batch-delete files/directories
* @param objectKeys objectKeys of the files/directories to delete
* @param bucketName bucket name
*/
void batchDeleteFiles(List<String> objectKeys, String bucketName);
/**
* Recursively delete all objects under the given directory.
*

View File

@@ -49,6 +49,7 @@ import com.sdm.data.service.impl.dataFileHandle.ApproveStrategyFactory;
import io.minio.Result;
import io.minio.messages.Item;
import jakarta.servlet.http.HttpServletResponse;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
@@ -3283,4 +3284,381 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
return SdmResponse.success(fileMetadataInfoResp);
}
// ========================== Batch directory creation implementation ==========================
/**
* Batch-create-directory context: carries state between the private helper methods below.
* <p>
* Example data structure (creating a project-node directory tree):
* <pre>
* projectNode/              ← system root directory (id=1, objectKey="projectNode/")
* ├── ProjectA/             ← level 1: project node (uuid="proj-001", parentUuId=null, uuIdOwnType="project")
* │   ├── Task1/            ← level 2: task node (uuid="task-001", parentUuId="proj-001", uuIdOwnType="task")
* │   │   └── Run1/         ← level 3: run node (uuid="run-001", parentUuId="task-001", uuIdOwnType="run")
* │   └── Task2/            ← level 2: task node (uuid="task-002", parentUuId="proj-001", uuIdOwnType="task")
* └── ProjectB/             ← level 1: project node (uuid="proj-002", parentUuId=null, uuIdOwnType="project")
* </pre>
*/
@Data
private static class BatchCreateDirContext {
/** Tenant ID, taken from the thread-local context */
Long tenantId;
/** Directory type: 1 - knowledge base, 2 - project node, 3 - avatar library, ... */
Integer dirType;
/** IDs of the directories created so far; returned to the caller at the end */
List<Long> createdDirIds = new ArrayList<>();
/** objectKeys of the MinIO directories created so far, used for compensating deletion when an exception occurs */
List<String> createdMinioKeys = new ArrayList<>();
/**
* Parent-to-children mapping: parentUuId -> list of child nodes
* <pre>
* Example:
* "proj-001" -> [task-001, task-002]
* "task-001" -> [run-001]
* </pre>
*/
Map<String, List<DirNodeInfo>> parentToChildren;
/**
* UUIDs of all nodes to be created (project nodes, task nodes, run nodes, ...)
* <pre>
* Example: {"proj-001", "proj-002", "task-001", "task-002", "run-001"}
* </pre>
*/
Set<String> allNodeUuIds;
/**
* UUIDs of the first-level parent nodes; the parent directory of these nodes is the system root directory
* <pre>
* Rule: parentUuId is null
* </pre>
*/
Set<String> firstLevelParentUuIds;
/**
* UUID -> database ID mapping, filled in progressively as each level is inserted
* <pre>
* Initially:            "" -> 1 (system root directory ID)
* After level 1 insert: "proj-001" -> 100, "proj-002" -> 101
* After level 2 insert: "task-001" -> 200, "task-002" -> 201
* After level 3 insert: "run-001" -> 300
* </pre>
*/
Map<String, Long> uuidToDbIdMap = new HashMap<>();
/**
* UUID -> MinIO objectKey mapping, used to build child directory paths
* <pre>
* Initially:            "" -> "projectNode/" (system root directory objectKey)
* After level 1 insert: "proj-001" -> "projectNode/ProjectA/"
* After level 2 insert: "task-001" -> "projectNode/ProjectA/Task1/"
* After level 3 insert: "run-001" -> "projectNode/ProjectA/Task1/Run1/"
* </pre>
*/
Map<String, String> uuidToObjectKeyMap = new HashMap<>();
}
/**
* Batch-create directories (supports a multi-level tree structure).
* Uses a BFS layer-by-layer batch-insert strategy: each layer performs only one batch database operation.
*/
@Override
@Transactional(rollbackFor = Exception.class)
public SdmResponse<List<Long>> batchCreateDir(BatchCreateDirReq req) {
// 1. Validate parameters
SdmResponse<Void> validateResult = validateBatchCreateDirRequest(req);
if (!validateResult.isSuccess()) {
return SdmResponse.failed(validateResult.getMessage());
}
BatchCreateDirContext ctx = new BatchCreateDirContext();
ctx.setTenantId(ThreadLocalContext.getTenantId());
ctx.setDirType(req.getDirType());
try {
// 2. Build the node grouping data
buildNodeGroupingData(req, ctx);
// 3. Check for duplicate creation
SdmResponse<Void> duplicateCheck = checkDuplicateNodes(ctx);
if (!duplicateCheck.isSuccess()) {
return SdmResponse.failed(duplicateCheck.getMessage());
}
// 4. Fetch the system root directory and initialize the UUID mappings
SdmResponse<FileMetadataInfo> rootDirResult = getSystemRootDir(ctx.dirType);
if (!rootDirResult.isSuccess()) {
return SdmResponse.failed(rootDirResult.getMessage());
}
FileMetadataInfo systemRootDir = rootDirResult.getData();
initUuidMappings(ctx, systemRootDir);
// 5. BFS level traversal: group nodes level by level
List<List<DirNodeInfo>> levelNodes = buildLevelNodesByBFS(ctx.getParentToChildren(), ctx.getFirstLevelParentUuIds());
log.info("Batch directory creation: {} levels, {} nodes in total", levelNodes.size(),
levelNodes.stream().mapToInt(List::size).sum());
// 6. Batch-insert level by level
for (int levelIndex = 0; levelIndex < levelNodes.size(); levelIndex++) {
processLevelNodes(levelNodes.get(levelIndex), levelIndex, ctx);
}
log.info("批量创建目录完成,共创建{}个目录", ctx.createdDirIds.size());
return SdmResponse.success(ctx.createdDirIds);
} catch (Exception e) {
log.error("批量创建目录失败", e);
compensateDeleteMinioKeys(ctx.createdMinioKeys);
throw new RuntimeException("批量创建目录失败: " + e.getMessage(), e);
}
}
/**
* Validate the batch-create-directory request parameters
*/
private SdmResponse<Void> validateBatchCreateDirRequest(BatchCreateDirReq req) {
if (req == null || CollectionUtils.isEmpty(req.getItems())) {
log.error("Batch directory creation failed: request parameters are empty");
return SdmResponse.failed("Request parameters must not be empty");
}
if (req.getDirType() == null) {
log.error("Batch directory creation failed: directory type is empty");
return SdmResponse.failed("Directory type must not be empty");
}
return SdmResponse.success(null);
}
/**
* Build the node grouping data: flatten all nodes (parents and children) and group them by parentUuId
*/
private void buildNodeGroupingData(BatchCreateDirReq req, BatchCreateDirContext ctx) {
// Flatten all nodes: both parent nodes (project nodes) and child nodes
List<DirNodeInfo> allNodes = new ArrayList<>();
Set<String> allNodeUuIds = new HashSet<>();
List<BatchCreateDirItem> batchCreateDirItems = req.getItems();
for (BatchCreateDirItem item : batchCreateDirItems) {
// Add the parent node (project nodes need to be created as well)
// Skip it if this parent node has already been added
if(allNodeUuIds.contains(item.getParentDirNodeInfo().getUuId())){
continue;
}
allNodeUuIds.add(item.getParentDirNodeInfo().getUuId());
allNodes.add(item.getParentDirNodeInfo());
// Add the child nodes
if (CollectionUtils.isNotEmpty(item.getChildDirNodeInfos())) {
item.getChildDirNodeInfos().stream()
.filter(child -> !allNodeUuIds.contains(child.getUuId()))
.forEach(child -> {
allNodeUuIds.add(child.getUuId());
allNodes.add(child);
});
}
}
// Collect the UUIDs of all nodes
ctx.setAllNodeUuIds(allNodeUuIds);
// Group by parentUuId to build the parent-child graph
Map<String, List<DirNodeInfo>> parentToChildren = batchCreateDirItems.stream().collect(Collectors.toMap(
// key: parent node UUID
item -> item.getParentDirNodeInfo().getUuId(),
// value: the item's child node list (wrapped so a null list cannot break the collector)
item -> new ArrayList<>(CollectionUtils.emptyIfNull(item.getChildDirNodeInfos())),
// merge children when the same parent UUID appears in more than one item (duplicates are tolerated above)
(left, right) -> { left.addAll(right); return left; }
));
ctx.setParentToChildren(parentToChildren);
// Collect the UUIDs of the first-level parent nodes; their parent directory is the system root directory
ctx.setFirstLevelParentUuIds(allNodes.stream()
.filter(element -> element.getParentUuId() == null)
.map(DirNodeInfo::getUuId)
.collect(Collectors.toSet()));
}
/**
* Check whether any of the nodes to create already exist (duplicate-import detection)
*/
private SdmResponse<Void> checkDuplicateNodes(BatchCreateDirContext ctx) {
List<FileMetadataInfo> existingNodes = fileMetadataInfoService.lambdaQuery()
.in(FileMetadataInfo::getRelatedResourceUuid, ctx.allNodeUuIds)
.eq(FileMetadataInfo::getTenantId, ctx.tenantId)
.list();
if (CollectionUtils.isNotEmpty(existingNodes)) {
log.error("批量创建目录失败部分目录已存在非新项目已存在的uuid: {}",
existingNodes.stream().map(FileMetadataInfo::getRelatedResourceUuid).collect(Collectors.toList()));
return SdmResponse.failed("目录已存在,请勿重复创建");
}
return SdmResponse.success(null);
}
/**
* Look up the system root directory for the given dirType
*/
private SdmResponse<FileMetadataInfo> getSystemRootDir(Integer dirType) {
DirTypeEnum dirTypeEnum = DirTypeEnum.getDirTypeByValue(dirType);
if (dirTypeEnum == null) {
log.error("批量创建目录失败,无效的目录类型: {}", dirType);
return SdmResponse.failed("无效的目录类型");
}
String rootDirObjectKey = getDirMinioObjectKey(dirTypeEnum.getDirName());
Optional<FileMetadataInfo> rootDirOpt = getFileMetadataInfoByObjectKey(rootDirObjectKey, null);
if (!rootDirOpt.isPresent()) {
log.error("批量创建目录失败,系统根目录不存在");
return SdmResponse.failed("系统根目录不存在,请先初始化系统目录");
}
return SdmResponse.success(rootDirOpt.get());
}
/**
* Initialize the UUID -> database ID / objectKey mappings
*/
private void initUuidMappings(BatchCreateDirContext ctx, FileMetadataInfo systemRootDir) {
// The empty string stands for the parent of first-level nodes (the system root directory)
ctx.uuidToDbIdMap.put("", systemRootDir.getId());
ctx.uuidToObjectKeyMap.put("", systemRootDir.getObjectKey());
}
/**
* Process one level of nodes: create the MinIO directories, database records, and permission records
*/
private void processLevelNodes(List<DirNodeInfo> nodes, int levelIndex, BatchCreateDirContext ctx) {
log.info("开始处理第{}层,节点数:{}", levelIndex + 1, nodes.size());
// 1. 构建该层所有节点的元数据对象并收集需要创建的MinIO目录路径
List<FileMetadataInfo> entities = new ArrayList<>();
List<String> minioObjectKeys = new ArrayList<>();
for (DirNodeInfo node : nodes) {
FileMetadataInfo entity = buildDirNodeMetadata(node, ctx);
entities.add(entity);
minioObjectKeys.add(entity.getObjectKey());
}
// 2. Batch-create the MinIO directories
minioService.batchCreateDirectories(minioObjectKeys, null);
ctx.getCreatedMinioKeys().addAll(minioObjectKeys);
// 3. Batch-insert into the database
fileMetadataInfoService.saveBatch(entities, 500);
// 4. Update the mappings after insertion (MyBatis-Plus back-fills the generated IDs)
for (FileMetadataInfo entity : entities) {
ctx.getUuidToDbIdMap().put(entity.getRelatedResourceUuid(), entity.getId());
ctx.getCreatedDirIds().add(entity.getId());
}
// 5. Batch-create the permission records
createBatchPermissions(entities, ctx.tenantId);
log.info("Level {} done, {} directories created", levelIndex + 1, entities.size());
}
/**
* Build the metadata entity for a single directory node (does not create the MinIO directory)
*/
private FileMetadataInfo buildDirNodeMetadata(DirNodeInfo node, BatchCreateDirContext ctx) {
String parentUuId = node.getParentUuId() == null ? "" : node.getParentUuId();
Long parentDbId = ctx.getUuidToDbIdMap().get(parentUuId);
String parentObjectKey = ctx.getUuidToObjectKeyMap().get(parentUuId);
if (parentDbId == null || parentObjectKey == null) {
log.error("批量创建目录失败父目录信息不存在parentUuId={}", node.getParentUuId());
throw new RuntimeException("父目录信息不存在: " + node.getParentUuId());
}
// Build the full path of the child directory
String childDirMinioObjectKey = getDirMinioObjectKey(parentObjectKey + node.getDirName());
// Create the directory metadata
FileMetadataInfo dirInfo = createDirectoryMetadata(
childDirMinioObjectKey, node.getDirName(), false, parentDbId,
node.getUuId(), node.getUuIdOwnType(), ctx.dirType);
// Record the objectKey mapping up front so the next level can resolve its parent path
ctx.getUuidToObjectKeyMap().put(node.getUuId(), childDirMinioObjectKey);
return dirInfo;
}
/**
* Batch-create permission records
*/
private void createBatchPermissions(List<FileMetadataInfo> entities, Long tenantId) {
List<FileUserPermission> permissions = entities.stream()
.map(entity -> {
FileUserPermission permission = new FileUserPermission();
permission.setTFilemetaId(entity.getId());
permission.setPermission(FilePermissionEnum.ALL.getValue());
permission.setUserId(ThreadLocalContext.getUserId());
permission.setTenantId(tenantId);
return permission;
})
.collect(Collectors.toList());
fileUserPermissionService.saveBatch(permissions, 500);
}
/**
* Compensating deletion of the MinIO directories that were already created
*/
private void compensateDeleteMinioKeys(List<String> createdMinioKeys) {
if (createdMinioKeys == null || createdMinioKeys.isEmpty()) {
return;
}
try {
minioService.batchDeleteFiles(createdMinioKeys, null);
} catch (Exception e) {
log.warn("补偿删除MinIO目录失败数量{}", createdMinioKeys.size(), e);
}
}
/**
* BFS level traversal: group nodes level by level
*
* @param parentToChildren parent-to-children mapping
* @param firstLevelParentUuIds UUIDs of the first-level parent nodes (hung directly under the system root directory)
* @return nodes grouped by level
*/
private List<List<DirNodeInfo>> buildLevelNodesByBFS(
Map<String, List<DirNodeInfo>> parentToChildren,
Set<String> firstLevelParentUuIds) {
List<List<DirNodeInfo>> levelNodes = new ArrayList<>();
Set<String> currentLevelParents = new HashSet<>(firstLevelParentUuIds);
while (!currentLevelParents.isEmpty()) {
List<DirNodeInfo> currentLevelChildren = new ArrayList<>();
Set<String> nextLevelParents = new HashSet<>();
for (String parentUuId : currentLevelParents) {
List<DirNodeInfo> children = parentToChildren.get(parentUuId);
if (children != null) {
for (DirNodeInfo child : children) {
currentLevelChildren.add(child);
// If this child has children of its own, it becomes a parent for the next level
if (parentToChildren.containsKey(child.getUuId())) {
nextLevelParents.add(child.getUuId());
}
}
}
}
if (!currentLevelChildren.isEmpty()) {
levelNodes.add(currentLevelChildren);
}
currentLevelParents = nextLevelParents;
}
return levelNodes;
}
}

View File

@@ -15,6 +15,7 @@ import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
@@ -34,7 +35,9 @@ import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.DecimalFormat;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -44,6 +47,11 @@ public class MinioService implements IMinioService {
private final MinioClient minioClient;
private final MinioConfig minioConfig;
@Autowired
@Qualifier("minioBatchExecutor")
private Executor minioBatchExecutor;
// Configurable buffer size: 16 KB
private static final int DEFAULT_BUFFER_SIZE = 16384;
@@ -203,6 +211,52 @@ public class MinioService implements IMinioService {
}
}
@Override
public void batchCreateDirectories(List<String> objectKeys, String bucketName) {
List<CompletableFuture<Void>> futures = objectKeys.stream()
.map(key -> CompletableFuture.runAsync(() -> {
createDirectoryByObjectKey(key, bucketName);
}, minioBatchExecutor))
.toList();
// Wait for all tasks to complete
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.get(2, TimeUnit.MINUTES); // overall timeout for the whole batch
} catch (Exception e) {
log.error("Batch directory creation partially failed or timed out", e);
}
}
@Override
public void batchDeleteFiles(List<String> objectKeys, String bucketName) {
if (objectKeys == null || objectKeys.isEmpty()) {
return;
}
try {
// Build the list of objects to delete
List<DeleteObject> deleteObjects = objectKeys.stream()
.map(key -> new DeleteObject(dealDirPath(key)))
.toList();
// Delete in one batch call
Iterable<Result<DeleteError>> deleteErrors = minioClient.removeObjects(
RemoveObjectsArgs.builder()
.bucket(getBucketName(bucketName))
.objects(deleteObjects)
.build());
// Handle per-object delete errors, if any (removeObjects is lazy, so iterating the results also drives the deletion)
for (Result<DeleteError> deleteErrorResult : deleteErrors) {
DeleteError deleteError = deleteErrorResult.get();
log.warn("批量删除对象失败: {}, 错误: {}", deleteError.objectName(), deleteError.message());
}
} catch (Exception e) {
log.error("批量删除文件失败", e);
throw new RuntimeException("批量删除文件失败", e);
}
}
private static String dealDirPath(String dirPath) {
if (dirPath.startsWith("/")) {