From 4d982ccf2c5a46cdb67abca929b7bab70b5c82ac Mon Sep 17 00:00:00 2001 From: yangyang Date: Tue, 7 Apr 2026 10:01:57 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9Aminio=E5=88=86?= =?UTF-8?q?=E7=89=87=E4=B8=8A=E4=BC=A0=E9=92=88=E5=AF=B9=E5=A4=A7=E6=96=87?= =?UTF-8?q?=E4=BB=B6=EF=BC=8C=E6=9C=89=E6=9D=A1=E4=BB=B6=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E9=80=9A=E8=BF=87webscoket=E9=80=9A=E7=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../resp/data/ChunkUploadMinioFileResp.java | 3 + .../impl/MinioFileIDataFileServiceImpl.java | 101 ++++++++++++++---- .../src/main/resources/application-dev-65.yml | 6 +- 3 files changed, 87 insertions(+), 23 deletions(-) diff --git a/common/src/main/java/com/sdm/common/entity/resp/data/ChunkUploadMinioFileResp.java b/common/src/main/java/com/sdm/common/entity/resp/data/ChunkUploadMinioFileResp.java index c95b81c0..2ffc9cb0 100644 --- a/common/src/main/java/com/sdm/common/entity/resp/data/ChunkUploadMinioFileResp.java +++ b/common/src/main/java/com/sdm/common/entity/resp/data/ChunkUploadMinioFileResp.java @@ -17,4 +17,7 @@ public class ChunkUploadMinioFileResp { // 分片文件的临时目录,第一次请求后,每次都会返回 private String fileTempPath; + // 是否是异步返回,默认N 同步返回; 只有最后一片超级大文件合并超时的时候,走异步,字段返回Y,返回Y的时候,前端就要监听webscoket的消息数据,后端通过webscoket通知已经合并上传完成。 + private String asyncBack="N"; + } diff --git a/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java b/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java index 7c234189..19ee5511 100644 --- a/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java +++ b/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java @@ -12,6 +12,8 @@ import com.github.pagehelper.util.StringUtil; import com.google.common.collect.Sets; import com.sdm.common.common.SdmResponse; import com.sdm.common.common.ThreadLocalContext; +import com.sdm.common.common.WsMessage; +import com.sdm.common.common.WsSceneEnum; import com.sdm.common.entity.ExportExcelFormat; import com.sdm.common.entity.constants.NumberConstants; import com.sdm.common.entity.constants.PermConstants; @@ -32,6 +34,7 @@ import com.sdm.common.feign.inter.project.ISimulationNodeFeignClient; import com.sdm.common.feign.inter.system.IApproveFeignClient; import com.sdm.common.feign.inter.system.ISysConfigFeignClient; import com.sdm.common.feign.inter.system.ISysLogFeignClient; +import com.sdm.common.feign.inter.system.IWsPushToolFeignClient; import com.sdm.common.feign.inter.task.ISimuluationTaskPoolFeignClient; import com.sdm.common.log.CoreLogger; import com.sdm.common.log.constants.ModuleConstants; @@ -97,6 +100,8 @@ import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -208,6 +213,11 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { @Value("${tempNgUrl:http://192.168.65.161:10031/storage/}") private String TEMP_NG_URL; + @Value("${minioMerge.wait:55}") + private Long minioMergerWait; + + @Autowired + private IWsPushToolFeignClient wsPushToolFeignClient; @Override @@ -2064,43 +2074,90 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { finalLocalFilePath,localTempDirPath,req.getChunkTotal(),localMergeSuccess); } - // 3. 全部分片已经上传 => 自动合并 String finalFileName = req.getObjectKey(); - Boolean merge = minioService.merge(chunkBucket, tempDirPath, chunkBucket, finalFileName); - if(!merge){ - deleteTempFileAfterFailed(tempDirPath,chunkBucket); - log.error("合并分片失败:{}",tempDirPath); - // 删除分片上传的处理中状态 - commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+req.getBusinessId()); - return buildFailedResponse(resp,req.getSourceFileName()+"合并分片失败",req); + long start = System.currentTimeMillis(); + CompletableFuture future = CompletableFuture.supplyAsync(() ->asyncMerge(finalFileName,chunkBucket,tempDirPath, + req.getBusinessId(),ThreadLocalContext.getTenantId(),ThreadLocalContext.getUserId(),start)); + try { + Boolean b1 = future.get(minioMergerWait, TimeUnit.SECONDS); + log.info("chunkUploadToMinio merge result:{}",b1); + if(b1){ + // 同步返回 + resp.setAsyncBack("N"); + return buildSuccessResponse(resp,req,tempDirPath); + } + } catch (TimeoutException e1) { + log.warn("chunkUploadToMinio merge timeOut:{} s",minioMergerWait); + // 异步返回 + resp.setAsyncBack("Y"); + return buildSuccessResponse(resp,req,""); + }catch (Exception e){ + log.error("chunkUploadToMinio merge error:{}",e); } - // 4. 合并完成后删除临时目录 - deleteTempFileAfterFailed(tempDirPath,chunkBucket); - // 删除分片上传的处理中状态 - commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+req.getBusinessId()); - return buildSuccessResponse(resp,req,tempDirPath); + // 走到这里肯定就是失败了 + return buildFailedResponse(resp,req.getSourceFileName()+"合并分片失败",req); + +// // 3. 全部分片已经上传 => 自动合并 +// String finalFileName = req.getObjectKey(); +// Boolean merge = minioService.merge(chunkBucket, tempDirPath, chunkBucket, finalFileName); +// if(!merge){ +// deleteTempFileAfterFailed(tempDirPath,chunkBucket); +// log.error("合并分片失败:{}",tempDirPath); +// // 删除分片上传的处理中状态 +// commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+req.getBusinessId()); +// return buildFailedResponse(resp,req.getSourceFileName()+"合并分片失败",req); +// } +// // 4. 合并完成后删除临时目录 +// deleteTempFileAfterFailed(tempDirPath,chunkBucket); +// // 删除分片上传的处理中状态 +// commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+req.getBusinessId()); +// return buildSuccessResponse(resp,req,tempDirPath); } // 异步合并 - private void asyncMerge (String finalFileName,String chunkBucket,String tempDirPath,Long businessId){ + private Boolean asyncMerge (String finalFileName,String chunkBucket,String tempDirPath,Long businessId,Long teandId,Long userId,long start){ + Boolean result = true; try { + ThreadLocalContext.setTenantId(teandId); + ThreadLocalContext.setUserId(userId); Boolean merge = minioService.merge(chunkBucket, tempDirPath, chunkBucket, finalFileName); if(!merge){ // 合并失败,直接删除原始的db数据 List businessIds = new ArrayList<>(); businessIds.add(businessId); handleFailFiles(businessIds); - // 删除分片临时数据 - deleteTempFileAfterFailed(tempDirPath,chunkBucket); log.error("asyncMerge 合并分片失败:{}",tempDirPath); + result = false; } - // 4. 合并完成后删除临时目录 - deleteTempFileAfterFailed(tempDirPath,chunkBucket); + } catch (Exception e) { + result = false; log.error("asyncMerge excute error:{}",e.getMessage()); + }finally { + // 删除分片临时数据 + deleteTempFileAfterFailed(tempDirPath,chunkBucket); } // 删除分片上传的处理中状态 commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+businessId); + long end=System.currentTimeMillis(); + // 计算线程执行耗时(秒) + long costTime = (end - start)/1000; + // 合并成功,且这个异步线程已经大于超时时间,则通过webscoket 通知前端 + if (result && costTime >= minioMergerWait) { + ChunkUploadMinioFileResp resp = new ChunkUploadMinioFileResp(); + resp.setBusinessId(businessId); + resp.setAsyncBack("Y"); + resp.setResult(true); + resp.setFileTempPath(tempDirPath); + WsMessage message = new WsMessage<>(); + message.setScene(WsSceneEnum.BIG_FILE_CHUNK.getScene()); + message.setUserId(userId); + message.setTimestamp(System.currentTimeMillis()); + message.setData(resp); + SdmResponse sdmResponse = wsPushToolFeignClient.wsPushOne(message); + log.info("asyncMerge push message:{}",JSONObject.toJSONString(sdmResponse)); + } + return result; } @@ -3171,7 +3228,7 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { fileMetadataInfo.setOriginalName(originalName); fileMetadataInfo.setFileSize(req.getFile().getSize()); fileMetadataInfo.setVersionNo(newVersionNo); - + // 更新文件存储统计信息 fileStorageService.lambdaUpdate() .eq(FileStorage::getFileId, fileMetadataInfo.getId()) @@ -3340,15 +3397,15 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { try { minioService.uploadFile(paramFile, simulationParamMinioObjectKey, null,simulationParamDirMetadataInfo.getBucketName()); - + // 创建目录元数据并保存到数据库 FileMetadataInfo fileInfo = createFileMetadata(simulationParamMinioObjectKey, originalFilename, null, null, null, parSimulationParamDirId, paramFile.getSize(), null,null); fileMetadataInfoService.save(fileInfo); - + // 创建文件权限,默认赋予当前用户上传权限 createFilePermission(fileInfo.getId()); - + return SdmResponse.success(fileInfo.getId()); } catch (Exception e) { minioService.deleteFile(simulationParamMinioObjectKey, simulationParamDirMetadataInfo.getBucketName()); diff --git a/data/src/main/resources/application-dev-65.yml b/data/src/main/resources/application-dev-65.yml index 7b3283a2..7b13a208 100644 --- a/data/src/main/resources/application-dev-65.yml +++ b/data/src/main/resources/application-dev-65.yml @@ -173,4 +173,8 @@ xxl: # xxljob 配置结束 -tempNgUrl: http://192.168.65.161:10031/storage/ \ No newline at end of file +tempNgUrl: http://192.168.65.161:10031/storage/ + +# minio 合并超时时间 s +minioMerge: + wait: 2 \ No newline at end of file From 56a73deb7cab2e8fff79cf7b0530877e2eb09d81 Mon Sep 17 00:00:00 2001 From: yangyang Date: Tue, 7 Apr 2026 10:37:13 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9Aminio=E5=88=86?= =?UTF-8?q?=E7=89=87=E4=B8=8A=E4=BC=A0=E9=92=88=E5=AF=B9=E5=A4=A7=E6=96=87?= =?UTF-8?q?=E4=BB=B6=EF=BC=8C=E5=A2=9E=E5=8A=A0=E7=9B=B8=E5=85=B3=E6=97=A5?= =?UTF-8?q?=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/MinioFileIDataFileServiceImpl.java | 89 ++++++++++--------- 1 file changed, 49 insertions(+), 40 deletions(-) diff --git a/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java b/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java index 19ee5511..57cfb321 100644 --- a/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java +++ b/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java @@ -2076,8 +2076,9 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { String finalFileName = req.getObjectKey(); long start = System.currentTimeMillis(); + String traceId = MdcUtil.getTraceId(); CompletableFuture future = CompletableFuture.supplyAsync(() ->asyncMerge(finalFileName,chunkBucket,tempDirPath, - req.getBusinessId(),ThreadLocalContext.getTenantId(),ThreadLocalContext.getUserId(),start)); + req.getBusinessId(),ThreadLocalContext.getTenantId(),ThreadLocalContext.getUserId(),start,traceId)); try { Boolean b1 = future.get(minioMergerWait, TimeUnit.SECONDS); log.info("chunkUploadToMinio merge result:{}",b1); @@ -2115,49 +2116,57 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { } // 异步合并 - private Boolean asyncMerge (String finalFileName,String chunkBucket,String tempDirPath,Long businessId,Long teandId,Long userId,long start){ - Boolean result = true; + private Boolean asyncMerge (String finalFileName,String chunkBucket,String tempDirPath,Long businessId,Long teandId,Long userId,long start,String traceId){ try { - ThreadLocalContext.setTenantId(teandId); - ThreadLocalContext.setUserId(userId); - Boolean merge = minioService.merge(chunkBucket, tempDirPath, chunkBucket, finalFileName); - if(!merge){ - // 合并失败,直接删除原始的db数据 - List businessIds = new ArrayList<>(); - businessIds.add(businessId); - handleFailFiles(businessIds); - log.error("asyncMerge 合并分片失败:{}",tempDirPath); - result = false; - } + MdcUtil.putTraceId(traceId); + log.info("{} asyncMerge start merge,userId:{}",finalFileName,userId); + Boolean result = true; + try { + ThreadLocalContext.setTenantId(teandId); + ThreadLocalContext.setUserId(userId); + Boolean merge = minioService.merge(chunkBucket, tempDirPath, chunkBucket, finalFileName); + if(!merge){ + // 合并失败,直接删除原始的db数据 + List businessIds = new ArrayList<>(); + businessIds.add(businessId); + handleFailFiles(businessIds); + log.error("asyncMerge 合并分片失败:{}",tempDirPath); + result = false; + } + } catch (Exception e) { + result = false; + log.error("asyncMerge excute error:{}",e.getMessage()); + }finally { + // 删除分片临时数据 + deleteTempFileAfterFailed(tempDirPath,chunkBucket); + } + // 删除分片上传的处理中状态 + commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+businessId); + long end=System.currentTimeMillis(); + // 计算线程执行耗时(秒) + long costTime = (end - start)/1000; + // 合并成功,且这个异步线程已经大于超时时间,则通过webscoket 通知前端 + if (result && costTime >= minioMergerWait) { + ChunkUploadMinioFileResp resp = new ChunkUploadMinioFileResp(); + resp.setBusinessId(businessId); + resp.setAsyncBack("Y"); + resp.setResult(true); + resp.setFileTempPath(tempDirPath); + WsMessage message = new WsMessage<>(); + message.setScene(WsSceneEnum.BIG_FILE_CHUNK.getScene()); + message.setUserId(userId); + message.setTimestamp(System.currentTimeMillis()); + message.setData(resp); + SdmResponse sdmResponse = wsPushToolFeignClient.wsPushOne(message); + log.info("asyncMerge push message:{}",JSONObject.toJSONString(sdmResponse)); + } + return result; } catch (Exception e) { - result = false; - log.error("asyncMerge excute error:{}",e.getMessage()); - }finally { - // 删除分片临时数据 - deleteTempFileAfterFailed(tempDirPath,chunkBucket); + log.error("asyncMerge error:{}",e.getMessage()); + MdcUtil.removeTraceId(); + return false; } - // 删除分片上传的处理中状态 - commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+businessId); - long end=System.currentTimeMillis(); - // 计算线程执行耗时(秒) - long costTime = (end - start)/1000; - // 合并成功,且这个异步线程已经大于超时时间,则通过webscoket 通知前端 - if (result && costTime >= minioMergerWait) { - ChunkUploadMinioFileResp resp = new ChunkUploadMinioFileResp(); - resp.setBusinessId(businessId); - resp.setAsyncBack("Y"); - resp.setResult(true); - resp.setFileTempPath(tempDirPath); - WsMessage message = new WsMessage<>(); - message.setScene(WsSceneEnum.BIG_FILE_CHUNK.getScene()); - message.setUserId(userId); - message.setTimestamp(System.currentTimeMillis()); - message.setData(resp); - SdmResponse sdmResponse = wsPushToolFeignClient.wsPushOne(message); - log.info("asyncMerge push message:{}",JSONObject.toJSONString(sdmResponse)); - } - return result; } From 169fb12a3f02793a05aa128c5c3a3384155b0715 Mon Sep 17 00:00:00 2001 From: gulongcheng <474084054@qq.com> Date: Tue, 7 Apr 2026 10:56:56 +0800 Subject: [PATCH 3/4] =?UTF-8?q?fix:=E6=A0=87=E5=87=86=E5=BA=93=E5=9C=BA?= =?UTF-8?q?=E6=99=AF=EF=BC=8C=E7=BB=91=E5=AE=9A=E5=BC=82=E5=B8=B8=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../data/service/impl/FileSimulationMappingServiceImpl.java | 1 + .../com/sdm/data/service/minio/MinioTenantInitializer.java | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/data/src/main/java/com/sdm/data/service/impl/FileSimulationMappingServiceImpl.java b/data/src/main/java/com/sdm/data/service/impl/FileSimulationMappingServiceImpl.java index 4ec3ea90..c66750d8 100644 --- a/data/src/main/java/com/sdm/data/service/impl/FileSimulationMappingServiceImpl.java +++ b/data/src/main/java/com/sdm/data/service/impl/FileSimulationMappingServiceImpl.java @@ -148,6 +148,7 @@ public class FileSimulationMappingServiceImpl extends ServiceImpl>> simulationPoolTaskId2Files = list.stream() + .filter(fileSimu -> fileId2Filemeta.containsKey(fileSimu.getFileId())) .collect(Collectors.groupingBy( FileSimulationMapping::getSimulationPoolTaskId, Collectors.mapping( diff --git a/data/src/main/java/com/sdm/data/service/minio/MinioTenantInitializer.java b/data/src/main/java/com/sdm/data/service/minio/MinioTenantInitializer.java index 157b0ea6..60c5d3b4 100644 --- a/data/src/main/java/com/sdm/data/service/minio/MinioTenantInitializer.java +++ b/data/src/main/java/com/sdm/data/service/minio/MinioTenantInitializer.java @@ -34,8 +34,8 @@ public class MinioTenantInitializer implements CommandLineRunner { private IDataFileService dataFileService; // 配置重试策略 - private static final int MAX_RETRIES = 5; // 最多重试5次 - private static final int RETRY_DELAY_SECONDS = 5; // 每次间隔5秒,共覆盖60秒启动延迟 + private static final int MAX_RETRIES = 3; // 最多重试5次 + private static final int RETRY_DELAY_SECONDS = 2; // 每次间隔5秒,共覆盖60秒启动延迟 @Override public void run(String... args) { From 27acf6d741b24a16143c72b8f1078fa1f728c14b Mon Sep 17 00:00:00 2001 From: yangyang Date: Tue, 7 Apr 2026 10:59:13 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9Aminio=E5=88=86?= =?UTF-8?q?=E7=89=87=E4=B8=8A=E4=BC=A0=E9=92=88=E5=AF=B9=E5=A4=A7=E6=96=87?= =?UTF-8?q?=E4=BB=B6=EF=BC=8C=E5=A2=9E=E5=8A=A0=E7=9B=B8=E5=85=B3=E6=97=A5?= =?UTF-8?q?=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sdm/data/service/impl/MinioFileIDataFileServiceImpl.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java b/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java index 57cfb321..04fe123a 100644 --- a/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java +++ b/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java @@ -2077,8 +2077,11 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { String finalFileName = req.getObjectKey(); long start = System.currentTimeMillis(); String traceId = MdcUtil.getTraceId(); + Long tenantId = ThreadLocalContext.getTenantId(); + Long userId = ThreadLocalContext.getUserId(); + log.info("async asyncMerge start tenantId:{},userId:{}",tenantId,userId); CompletableFuture future = CompletableFuture.supplyAsync(() ->asyncMerge(finalFileName,chunkBucket,tempDirPath, - req.getBusinessId(),ThreadLocalContext.getTenantId(),ThreadLocalContext.getUserId(),start,traceId)); + req.getBusinessId(),tenantId,userId,start,traceId)); try { Boolean b1 = future.get(minioMergerWait, TimeUnit.SECONDS); log.info("chunkUploadToMinio merge result:{}",b1);