修改:minio分片上传针对大文件,有条件异步通过webscoket通知
This commit is contained in:
@@ -17,4 +17,7 @@ public class ChunkUploadMinioFileResp {
|
|||||||
// 分片文件的临时目录,第一次请求后,每次都会返回
|
// 分片文件的临时目录,第一次请求后,每次都会返回
|
||||||
private String fileTempPath;
|
private String fileTempPath;
|
||||||
|
|
||||||
|
// 是否是异步返回,默认N 同步返回; 只有最后一片超级大文件合并超时的时候,走异步,字段返回Y,返回Y的时候,前端就要监听webscoket的消息数据,后端通过webscoket通知已经合并上传完成。
|
||||||
|
private String asyncBack="N";
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,6 +12,8 @@ import com.github.pagehelper.util.StringUtil;
|
|||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.sdm.common.common.SdmResponse;
|
import com.sdm.common.common.SdmResponse;
|
||||||
import com.sdm.common.common.ThreadLocalContext;
|
import com.sdm.common.common.ThreadLocalContext;
|
||||||
|
import com.sdm.common.common.WsMessage;
|
||||||
|
import com.sdm.common.common.WsSceneEnum;
|
||||||
import com.sdm.common.entity.ExportExcelFormat;
|
import com.sdm.common.entity.ExportExcelFormat;
|
||||||
import com.sdm.common.entity.constants.NumberConstants;
|
import com.sdm.common.entity.constants.NumberConstants;
|
||||||
import com.sdm.common.entity.constants.PermConstants;
|
import com.sdm.common.entity.constants.PermConstants;
|
||||||
@@ -32,6 +34,7 @@ import com.sdm.common.feign.inter.project.ISimulationNodeFeignClient;
|
|||||||
import com.sdm.common.feign.inter.system.IApproveFeignClient;
|
import com.sdm.common.feign.inter.system.IApproveFeignClient;
|
||||||
import com.sdm.common.feign.inter.system.ISysConfigFeignClient;
|
import com.sdm.common.feign.inter.system.ISysConfigFeignClient;
|
||||||
import com.sdm.common.feign.inter.system.ISysLogFeignClient;
|
import com.sdm.common.feign.inter.system.ISysLogFeignClient;
|
||||||
|
import com.sdm.common.feign.inter.system.IWsPushToolFeignClient;
|
||||||
import com.sdm.common.feign.inter.task.ISimuluationTaskPoolFeignClient;
|
import com.sdm.common.feign.inter.task.ISimuluationTaskPoolFeignClient;
|
||||||
import com.sdm.common.log.CoreLogger;
|
import com.sdm.common.log.CoreLogger;
|
||||||
import com.sdm.common.log.constants.ModuleConstants;
|
import com.sdm.common.log.constants.ModuleConstants;
|
||||||
@@ -97,6 +100,8 @@ import java.time.LocalDateTime;
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@@ -208,6 +213,11 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
|
|||||||
@Value("${tempNgUrl:http://192.168.65.161:10031/storage/}")
|
@Value("${tempNgUrl:http://192.168.65.161:10031/storage/}")
|
||||||
private String TEMP_NG_URL;
|
private String TEMP_NG_URL;
|
||||||
|
|
||||||
|
@Value("${minioMerge.wait:55}")
|
||||||
|
private Long minioMergerWait;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IWsPushToolFeignClient wsPushToolFeignClient;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -2064,43 +2074,90 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
|
|||||||
finalLocalFilePath,localTempDirPath,req.getChunkTotal(),localMergeSuccess);
|
finalLocalFilePath,localTempDirPath,req.getChunkTotal(),localMergeSuccess);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. 全部分片已经上传 => 自动合并
|
|
||||||
String finalFileName = req.getObjectKey();
|
String finalFileName = req.getObjectKey();
|
||||||
Boolean merge = minioService.merge(chunkBucket, tempDirPath, chunkBucket, finalFileName);
|
long start = System.currentTimeMillis();
|
||||||
if(!merge){
|
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() ->asyncMerge(finalFileName,chunkBucket,tempDirPath,
|
||||||
deleteTempFileAfterFailed(tempDirPath,chunkBucket);
|
req.getBusinessId(),ThreadLocalContext.getTenantId(),ThreadLocalContext.getUserId(),start));
|
||||||
log.error("合并分片失败:{}",tempDirPath);
|
try {
|
||||||
// 删除分片上传的处理中状态
|
Boolean b1 = future.get(minioMergerWait, TimeUnit.SECONDS);
|
||||||
commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+req.getBusinessId());
|
log.info("chunkUploadToMinio merge result:{}",b1);
|
||||||
return buildFailedResponse(resp,req.getSourceFileName()+"合并分片失败",req);
|
if(b1){
|
||||||
|
// 同步返回
|
||||||
|
resp.setAsyncBack("N");
|
||||||
|
return buildSuccessResponse(resp,req,tempDirPath);
|
||||||
|
}
|
||||||
|
} catch (TimeoutException e1) {
|
||||||
|
log.warn("chunkUploadToMinio merge timeOut:{} s",minioMergerWait);
|
||||||
|
// 异步返回
|
||||||
|
resp.setAsyncBack("Y");
|
||||||
|
return buildSuccessResponse(resp,req,"");
|
||||||
|
}catch (Exception e){
|
||||||
|
log.error("chunkUploadToMinio merge error:{}",e);
|
||||||
}
|
}
|
||||||
// 4. 合并完成后删除临时目录
|
// 走到这里肯定就是失败了
|
||||||
deleteTempFileAfterFailed(tempDirPath,chunkBucket);
|
return buildFailedResponse(resp,req.getSourceFileName()+"合并分片失败",req);
|
||||||
// 删除分片上传的处理中状态
|
|
||||||
commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+req.getBusinessId());
|
// // 3. 全部分片已经上传 => 自动合并
|
||||||
return buildSuccessResponse(resp,req,tempDirPath);
|
// String finalFileName = req.getObjectKey();
|
||||||
|
// Boolean merge = minioService.merge(chunkBucket, tempDirPath, chunkBucket, finalFileName);
|
||||||
|
// if(!merge){
|
||||||
|
// deleteTempFileAfterFailed(tempDirPath,chunkBucket);
|
||||||
|
// log.error("合并分片失败:{}",tempDirPath);
|
||||||
|
// // 删除分片上传的处理中状态
|
||||||
|
// commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+req.getBusinessId());
|
||||||
|
// return buildFailedResponse(resp,req.getSourceFileName()+"合并分片失败",req);
|
||||||
|
// }
|
||||||
|
// // 4. 合并完成后删除临时目录
|
||||||
|
// deleteTempFileAfterFailed(tempDirPath,chunkBucket);
|
||||||
|
// // 删除分片上传的处理中状态
|
||||||
|
// commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+req.getBusinessId());
|
||||||
|
// return buildSuccessResponse(resp,req,tempDirPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 异步合并
|
// 异步合并
|
||||||
private void asyncMerge (String finalFileName,String chunkBucket,String tempDirPath,Long businessId){
|
private Boolean asyncMerge (String finalFileName,String chunkBucket,String tempDirPath,Long businessId,Long teandId,Long userId,long start){
|
||||||
|
Boolean result = true;
|
||||||
try {
|
try {
|
||||||
|
ThreadLocalContext.setTenantId(teandId);
|
||||||
|
ThreadLocalContext.setUserId(userId);
|
||||||
Boolean merge = minioService.merge(chunkBucket, tempDirPath, chunkBucket, finalFileName);
|
Boolean merge = minioService.merge(chunkBucket, tempDirPath, chunkBucket, finalFileName);
|
||||||
if(!merge){
|
if(!merge){
|
||||||
// 合并失败,直接删除原始的db数据
|
// 合并失败,直接删除原始的db数据
|
||||||
List<Long> businessIds = new ArrayList<>();
|
List<Long> businessIds = new ArrayList<>();
|
||||||
businessIds.add(businessId);
|
businessIds.add(businessId);
|
||||||
handleFailFiles(businessIds);
|
handleFailFiles(businessIds);
|
||||||
// 删除分片临时数据
|
|
||||||
deleteTempFileAfterFailed(tempDirPath,chunkBucket);
|
|
||||||
log.error("asyncMerge 合并分片失败:{}",tempDirPath);
|
log.error("asyncMerge 合并分片失败:{}",tempDirPath);
|
||||||
|
result = false;
|
||||||
}
|
}
|
||||||
// 4. 合并完成后删除临时目录
|
|
||||||
deleteTempFileAfterFailed(tempDirPath,chunkBucket);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
result = false;
|
||||||
log.error("asyncMerge excute error:{}",e.getMessage());
|
log.error("asyncMerge excute error:{}",e.getMessage());
|
||||||
|
}finally {
|
||||||
|
// 删除分片临时数据
|
||||||
|
deleteTempFileAfterFailed(tempDirPath,chunkBucket);
|
||||||
}
|
}
|
||||||
// 删除分片上传的处理中状态
|
// 删除分片上传的处理中状态
|
||||||
commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+businessId);
|
commonStatusUtil.end(CommonStatusUtil.CHUNK_UPLOAD_FILE+businessId);
|
||||||
|
long end=System.currentTimeMillis();
|
||||||
|
// 计算线程执行耗时(秒)
|
||||||
|
long costTime = (end - start)/1000;
|
||||||
|
// 合并成功,且这个异步线程已经大于超时时间,则通过webscoket 通知前端
|
||||||
|
if (result && costTime >= minioMergerWait) {
|
||||||
|
ChunkUploadMinioFileResp resp = new ChunkUploadMinioFileResp();
|
||||||
|
resp.setBusinessId(businessId);
|
||||||
|
resp.setAsyncBack("Y");
|
||||||
|
resp.setResult(true);
|
||||||
|
resp.setFileTempPath(tempDirPath);
|
||||||
|
WsMessage<ChunkUploadMinioFileResp> message = new WsMessage<>();
|
||||||
|
message.setScene(WsSceneEnum.BIG_FILE_CHUNK.getScene());
|
||||||
|
message.setUserId(userId);
|
||||||
|
message.setTimestamp(System.currentTimeMillis());
|
||||||
|
message.setData(resp);
|
||||||
|
SdmResponse sdmResponse = wsPushToolFeignClient.wsPushOne(message);
|
||||||
|
log.info("asyncMerge push message:{}",JSONObject.toJSONString(sdmResponse));
|
||||||
|
}
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -3171,7 +3228,7 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
|
|||||||
fileMetadataInfo.setOriginalName(originalName);
|
fileMetadataInfo.setOriginalName(originalName);
|
||||||
fileMetadataInfo.setFileSize(req.getFile().getSize());
|
fileMetadataInfo.setFileSize(req.getFile().getSize());
|
||||||
fileMetadataInfo.setVersionNo(newVersionNo);
|
fileMetadataInfo.setVersionNo(newVersionNo);
|
||||||
|
|
||||||
// 更新文件存储统计信息
|
// 更新文件存储统计信息
|
||||||
fileStorageService.lambdaUpdate()
|
fileStorageService.lambdaUpdate()
|
||||||
.eq(FileStorage::getFileId, fileMetadataInfo.getId())
|
.eq(FileStorage::getFileId, fileMetadataInfo.getId())
|
||||||
@@ -3340,15 +3397,15 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
minioService.uploadFile(paramFile, simulationParamMinioObjectKey, null,simulationParamDirMetadataInfo.getBucketName());
|
minioService.uploadFile(paramFile, simulationParamMinioObjectKey, null,simulationParamDirMetadataInfo.getBucketName());
|
||||||
|
|
||||||
// 创建目录元数据并保存到数据库
|
// 创建目录元数据并保存到数据库
|
||||||
FileMetadataInfo fileInfo = createFileMetadata(simulationParamMinioObjectKey, originalFilename,
|
FileMetadataInfo fileInfo = createFileMetadata(simulationParamMinioObjectKey, originalFilename,
|
||||||
null, null, null, parSimulationParamDirId, paramFile.getSize(), null,null);
|
null, null, null, parSimulationParamDirId, paramFile.getSize(), null,null);
|
||||||
fileMetadataInfoService.save(fileInfo);
|
fileMetadataInfoService.save(fileInfo);
|
||||||
|
|
||||||
// 创建文件权限,默认赋予当前用户上传权限
|
// 创建文件权限,默认赋予当前用户上传权限
|
||||||
createFilePermission(fileInfo.getId());
|
createFilePermission(fileInfo.getId());
|
||||||
|
|
||||||
return SdmResponse.success(fileInfo.getId());
|
return SdmResponse.success(fileInfo.getId());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
minioService.deleteFile(simulationParamMinioObjectKey, simulationParamDirMetadataInfo.getBucketName());
|
minioService.deleteFile(simulationParamMinioObjectKey, simulationParamDirMetadataInfo.getBucketName());
|
||||||
|
|||||||
@@ -173,4 +173,8 @@ xxl:
|
|||||||
# xxljob 配置结束
|
# xxljob 配置结束
|
||||||
|
|
||||||
|
|
||||||
tempNgUrl: http://192.168.65.161:10031/storage/
|
tempNgUrl: http://192.168.65.161:10031/storage/
|
||||||
|
|
||||||
|
# minio 合并超时时间 s
|
||||||
|
minioMerge:
|
||||||
|
wait: 2
|
||||||
Reference in New Issue
Block a user