diff --git a/data/src/main/java/com/sdm/data/config/thread/NonSensitiveTaskThreadPool.java b/data/src/main/java/com/sdm/data/config/thread/NonSensitiveTaskThreadPool.java new file mode 100644 index 00000000..c08e0ad2 --- /dev/null +++ b/data/src/main/java/com/sdm/data/config/thread/NonSensitiveTaskThreadPool.java @@ -0,0 +1,54 @@ +package com.sdm.data.config.thread; + +import com.sdm.common.mdc.MdcTaskDecorator; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 自定义线程池配置--非敏感业务使用,能接受可能长时间排队 + */ +@Configuration +public class NonSensitiveTaskThreadPool { + + @Value("${nonSensitiveTaskPool.coreSize:2}") + private int CORE_POOL_SIZE ; + @Value("${nonSensitiveTaskPool.maxSize:4}") + private int MAX_POOL_SIZE ; + @Value("${nonSensitiveTaskPool.queueSize:5000}") + private int QUEUE_CAPACITY ; + @Value("${nonSensitiveTaskPool.keepLive:60}") + private int KEEP_ALIVE_SECONDS ; + @Value("${nonSensitiveTaskPool.threadName:nonSensitiveTaskPool-}") + private String THREAD_NAME_PREFIX ; + + @Bean(name = "nonSensitiveTaskPool") + public Executor nonSensitiveTaskPool() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 核心线程数 + executor.setCorePoolSize(CORE_POOL_SIZE); + // 最大线程数 + executor.setMaxPoolSize(MAX_POOL_SIZE); + // 队列容量 + executor.setQueueCapacity(QUEUE_CAPACITY); + // 空闲线程存活时间 + executor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS); + // 线程名称前缀 + executor.setThreadNamePrefix(THREAD_NAME_PREFIX); + executor.setTaskDecorator(new MdcTaskDecorator()); + // 线程池拒绝策略:当任务数超过最大线程数+队列容量时的处理方式 + // - ThreadPoolExecutor.AbortPolicy(默认):直接抛出 RejectedExecutionException + // - ThreadPoolExecutor.CallerRunsPolicy:由提交任务的线程执行(减缓提交速度,适用于并发不高的场景) + // - ThreadPoolExecutor.DiscardPolicy:直接丢弃新任务,不抛出异常 + // - ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列中最旧的任务,再尝试提交新任务 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); + + // 初始化线程池(必须调用,否则线程池不生效) + executor.initialize(); + return executor; + } +} \ No newline at end of file diff --git a/data/src/main/java/com/sdm/data/service/IMinioService.java b/data/src/main/java/com/sdm/data/service/IMinioService.java index b0d9cc4f..f36a21f1 100644 --- a/data/src/main/java/com/sdm/data/service/IMinioService.java +++ b/data/src/main/java/com/sdm/data/service/IMinioService.java @@ -42,7 +42,9 @@ public interface IMinioService { * @param directoryName 目录名称(objectKey) */ public void deleteDirectoryRecursively(String directoryName); - + + public void deleteDirectoryRecursively2(String directoryName,String bucketName); + /** * 递归删除指定目录下的所有对象。 * diff --git a/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java b/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java index 83d0ed79..b5b409a1 100644 --- a/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java +++ b/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java @@ -50,6 +50,7 @@ import org.assertj.core.util.DateUtil; import org.jetbrains.annotations.NotNull; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -65,6 +66,8 @@ import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.stream.Collectors; @@ -132,6 +135,10 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { @Autowired private ISimulationNodeFeignClient isSimulationNodeFeignClient; + @Autowired + @Qualifier(value = "nonSensitiveTaskPool") + private Executor nonSensitiveTaskPool; + @Override public String getType() { return type; @@ -215,10 +222,10 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { public SdmResponse chunkUploadToMinio(ChunkUploadMinioFileReq req) { ChunkUploadMinioFileResp resp = new ChunkUploadMinioFileResp(); // 基础路径配置 -// Long tenantId = ThreadLocalContext.getTenantId(); - Long tenantId = 123456l; - Long userId = 9999l; -// Long userId= ThreadLocalContext.getUserId(); + Long tenantId = ThreadLocalContext.getTenantId(); +// Long tenantId = 123456l; +// Long userId = 9999l; + Long userId= ThreadLocalContext.getUserId(); // -2. 参数校验 try { validateReq(req,tenantId,userId); @@ -260,7 +267,12 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { if(!merge){ return buildFailedResponse(resp,req.getSourceFileName()+"合并分片失败",filePath,chunkBucket); } - // 4. 合并完成后删除临时目录 todo + // 4. 合并完成后删除临时目录 + String finalTempDirPath=tempDirPath; + String finalChunkBucket=chunkBucket; + CompletableFuture.runAsync(() -> { + minioService.deleteDirectoryRecursively2(finalTempDirPath, finalChunkBucket); + }, nonSensitiveTaskPool); return buildSuccessResponse(resp,finalFileName,filePath,chunkBucket); } diff --git a/data/src/main/java/com/sdm/data/service/minio/MinioService.java b/data/src/main/java/com/sdm/data/service/minio/MinioService.java index 31cef64e..812bb4d2 100644 --- a/data/src/main/java/com/sdm/data/service/minio/MinioService.java +++ b/data/src/main/java/com/sdm/data/service/minio/MinioService.java @@ -175,6 +175,10 @@ public class MinioService implements IMinioService { deleteDirectoryRecursively(directoryName, minioConfig.getSpdmBucket()); } + public void deleteDirectoryRecursively2(String directoryName,String bucketName) { + deleteDirectoryRecursively(directoryName, bucketName); + } + /** * 从MinIO删除文件 * @param minioObjectKey 文件名