新增:minio分片上传,异步删除碎片文件
This commit is contained in:
@@ -0,0 +1,54 @@
|
|||||||
|
package com.sdm.data.config.thread;
|
||||||
|
|
||||||
|
import com.sdm.common.mdc.MdcTaskDecorator;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 自定义线程池配置--非敏感业务使用,能接受可能长时间排队
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
public class NonSensitiveTaskThreadPool {
|
||||||
|
|
||||||
|
@Value("${nonSensitiveTaskPool.coreSize:2}")
|
||||||
|
private int CORE_POOL_SIZE ;
|
||||||
|
@Value("${nonSensitiveTaskPool.maxSize:4}")
|
||||||
|
private int MAX_POOL_SIZE ;
|
||||||
|
@Value("${nonSensitiveTaskPool.queueSize:5000}")
|
||||||
|
private int QUEUE_CAPACITY ;
|
||||||
|
@Value("${nonSensitiveTaskPool.keepLive:60}")
|
||||||
|
private int KEEP_ALIVE_SECONDS ;
|
||||||
|
@Value("${nonSensitiveTaskPool.threadName:nonSensitiveTaskPool-}")
|
||||||
|
private String THREAD_NAME_PREFIX ;
|
||||||
|
|
||||||
|
@Bean(name = "nonSensitiveTaskPool")
|
||||||
|
public Executor nonSensitiveTaskPool() {
|
||||||
|
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||||
|
// 核心线程数
|
||||||
|
executor.setCorePoolSize(CORE_POOL_SIZE);
|
||||||
|
// 最大线程数
|
||||||
|
executor.setMaxPoolSize(MAX_POOL_SIZE);
|
||||||
|
// 队列容量
|
||||||
|
executor.setQueueCapacity(QUEUE_CAPACITY);
|
||||||
|
// 空闲线程存活时间
|
||||||
|
executor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
|
||||||
|
// 线程名称前缀
|
||||||
|
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
|
||||||
|
executor.setTaskDecorator(new MdcTaskDecorator());
|
||||||
|
// 线程池拒绝策略:当任务数超过最大线程数+队列容量时的处理方式
|
||||||
|
// - ThreadPoolExecutor.AbortPolicy(默认):直接抛出 RejectedExecutionException
|
||||||
|
// - ThreadPoolExecutor.CallerRunsPolicy:由提交任务的线程执行(减缓提交速度,适用于并发不高的场景)
|
||||||
|
// - ThreadPoolExecutor.DiscardPolicy:直接丢弃新任务,不抛出异常
|
||||||
|
// - ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列中最旧的任务,再尝试提交新任务
|
||||||
|
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
|
||||||
|
|
||||||
|
// 初始化线程池(必须调用,否则线程池不生效)
|
||||||
|
executor.initialize();
|
||||||
|
return executor;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -42,7 +42,9 @@ public interface IMinioService {
|
|||||||
* @param directoryName 目录名称(objectKey)
|
* @param directoryName 目录名称(objectKey)
|
||||||
*/
|
*/
|
||||||
public void deleteDirectoryRecursively(String directoryName);
|
public void deleteDirectoryRecursively(String directoryName);
|
||||||
|
|
||||||
|
public void deleteDirectoryRecursively2(String directoryName,String bucketName);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 递归删除指定目录下的所有对象。
|
* 递归删除指定目录下的所有对象。
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -50,6 +50,7 @@ import org.assertj.core.util.DateUtil;
|
|||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.springframework.beans.BeanUtils;
|
import org.springframework.beans.BeanUtils;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
@@ -65,6 +66,8 @@ import java.net.URLEncoder;
|
|||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
|
||||||
@@ -132,6 +135,10 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private ISimulationNodeFeignClient isSimulationNodeFeignClient;
|
private ISimulationNodeFeignClient isSimulationNodeFeignClient;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@Qualifier(value = "nonSensitiveTaskPool")
|
||||||
|
private Executor nonSensitiveTaskPool;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getType() {
|
public String getType() {
|
||||||
return type;
|
return type;
|
||||||
@@ -215,10 +222,10 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
|
|||||||
public SdmResponse<ChunkUploadMinioFileResp> chunkUploadToMinio(ChunkUploadMinioFileReq req) {
|
public SdmResponse<ChunkUploadMinioFileResp> chunkUploadToMinio(ChunkUploadMinioFileReq req) {
|
||||||
ChunkUploadMinioFileResp resp = new ChunkUploadMinioFileResp();
|
ChunkUploadMinioFileResp resp = new ChunkUploadMinioFileResp();
|
||||||
// 基础路径配置
|
// 基础路径配置
|
||||||
// Long tenantId = ThreadLocalContext.getTenantId();
|
Long tenantId = ThreadLocalContext.getTenantId();
|
||||||
Long tenantId = 123456l;
|
// Long tenantId = 123456l;
|
||||||
Long userId = 9999l;
|
// Long userId = 9999l;
|
||||||
// Long userId= ThreadLocalContext.getUserId();
|
Long userId= ThreadLocalContext.getUserId();
|
||||||
// -2. 参数校验
|
// -2. 参数校验
|
||||||
try {
|
try {
|
||||||
validateReq(req,tenantId,userId);
|
validateReq(req,tenantId,userId);
|
||||||
@@ -260,7 +267,12 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
|
|||||||
if(!merge){
|
if(!merge){
|
||||||
return buildFailedResponse(resp,req.getSourceFileName()+"合并分片失败",filePath,chunkBucket);
|
return buildFailedResponse(resp,req.getSourceFileName()+"合并分片失败",filePath,chunkBucket);
|
||||||
}
|
}
|
||||||
// 4. 合并完成后删除临时目录 todo
|
// 4. 合并完成后删除临时目录
|
||||||
|
String finalTempDirPath=tempDirPath;
|
||||||
|
String finalChunkBucket=chunkBucket;
|
||||||
|
CompletableFuture.runAsync(() -> {
|
||||||
|
minioService.deleteDirectoryRecursively2(finalTempDirPath, finalChunkBucket);
|
||||||
|
}, nonSensitiveTaskPool);
|
||||||
return buildSuccessResponse(resp,finalFileName,filePath,chunkBucket);
|
return buildSuccessResponse(resp,finalFileName,filePath,chunkBucket);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -175,6 +175,10 @@ public class MinioService implements IMinioService {
|
|||||||
deleteDirectoryRecursively(directoryName, minioConfig.getSpdmBucket());
|
deleteDirectoryRecursively(directoryName, minioConfig.getSpdmBucket());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void deleteDirectoryRecursively2(String directoryName,String bucketName) {
|
||||||
|
deleteDirectoryRecursively(directoryName, bucketName);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 从MinIO删除文件
|
* 从MinIO删除文件
|
||||||
* @param minioObjectKey 文件名
|
* @param minioObjectKey 文件名
|
||||||
|
|||||||
Reference in New Issue
Block a user