diff --git a/common/src/main/java/com/sdm/common/common/WsSceneEnum.java b/common/src/main/java/com/sdm/common/common/WsSceneEnum.java index 2abd885f..ef9a60d9 100644 --- a/common/src/main/java/com/sdm/common/common/WsSceneEnum.java +++ b/common/src/main/java/com/sdm/common/common/WsSceneEnum.java @@ -9,7 +9,8 @@ public enum WsSceneEnum { WS_HEART_BEAT("WS_HEART_BEAT","websocket心跳检测" ), BIG_FILE_CHUNK("BIG_FILE_CHUNK","大文件分片上传结果通知" ), - FLOW_NODE_STATUS_CHANGED("FLOW_NODE_STATUS_CHANGED","流程节点状态变化通知" ); + FLOW_NODE_STATUS_CHANGED("FLOW_NODE_STATUS_CHANGED","流程节点状态变化通知" ), + BATCH_DOWNLOAD_READY("BATCH_DOWNLOAD_READY","批量打包下载就绪通知" ); /** * scene webscoket 推送的业务场景 diff --git a/common/src/main/java/com/sdm/common/entity/req/pbs/DelHpcJobsReq.java b/common/src/main/java/com/sdm/common/entity/req/pbs/DelHpcJobsReq.java index 06c4307e..7e61e33a 100644 --- a/common/src/main/java/com/sdm/common/entity/req/pbs/DelHpcJobsReq.java +++ b/common/src/main/java/com/sdm/common/entity/req/pbs/DelHpcJobsReq.java @@ -6,5 +6,6 @@ import java.util.List; @Data public class DelHpcJobsReq { + private List ids; private List hpcJobIds; } diff --git a/common/src/main/java/com/sdm/common/entity/resp/data/BatchDownloadNoticeResp.java b/common/src/main/java/com/sdm/common/entity/resp/data/BatchDownloadNoticeResp.java new file mode 100644 index 00000000..60bfd6c0 --- /dev/null +++ b/common/src/main/java/com/sdm/common/entity/resp/data/BatchDownloadNoticeResp.java @@ -0,0 +1,77 @@ +package com.sdm.common.entity.resp.data; + +import lombok.Data; + +/** + * 批量打包下载完成的 WebSocket 通知数据体 + * 与 WsSceneEnum.BATCH_DOWNLOAD_READY 场景配合使用 + */ +@Data +public class BatchDownloadNoticeResp { + + /** + * 任务ID,与发起下载时返回的 taskId 一致,前端用于匹配通知 + * example: "a1b2c3d4-e5f6-7890-abcd-ef1234567890" + */ + private String taskId; + + /** + * 任务状态:SUCCESS 成功,FAILED 失败 + */ + private String status; + + /** + * ZIP 文件的 MinIO 预签名下载 URL,status=SUCCESS 时有值,1小时内有效 + * example: "http://192.168.65.161:9000/proj-bucket/temp/zip/2026-04-16/taskId.zip?..." + */ + private String zipDownloadUrl; + + /** + * 最终压缩包名称, example: "项目资料.zip" + */ + private String zipFileName; + + /** + * 实际成功打包的文件数量, example: 5 + */ + private Integer fileCount; + + /** + * 失败原因,status=FAILED 时有值 + */ + private String errorMsg; + + /** + * 构建成功通知 + * + * @param taskId 任务ID + * @param zipDownloadUrl 预签名URL + * @param zipFileName 压缩包名 + * @param fileCount 打包文件数 + */ + public static BatchDownloadNoticeResp success(String taskId, String zipDownloadUrl, + String zipFileName, int fileCount) { + BatchDownloadNoticeResp resp = new BatchDownloadNoticeResp(); + resp.setTaskId(taskId); + resp.setStatus("SUCCESS"); + resp.setZipDownloadUrl(zipDownloadUrl); + resp.setZipFileName(zipFileName); + resp.setFileCount(fileCount); + return resp; + } + + /** + * 构建失败通知 + * + * @param taskId 任务ID + * @param errorMsg 失败原因 + */ + public static BatchDownloadNoticeResp failed(String taskId, String errorMsg) { + BatchDownloadNoticeResp resp = new BatchDownloadNoticeResp(); + resp.setTaskId(taskId); + resp.setStatus("FAILED"); + resp.setErrorMsg(errorMsg); + return resp; + } + +} diff --git a/common/src/main/java/com/sdm/common/entity/resp/data/SimulationTaskResultCurveResp.java b/common/src/main/java/com/sdm/common/entity/resp/data/SimulationTaskResultCurveResp.java index 0e82c173..fa3e6b74 100644 --- a/common/src/main/java/com/sdm/common/entity/resp/data/SimulationTaskResultCurveResp.java +++ b/common/src/main/java/com/sdm/common/entity/resp/data/SimulationTaskResultCurveResp.java @@ -41,4 +41,8 @@ public class SimulationTaskResultCurveResp extends BaseResp { @Schema(description = "创建人") private Long creatorId; + + @Schema(description = "更新时间") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private LocalDateTime updateTime; } diff --git a/common/src/main/java/com/sdm/common/utils/FilesUtil.java b/common/src/main/java/com/sdm/common/utils/FilesUtil.java index e12b6ee3..37e33a88 100644 --- a/common/src/main/java/com/sdm/common/utils/FilesUtil.java +++ b/common/src/main/java/com/sdm/common/utils/FilesUtil.java @@ -1,5 +1,6 @@ package com.sdm.common.utils; +import com.sdm.common.config.FlowableConfig; import com.sdm.common.entity.resp.data.LocalFileNodeVO; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; @@ -817,6 +818,33 @@ public class FilesUtil { return Pair.of(mainFileAbsolutePath, slaveFileList); } + /** + * 静态方法:创建 本地算例 目录 + * @param objectKey 目录名/子路径,调用者必须传递文件夹的路径 + * @return 成功返回完整路径字符串,失败返回 "" + */ + public static String createRunLocalDir(String objectKey) { + // 入参非法直接返回空 + if (objectKey == null || objectKey.isBlank()) { + log.warn("createRunLocalDir objectKey is blank"); + return ""; + } + try { + // 拼接路径: /home/simulation + objectKey + Path preDir = Paths.get(FlowableConfig.FLOWABLE_SIMULATION_BASEDIR).toAbsolutePath().normalize(); + Path targetDir = preDir.resolve(objectKey); + // 关键:创建所有目录(已存在也不会报错) + Files.createDirectories(targetDir); + // 成功:返回完整路径 + return targetDir.toString(); + } catch (Exception e) { + log.error("createRunLocalDir objectKey error:{} ",e.getMessage()); + // 权限不足、磁盘满、路径非法 → 都返回空 + return ""; + } + } + + /** * 递归工具方法:遍历文件夹下所有层级的文件 * @param dir 要遍历的目录 diff --git a/data/src/main/java/com/sdm/data/controller/DataFileController.java b/data/src/main/java/com/sdm/data/controller/DataFileController.java index e3236060..ad689ca6 100644 --- a/data/src/main/java/com/sdm/data/controller/DataFileController.java +++ b/data/src/main/java/com/sdm/data/controller/DataFileController.java @@ -13,8 +13,10 @@ import com.sdm.common.log.annotation.SysLog; import com.sdm.common.utils.FilesUtil; import com.sdm.data.model.entity.FileMetadataInfo; import com.sdm.data.model.req.*; +import com.sdm.data.model.resp.BatchDownloadTaskResp; import com.sdm.data.model.resp.KKFileViewURLFromMinioResp; import com.sdm.data.model.resp.MinioDownloadUrlResp; +import com.sdm.data.model.resp.UpdateFileInfoResp; import com.sdm.data.service.IDataFileService; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -384,6 +386,26 @@ public class DataFileController implements IDataFeignClient { return IDataFileService.updateFile(req); } + /** + * 大文件分片更新第一步:预处理文件元数据,生成新版本 objectKey,返回分片上传参数。 + * 第二步复用 /chunkUploadToMinio 接口完成分片上传。 + */ + @AutoFillDictTags + @PostMapping("/prepareUpdateFileInfo") + @Operation(summary = "大文件分片更新-文件信息准备", description = "预处理更新文件元数据,返回分片上传所需的 objectKey 等参数") + public SdmResponse prepareUpdateFileInfo(@RequestBody @Validated UpdateFileInfoReq req) { + return IDataFileService.prepareUpdateFileInfo(req); + } + + /** + * 大文件分片更新第三步:分片上传完成后的回调,执行版本更新或失败回滚。 + */ + @PostMapping("/updateFileCallback") + @Operation(summary = "大文件分片更新-上传完成回调", description = "分片上传完成后前端调用,成功则完成版本更新,失败则回滚清理") + public SdmResponse updateFileCallback(@RequestBody @Validated UpdateFileCallbackReq req) { + return IDataFileService.updateFileCallback(req); + } + /** * 压缩文件 * @@ -551,6 +573,17 @@ public class DataFileController implements IDataFeignClient { return IDataFileService.getDownloadUrlWithPermission(fileId, skipPermissionCheck); } + /** + * 批量打包压缩下载(异步) + * 接口立即返回 taskId,后台打包完成后通过 WebSocket 推送预签名下载 URL + * 前端监听 scene=BATCH_DOWNLOAD_READY 的 WebSocket 消息,收到后触发下载 + */ + @PostMapping("/batchDownloadZip") + @Operation(summary = "批量打包下载(异步)", description = "提交打包任务立即返回taskId,完成后WebSocket推送可下载的预签名URL(1小时有效)") + public SdmResponse batchDownloadZip(@RequestBody @Validated BatchDownloadReq req) { + return IDataFileService.batchDownloadZip(req); + } + @GetMapping("/getPublicDownloadUrl") @Operation(summary = "获取MinIO文件下载的预签名URL(无权限校验)", description = "获取MinIO文件的预签名URL(无权限校验)") public SdmResponse getPublicDownloadUrl(@Parameter(description = "文件id") @RequestParam("fileId") Long fileId) { @@ -564,7 +597,17 @@ public class DataFileController implements IDataFeignClient { } /** - * 分片上传文件到minio + * 大文件上传第一步:文件信息入库准备发起评审 + */ + @AutoFillDictTags + @PostMapping("/batchAddFileInfo") + @Operation(summary = "文件信息入库准备发起评审") + public SdmResponse> batchAddFileInfo(@RequestBody UploadFilesReq req) { + return IDataFileService.batchAddFileInfo(req); + } + + /** + * 大文件上传第二步: 分片上传文件到minio * * @param req * @return SdmResponse @@ -575,14 +618,11 @@ public class DataFileController implements IDataFeignClient { return IDataFileService.chunkUploadToMinio(req); } - /** - * 文件信息入库准备发起评审 - */ - @AutoFillDictTags - @PostMapping("/batchAddFileInfo") - @Operation(summary = "文件信息入库准备发起评审") - public SdmResponse> batchAddFileInfo(@RequestBody UploadFilesReq req) { - return IDataFileService.batchAddFileInfo(req); + // 大文件上传第三步: 文件分片上传成功后前端回调 + @PostMapping("/chunkUploadCallback") + @Operation(summary = "文件分片上传成功后前端回调") + public SdmResponse chunkUploadCallback(@RequestBody KnowledgeCallBackReq req) { + return IDataFileService.chunkUploadCallback(req); } @PostMapping("/getChunkUploadMergeResult") @@ -591,13 +631,6 @@ public class DataFileController implements IDataFeignClient { return IDataFileService.getChunkUploadMergeResult(req); } - @PostMapping("/chunkUploadCallback") - @Operation(summary = "文件分片上传成功后前端回调") - public SdmResponse chunkUploadCallback(@RequestBody KnowledgeCallBackReq req) { - return IDataFileService.chunkUploadCallback(req); - } - - /** * 下载文件到本地临时目录 * diff --git a/data/src/main/java/com/sdm/data/model/req/BatchDownloadReq.java b/data/src/main/java/com/sdm/data/model/req/BatchDownloadReq.java new file mode 100644 index 00000000..c52ee180 --- /dev/null +++ b/data/src/main/java/com/sdm/data/model/req/BatchDownloadReq.java @@ -0,0 +1,35 @@ +package com.sdm.data.model.req; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotEmpty; +import lombok.Data; + +import java.util.List; + +/** + * 批量打包下载请求参数 + */ +@Data +@Schema(description = "批量打包下载请求参数") +public class BatchDownloadReq { + + /** + * 需要打包的文件ID列表, example: [129203, 129204, 129205] + */ + @NotEmpty(message = "文件ID列表不能为空") + @Schema(description = "文件ID列表", requiredMode = Schema.RequiredMode.REQUIRED) + private List fileIds; + + /** + * 压缩包名称, example: "项目资料.zip";为空时自动生成 batch_{时间戳}.zip + */ + @Schema(description = "压缩包名称,为空则自动生成") + private String zipFileName; + + /** + * 是否跳过权限校验,默认 false(仅内部调用时传 true) + */ + @Schema(description = "是否跳过权限校验,默认false") + private Boolean skipPermissionCheck = false; + +} diff --git a/data/src/main/java/com/sdm/data/model/req/UpdateFileCallbackReq.java b/data/src/main/java/com/sdm/data/model/req/UpdateFileCallbackReq.java new file mode 100644 index 00000000..3e65bc57 --- /dev/null +++ b/data/src/main/java/com/sdm/data/model/req/UpdateFileCallbackReq.java @@ -0,0 +1,33 @@ +package com.sdm.data.model.req; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import jakarta.validation.constraints.NotNull; + +import java.util.List; + +/** + * 大文件分片更新 - 第三步回调请求参数。 + * 前端在所有分片上传完成(或失败)后调用,通知后端执行最终的版本更新或回滚。 + */ +@Data +@Schema(description = "大文件分片更新-上传完成回调请求参数") +public class UpdateFileCallbackReq { + + @NotNull(message = "文件id不能为空") + @Schema(description = "待更新的文件id(与第一步传入的 fileId 一致),example: 123456") + private Long fileId; + + @Schema(description = "分片上传是否全部成功,true=成功 false=失败") + private Boolean success; + + @Schema(description = "上传任务id(第一步返回),example: 1713250000000") + private String uploadTaskId; + + // 接口5.1 返回的这个文件成功文件的id集合 + private List succBusinessIds; + + // 接口5.1 返回的这个文件失败文件的id集合 + private List failBusinessIds; +} diff --git a/data/src/main/java/com/sdm/data/model/req/UpdateFileInfoReq.java b/data/src/main/java/com/sdm/data/model/req/UpdateFileInfoReq.java new file mode 100644 index 00000000..3731f799 --- /dev/null +++ b/data/src/main/java/com/sdm/data/model/req/UpdateFileInfoReq.java @@ -0,0 +1,78 @@ +package com.sdm.data.model.req; + +import com.alibaba.fastjson2.annotation.JSONField; +import com.sdm.common.entity.req.data.FileMetadataExtensionRequest; +import com.sdm.common.entity.req.data.SimulationPoolInfo; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import jakarta.validation.constraints.NotNull; +import java.util.List; +import java.util.Map; + +/** + * 大文件分片更新 - 第一步请求参数。 + * 仅携带文件元信息(不含 MultipartFile),用于在分片上传前完成元数据预处理。 + */ +@Data +@Schema(description = "大文件分片更新-文件信息准备请求参数") +public class UpdateFileInfoReq { + @Schema(description = "本次新增数据的任务id,毫秒值时间戳即可") + private String uploadTaskId; + + @NotNull(message = "文件id不能为空") + @Schema(description = "待更新的文件id") + private Long fileId; + + @NotNull(message = "文件名不能为空") + @Schema(description = "文件名(含后缀),example: report.docx") + private String fileName; + + @NotNull(message = "文件大小不能为空") + @Schema(description = "文件大小(字节),example: 104857600") + private Long fileSize; + + @Schema(description = "文件类型") + private Integer fileType; + + @Schema(description = "关联项目id") + private String projectId; + + @Schema(description = "关联工况库信息(JSON字符串)") + private String simulationPoolInfoListStr; + + @Schema(description = "关联工况库信息") + private List simulationPoolInfoList; + + @Schema(description = "关联分析项目id") + private String analysisDirectionId; + + @Schema(description = "备注信息") + private String remarks; + + @Schema(description = "字典标签查询结果缓存", hidden = true) + @JSONField(serialize = false) + private Map> dictTagIdsCache; + + @Schema(description = "字典标签查询列表,格式:['fileTypeDictClass','fileTypeDictValue','disciplineTypeDictClass','disciplineDictValue']") + private List dictTags; + + @Schema(description = "文件类型字典类") + private String fileTypeDictClass; + @Schema(description = "文件类型字典值") + private String fileTypeDictValue; + + @Schema(description = "学科类型字典类") + private String disciplineTypeDictClass; + @Schema(description = "学科类型字典值") + private String disciplineDictValue; + + @Schema(description = "知识库文件审批模板id") + private String templateId; + + @Schema(description = "知识库文件审批模板名称") + private String templateName; + + @Schema(description = "扩展信息") + private List fileMetadataExtensionRequest; +} diff --git a/data/src/main/java/com/sdm/data/model/resp/BatchDownloadTaskResp.java b/data/src/main/java/com/sdm/data/model/resp/BatchDownloadTaskResp.java new file mode 100644 index 00000000..4a8414bf --- /dev/null +++ b/data/src/main/java/com/sdm/data/model/resp/BatchDownloadTaskResp.java @@ -0,0 +1,24 @@ +package com.sdm.data.model.resp; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 批量打包下载任务响应(接口立即返回,前端凭 taskId 匹配 WebSocket 通知) + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@Schema(description = "批量打包下载任务响应") +public class BatchDownloadTaskResp { + + /** + * 任务唯一标识,用于前端 loading 状态标识及 WebSocket 消息匹配 + * example: "a1b2c3d4-e5f6-7890-abcd-ef1234567890" + */ + @Schema(description = "任务ID,与WebSocket通知中的taskId对应") + private String taskId; + +} diff --git a/data/src/main/java/com/sdm/data/model/resp/UpdateFileInfoResp.java b/data/src/main/java/com/sdm/data/model/resp/UpdateFileInfoResp.java new file mode 100644 index 00000000..b815db94 --- /dev/null +++ b/data/src/main/java/com/sdm/data/model/resp/UpdateFileInfoResp.java @@ -0,0 +1,28 @@ +package com.sdm.data.model.resp; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +/** + * 大文件分片更新 - 第一步响应参数。 + * 返回分片上传所需的关键标识,前端据此调用 chunkUploadToMinio。 + */ +@Data +@Schema(description = "大文件分片更新-文件信息准备响应参数") +public class UpdateFileInfoResp { + + @Schema(description = "文件记录id(即 businessId),example: 123456") + private Long businessId; + + @Schema(description = "MinIO 文件 objectKey(新版本路径),example: tenant/dir/file_V2.docx") + private String objectKey; + + @Schema(description = "本次上传任务id(毫秒时间戳),example: 1713250000000") + private String uploadTaskId; + + @Schema(description = "原始文件名,example: report.docx") + private String sourceFileName; + + @Schema(description = "MinIO bucket 名称") + private String bucketName; +} diff --git a/data/src/main/java/com/sdm/data/service/IDataFileService.java b/data/src/main/java/com/sdm/data/service/IDataFileService.java index 2cf4b327..7e98bdb3 100644 --- a/data/src/main/java/com/sdm/data/service/IDataFileService.java +++ b/data/src/main/java/com/sdm/data/service/IDataFileService.java @@ -8,8 +8,10 @@ import com.sdm.common.entity.resp.PageDataResp; import com.sdm.common.entity.resp.data.*; import com.sdm.data.model.entity.FileMetadataInfo; import com.sdm.data.model.req.*; +import com.sdm.data.model.resp.BatchDownloadTaskResp; import com.sdm.data.model.resp.KKFileViewURLFromMinioResp; import com.sdm.data.model.resp.MinioDownloadUrlResp; +import com.sdm.data.model.resp.UpdateFileInfoResp; import jakarta.servlet.http.HttpServletResponse; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; @@ -124,6 +126,24 @@ public interface IDataFileService { return null; } + /** + * 大文件分片更新第一步:预处理文件元数据,生成新版本 objectKey,返回分片上传所需参数。 + * @param req 更新文件信息请求(不含文件流) + * @return 包含 businessId、objectKey、uploadTaskId 等的响应 + */ + default SdmResponse prepareUpdateFileInfo(UpdateFileInfoReq req) { + return null; + } + + /** + * 大文件分片更新第三步:分片上传完成后的回调,执行版本更新或失败回滚。 + * @param req 回调请求,包含 fileId、uploadTaskId、是否成功标识 + * @return 操作结果 + */ + default SdmResponse updateFileCallback(UpdateFileCallbackReq req) { + return null; + } + /** * 删除文件 * @param req 删除文件请求参数 @@ -422,6 +442,15 @@ public interface IDataFileService { */ SdmResponse zipFiles(ZipFilesReq req); + /** + * 批量打包压缩下载(异步) + * 提交异步任务后立即返回 taskId,打包完成后通过 WebSocket 推送预签名下载 URL + * + * @param req 批量下载请求参数(fileIds、zipFileName、skipPermissionCheck) + * @return 任务ID响应,前端凭 taskId 匹配 WebSocket 通知 + */ + SdmResponse batchDownloadZip(BatchDownloadReq req); + /** * 知识库审批结果回调 diff --git a/data/src/main/java/com/sdm/data/service/IMinioService.java b/data/src/main/java/com/sdm/data/service/IMinioService.java index 93331a0d..0f9de834 100644 --- a/data/src/main/java/com/sdm/data/service/IMinioService.java +++ b/data/src/main/java/com/sdm/data/service/IMinioService.java @@ -78,7 +78,21 @@ public interface IMinioService { * @param bucketName 桶名称 */ void uploadFile(MultipartFile file, String objectName, Map tags, String bucketName); - + + /** + * 将 InputStream 直接上传到 MinIO,适用于批量打包 ZIP 等动态生成内容 + * tags 可用于触发 MinIO 桶生命周期规则自动过期删除,例如传入 {"auto-expire":"1d"} 会在1天后自动清理 + * + * @param inputStream 数据来源流 + * @param size 流的字节总数 + * @param objectName MinIO 对象路径, example: "temp/zip/2026-04-16/taskId.zip" + * @param contentType MIME 类型, example: "application/zip" + * @param bucketName 存储桶名称,为空时使用当前租户桶 + * @param tags 对象标签,可为 null,example: {"auto-expire":"1d"} + */ + void putObjectFromStream(InputStream inputStream, long size, String objectName, + String contentType, String bucketName, Map tags); + /** * 重命名MinIO中的文件 * diff --git a/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java b/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java index f7f26a98..ec5b04fd 100644 --- a/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java +++ b/data/src/main/java/com/sdm/data/service/impl/MinioFileIDataFileServiceImpl.java @@ -50,8 +50,10 @@ import com.sdm.data.model.dto.ExportKnowledgeDto; import com.sdm.data.model.entity.*; import com.sdm.data.model.enums.ApproveFileActionENUM; import com.sdm.data.model.req.*; +import com.sdm.data.model.resp.BatchDownloadTaskResp; import com.sdm.data.model.resp.KKFileViewURLFromMinioResp; import com.sdm.data.model.resp.MinioDownloadUrlResp; +import com.sdm.data.model.resp.UpdateFileInfoResp; import com.sdm.data.service.*; import com.sdm.data.service.approve.FileApproveExecutor; import com.sdm.data.service.approve.FileApproveRequestBuilder; @@ -1851,6 +1853,158 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { return null; } + /** + * 批量打包压缩下载(异步) + * 立即返回 taskId,后台线程完成打包后通过 WebSocket 推送预签名下载 URL + * + * @param req 请求参数(fileIds、zipFileName、skipPermissionCheck) + * @return 任务ID,前端凭此匹配 WebSocket 通知 + */ + @Override + public SdmResponse batchDownloadZip(BatchDownloadReq req) { + String taskId = UUID.randomUUID().toString(); + Long userId = ThreadLocalContext.getUserId(); + Long tenantId = ThreadLocalContext.getTenantId(); + // 与分片合并异步回调同一模式:提交任务后立即返回,不阻塞请求线程 + SdmThreadService.builder().execute(() -> doBatchZipAndNotify(taskId, userId,tenantId, req)); + return SdmResponse.success(new BatchDownloadTaskResp(taskId)); + } + + /** + * 异步核心:从 MinIO 拉取文件 → 本地临时 ZIP → 上传 MinIO → WebSocket 通知前端 + * 任何异常都会推送 FAILED 通知,临时文件在 finally 中保证删除 + * + * @param taskId 任务唯一标识,用于前端 WebSocket 消息匹配 + * @param userId 发起操作的用户ID,用于 WebSocket 定向推送 + * @param req 原始请求参数 + */ + private void doBatchZipAndNotify(String taskId, Long userId,Long tenantId, BatchDownloadReq req) { + ThreadLocalContext.setUserId(userId); + ThreadLocalContext.setTenantId(tenantId); + // 压缩包名称:优先使用前端传入,否则自动生成 + String zipFileName = org.apache.commons.lang3.StringUtils.isNotBlank(req.getZipFileName()) + ? req.getZipFileName() + : "batch_" + System.currentTimeMillis() + ".zip"; + + File tempZipFile = null; + try { + tempZipFile = File.createTempFile("sdm_batch_zip_", ".zip"); + + // 第一阶段:逐个从 MinIO 拉取文件流,写入本地临时 ZIP + int packedCount = buildLocalZipFile(req, tempZipFile); + + // 第二阶段:将本地临时 ZIP 上传到 MinIO 临时目录,以 taskId 命名便于追溯 + // 打上 auto-expire=1d 标签,触发桶生命周期规则 auto-expire-1d,1天后 MinIO 自动删除该临时文件 + String zipObjectKey = "temp/zip/" + java.time.LocalDate.now() + "/" + taskId + ".zip"; + Map autoExpireTags = Collections.singletonMap("auto-expire", "1d"); + try (java.io.FileInputStream fis = new java.io.FileInputStream(tempZipFile)) { + minioService.putObjectFromStream(fis, tempZipFile.length(), zipObjectKey, "application/zip", null, autoExpireTags); + } + + // 第三阶段:生成预签名下载 URL(有效期 1 小时),推送 WebSocket 通知 + String presignedUrl = minioService.getMinioDownloadPresignedUrl(zipObjectKey, null, zipFileName); + BatchDownloadNoticeResp noticeData = BatchDownloadNoticeResp.success(taskId, presignedUrl, zipFileName, packedCount); + pushBatchDownloadNotice(userId, noticeData); + + } catch (Exception e) { + log.error("批量打包下载失败, taskId:{}, userId:{}", taskId, userId, e); + pushBatchDownloadNotice(userId, BatchDownloadNoticeResp.failed(taskId, e.getMessage())); + } finally { + if (tempZipFile != null && tempZipFile.exists()) { + tempZipFile.delete(); + } + } + } + + /** + * 将请求中的文件逐一从 MinIO 拉取并写入 ZipOutputStream + * 同名文件自动追加序号,如 report.pdf → report(1).pdf + * + * @param req 请求参数 + * @param tempZipFile 本地临时 ZIP 文件 + * @return 实际成功打包的文件数量 + */ + private int buildLocalZipFile(BatchDownloadReq req, File tempZipFile) throws Exception { + boolean skipPermission = Boolean.TRUE.equals(req.getSkipPermissionCheck()); + Set usedEntryNames = new HashSet<>(); + int packedCount = 0; + + try (java.util.zip.ZipOutputStream zos = new java.util.zip.ZipOutputStream(new java.io.FileOutputStream(tempZipFile))) { + for (Long fileId : req.getFileIds()) { + FileMetadataInfo meta = fileMetadataInfoService.getById(fileId); + if (meta == null) { + log.warn("batchDownloadZip: fileId={} 对应的文件元数据不存在,跳过", fileId); + continue; + } + // 无权限时跳过该文件,不中断整个任务 + if (!skipPermission) { + boolean hasPermission = fileUserPermissionService.hasFilePermission( + fileId, ThreadLocalContext.getUserId(), FilePermissionEnum.DOWNLOAD); + if (!hasPermission) { + log.warn("batchDownloadZip: userId={} 对 fileId={} 无下载权限,跳过", ThreadLocalContext.getUserId(), fileId); + continue; + } + } + String entryName = resolveUniqueEntryName(usedEntryNames, meta.getOriginalName()); + try (InputStream is = minioService.getMinioInputStream(meta.getObjectKey(), meta.getBucketName())) { + if (is == null) { + log.warn("batchDownloadZip: fileId={} MinIO 流为空,跳过", fileId); + continue; + } + zos.putNextEntry(new java.util.zip.ZipEntry(entryName)); + org.apache.commons.io.IOUtils.copy(is, zos); + zos.closeEntry(); + packedCount++; + } + } + } + return packedCount; + } + + /** + * 处理同名文件冲突:已存在同名时追加序号后缀 + * example: "report.pdf" 已存在 → 返回 "report(1).pdf" + * + * @param usedNames 当前 ZIP 中已使用的条目名集合(会被修改) + * @param fileName 原始文件名 + * @return 去重后的 ZIP 条目名 + */ + private String resolveUniqueEntryName(Set usedNames, String fileName) { + if (usedNames.add(fileName)) { + return fileName; + } + // 分离文件名和扩展名,追加序号直到不重复 + int dotIndex = fileName.lastIndexOf('.'); + String baseName = dotIndex > 0 ? fileName.substring(0, dotIndex) : fileName; + String extension = dotIndex > 0 ? fileName.substring(dotIndex) : ""; + int counter = 1; + String candidate; + do { + candidate = baseName + "(" + counter++ + ")" + extension; + } while (!usedNames.add(candidate)); + return candidate; + } + + /** + * 通过 Feign 调用 system 模块的 WebSocket 服务,向指定用户推送批量下载就绪通知 + * + * @param userId 接收通知的用户ID + * @param noticeData 通知数据体(含下载URL或错误信息) + */ + private void pushBatchDownloadNotice(Long userId, BatchDownloadNoticeResp noticeData) { + try { + WsMessage message = new WsMessage<>(); + message.setScene(WsSceneEnum.BATCH_DOWNLOAD_READY.getScene()); + message.setUserId(userId); + message.setTimestamp(System.currentTimeMillis()); + message.setData(noticeData); + SdmResponse sdmResponse = wsPushToolFeignClient.wsPushOne(message); + log.info("pushBatchDownloadNotice userId:{}, taskId:{}, response:{}", userId, noticeData.getTaskId(), JSONObject.toJSONString(sdmResponse)); + } catch (Exception e) { + log.error("pushBatchDownloadNotice 推送失败, userId:{}, taskId:{}", userId, noticeData.getTaskId(), e); + } + } + @Override public SdmResponse approveDataFile(LaunchApproveReq req) { @@ -3290,8 +3444,317 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService { fileMetadataInfoService.updateById(fileMetadataInfo); } + // ======================== 大文件分片更新(三步流程) ======================== - private List buildDictTagsFromEnum(UpdateFileReq req) { + /** + * 大文件分片更新第一步:预处理文件元数据,生成新版本 objectKey。 + *

+ * 不实际上传文件,仅完成: + * 1) 校验文件存在性与权限; + * 2) 计算新版本号与 objectKey; + * 3) 将待更新的元数据暂存到 tempMetadata 字段(供第三步回调时使用); + * 4) 返回前端所需的分片上传参数(businessId、objectKey、uploadTaskId)。 + *

+ */ + @Override + @Transactional(rollbackFor = Exception.class) + @PermissionCheckAspect.FilePermissionCheck(value = FilePermissionEnum.UPLOAD, fileIdExpression = "#req.fileId") + public SdmResponse prepareUpdateFileInfo(UpdateFileInfoReq req) { + // 1. 查询并校验文件 + FileMetadataInfo fileMetadataInfo = fileMetadataInfoService.lambdaQuery() + .eq(FileMetadataInfo::getId, req.getFileId()) + .one(); + if (fileMetadataInfo == null) { + return SdmResponse.failed("文件不存在"); + } + + FileMetadataInfo dirMetadataInfo = fileMetadataInfoService.lambdaQuery() + .eq(FileMetadataInfo::getId, fileMetadataInfo.getParentId()) + .eq(FileMetadataInfo::getDataType, DataTypeEnum.DIRECTORY.getValue()) + .one(); + + // 2. 解析工况绑定入参 + if (StringUtils.hasText(req.getSimulationPoolInfoListStr())) { + try { + req.setSimulationPoolInfoList(JSON.parseArray(req.getSimulationPoolInfoListStr(), SimulationPoolInfo.class)); + } catch (Exception e) { + return SdmResponse.failed("工况参数格式错误"); + } + } + + // 3. 解析标签入参 + if (CollectionUtils.isEmpty(req.getDictTags())) { + fileDictTagQueryService.fillFileTagsForRespList(List.of(req), UpdateFileInfoReq::getFileId); + List dictTags = buildDictTagsFromEnum(req); + if (!dictTags.isEmpty()) { + req.setDictTags(dictTags); + } + } + + if (CollectionUtils.isNotEmpty(req.getDictTags())) { + Map> dictIdMap = req.getDictTagIdsCache(); + if (dictIdMap == null || dictIdMap.isEmpty()) { + dictIdMap = dictTagHelper.queryAndCacheDictTagIds(req); + } + req.setDictTagIdsCache(dictIdMap); + } + + // 3. 生成新版本文件名与 objectKey + String originalName = req.getFileName(); + Long newVersionNo = fileMetadataInfo.getVersionNo() + 1; + String versionSuffix = "_V" + newVersionNo; + int dotIndex = originalName.lastIndexOf('.'); + if (dotIndex == -1) { + return SdmResponse.failed("文件没有后缀"); + } + String modifiedFileName = originalName.substring(0, dotIndex) + versionSuffix + originalName.substring(dotIndex); + + String parDirObjectKey = dirMetadataInfo != null ? dirMetadataInfo.getObjectKey() : ""; + String newObjectKey = getFileMinioObjectKey(parDirObjectKey + modifiedFileName); + + // 4. 构建并暂存待更新的元数据快照(tempMetadata),供第三步回调使用 + String uploadTaskId = req.getUploadTaskId(); + FileMetadataInfo tempInfo = new FileMetadataInfo(); + BeanUtils.copyProperties(fileMetadataInfo, tempInfo); + tempInfo.setObjectKey(newObjectKey); + tempInfo.setOriginalName(originalName); + tempInfo.setFileSize(req.getFileSize()); + tempInfo.setVersionNo(newVersionNo); + tempInfo.setProjectId(req.getProjectId()); + tempInfo.setAnalysisDirectionId(req.getAnalysisDirectionId()); + tempInfo.setRemarks(req.getRemarks()); + tempInfo.setSimulationPoolInfoList(req.getSimulationPoolInfoList()); + tempInfo.setUpdaterId(ThreadLocalContext.getUserId()); + tempInfo.setUpdateTime(LocalDateTime.now()); + tempInfo.setTemplateId(req.getTemplateId()); + tempInfo.setTemplateName(req.getTemplateName()); + tempInfo.setDictTagIdsCache(req.getDictTagIdsCache()); + // 暂存扩展信息和标签缓存到 tempInfo,回调时取出使用 + if (CollectionUtils.isNotEmpty(req.getFileMetadataExtensionRequest())) { + tempInfo.setFileMetadataExtensionRequest(req.getFileMetadataExtensionRequest()); + } + tempInfo.setUploadTaskId(uploadTaskId); + + // 5. 将 tempMetadata + uploadTaskId 写入主记录,标记正在更新中 + fileMetadataInfo.setTempMetadata(JSONObject.toJSONString(tempInfo)); + fileMetadataInfo.setUpdateTime(LocalDateTime.now()); + fileMetadataInfo.setUpdaterId(ThreadLocalContext.getUserId()); + fileMetadataInfoService.updateById(fileMetadataInfo); + + // 6. 构建响应 + UpdateFileInfoResp resp = new UpdateFileInfoResp(); + resp.setBusinessId(fileMetadataInfo.getId()); + resp.setObjectKey(newObjectKey); + resp.setUploadTaskId(uploadTaskId); + resp.setSourceFileName(originalName); + resp.setBucketName(fileMetadataInfo.getBucketName()); + return SdmResponse.success(resp); + } + + /** + * 大文件分片更新第三步:分片上传完成后的回调。 + *

+ * 成功场景: + * 1) 备份当前主记录为历史版本; + * 2) 从 tempMetadata 恢复新版本信息,更新主记录; + * 3) 若目录需要审批,则发起审批流程;否则直接生效。 + *

+ * 失败场景: + * 删除已上传到 MinIO 的新版本文件,清空 tempMetadata。 + *

+ */ + @Override + @Transactional(rollbackFor = Exception.class) + public SdmResponse updateFileCallback(UpdateFileCallbackReq req) { + if(CollectionUtils.isNotEmpty(req.getFailBusinessIds())){ + req.setFileId(req.getFailBusinessIds().get(0)); + req.setSuccess(false); + }else if(CollectionUtils.isNotEmpty(req.getSuccBusinessIds())){ + req.setFileId(req.getFailBusinessIds().get(0)); + req.setSuccess(true); + }else { + return SdmResponse.failed("请指定待更新的文件"); + } + + + // 1. 查询文件与暂存的 tempMetadata + FileMetadataInfo fileMetadataInfo = fileMetadataInfoService.lambdaQuery() + .eq(FileMetadataInfo::getId, req.getFileId()) + .one(); + if (fileMetadataInfo == null) { + return SdmResponse.failed("文件不存在"); + } + if (!StringUtils.hasText(fileMetadataInfo.getTempMetadata())) { + return SdmResponse.failed("未找到待更新的临时元数据,请先调用 prepareUpdateFileInfo"); + } + + FileMetadataInfo tempInfo = JSON.parseObject(fileMetadataInfo.getTempMetadata(), FileMetadataInfo.class); + String upstoredTaskId = tempInfo.getUploadTaskId(); + if (!req.getUploadTaskId().equals(upstoredTaskId)) { + return SdmResponse.failed("uploadTaskId 不匹配,可能存在并发更新"); + } + + String newObjectKey = tempInfo.getObjectKey(); + + // 2. 上传失败:回滚清理 + if (!Boolean.TRUE.equals(req.getSuccess())) { + if (StringUtils.hasText(newObjectKey)) { + try { + minioService.deleteFile(newObjectKey, fileMetadataInfo.getBucketName()); + } catch (Exception e) { + log.error("分片更新失败回滚-删除MinIO文件异常, objectKey={}", newObjectKey, e); + } + } + fileMetadataInfo.setTempMetadata(null); + fileMetadataInfoService.updateById(fileMetadataInfo); + return SdmResponse.success("已回滚清理"); + } + + // 3. 上传成功:查询父目录,判断是否需要审批 + FileMetadataInfo dirMetadataInfo = fileMetadataInfoService.lambdaQuery() + .eq(FileMetadataInfo::getId, fileMetadataInfo.getParentId()) + .eq(FileMetadataInfo::getDataType, DataTypeEnum.DIRECTORY.getValue()) + .one(); + + boolean approvalRequired = dirMetadataInfo != null && DirTypeEnum.isApprovalRequired(dirMetadataInfo.getDirType()); + + if (approvalRequired) { + // 3a. 需要审批:保留 tempMetadata,发起审批流程(审批通过后由策略类完成版本切换) + return launchChunkUpdateApproval(fileMetadataInfo, tempInfo, dirMetadataInfo); + } + + // 3b. 不需要审批:直接完成版本切换 + applyChunkUpdateDirectly(fileMetadataInfo, tempInfo, dirMetadataInfo); + return SdmResponse.success("更新成功"); + } + + /** + * 分片更新-需要审批:构建审批请求并发起。 + * 复用文件修改审批的 MODIFY 动作类型,审批通过后由 ModifyFileApproveStrategy 完成版本切换。 + */ + private SdmResponse launchChunkUpdateApproval(FileMetadataInfo fileMetadataInfo, FileMetadataInfo tempInfo, FileMetadataInfo dirMetadataInfo) { + FileApproveRequestBuilder builder = FileApproveRequestBuilder.builder() + .toUpdateFileIds(List.of(fileMetadataInfo.getId())) + .contents(DirTypeEnum.buildApprovalTitle(dirMetadataInfo.getDirType(), "文件上传新版本")) + .approveType(ApproveTypeEnum.KNOWLEDGE_APPROVE) + .approveFileActionENUM(ApproveFileActionENUM.MODIFY) + .beforeData(List.of(fileMetadataInfo)) + .afterData(List.of(tempInfo)) + .templateId(tempInfo.getTemplateId()) + .templateName(tempInfo.getTemplateName()) + .knowledgeBaseName(extractRelativePath(dirMetadataInfo)) + .build(); + + if (CollectionUtils.isNotEmpty(builder.getBeforeData())) { + setCreatorNames(builder.getBeforeData()); + setProjectName(builder.getBeforeData()); + setAnalysisDirectionName(builder.getBeforeData()); + setSimulationPoolAndTaskInfo(builder.getBeforeData()); + } + if (CollectionUtils.isNotEmpty(builder.getAfterData())) { + setCreatorNames(builder.getAfterData()); + setProjectName(builder.getAfterData()); + setAnalysisDirectionName(builder.getAfterData()); + setSimulationPoolAndTaskInfo(builder.getAfterData()); + } + + fileApproveExecutor.launchApproveAndUpdateStatus(builder, ApproveFileDataTypeEnum.MODIFY_REVIEWING); + return SdmResponse.success("已提交审批"); + } + + /** + * 分片更新-无需审批:直接完成版本切换。 + * 备份当前主记录为历史版本,用 tempInfo 中的新版本信息更新主记录。 + */ + private void applyChunkUpdateDirectly(FileMetadataInfo fileMetadataInfo, FileMetadataInfo tempInfo, FileMetadataInfo dirMetadataInfo) { + // 1. 备份当前主记录为历史版本 + FileMetadataInfo historyFileInfo = new FileMetadataInfo(); + BeanUtils.copyProperties(fileMetadataInfo, historyFileInfo); + historyFileInfo.setId(null); + historyFileInfo.setTempMetadata(null); + historyFileInfo.setIsLatest(false); + fileMetadataInfoService.save(historyFileInfo); + fileModifyApproveHelper.copyFilePermissions(fileMetadataInfo.getId(), historyFileInfo.getId(), fileUserPermissionService); + + // 2. 用新版本信息更新主记录 + fileMetadataInfo.setObjectKey(tempInfo.getObjectKey()); + fileMetadataInfo.setOriginalName(tempInfo.getOriginalName()); + fileMetadataInfo.setFileSize(tempInfo.getFileSize()); + fileMetadataInfo.setVersionNo(tempInfo.getVersionNo()); + fileMetadataInfo.setProjectId(tempInfo.getProjectId()); + fileMetadataInfo.setAnalysisDirectionId(tempInfo.getAnalysisDirectionId()); + fileMetadataInfo.setRemarks(tempInfo.getRemarks()); + fileMetadataInfo.setUpdateTime(LocalDateTime.now()); + fileMetadataInfo.setUpdaterId(ThreadLocalContext.getUserId()); + fileMetadataInfo.setIsLatest(true); + fileMetadataInfo.setTempMetadata(null); + + // 3. 更新文件存储统计 + fileModifyApproveHelper.updateFileStorageInfoAfterApproval(fileMetadataInfo, fileMetadataInfo, fileStorageService); + + // 4. 更新工况库绑定 + fileModifyApproveHelper.updateSimulationMappings(fileMetadataInfo.getId(), tempInfo.getSimulationPoolInfoList(), fileSimulationMappingService); + + // 5. 更新扩展字段 + fileModifyApproveHelper.updateExtensions(fileMetadataInfo.getId(), tempInfo.getFileMetadataExtensionRequest(), fileMetadataExtensionService); + + // 6. 更新标签(复用 dictTagIdsCache) + if (tempInfo.getDictTagIdsCache() != null && !tempInfo.getDictTagIdsCache().isEmpty()) { + updateFileTagsFromCache(fileMetadataInfo, dirMetadataInfo, tempInfo.getDictTagIdsCache()); + } + + // 7. 保存主记录 + fileMetadataInfoService.updateById(fileMetadataInfo); + } + + /** + * 根据已缓存的字典标签ID映射更新文件标签。 + * 与 updateFileTags(UpdateFileReq...) 逻辑一致,但入参来源为 tempInfo 中的缓存。 + */ + private void updateFileTagsFromCache(FileMetadataInfo fileMetadataInfo, FileMetadataInfo dirMetadataInfo, Map> dictIdMap) { + Long tenantId = ThreadLocalContext.getTenantId(); + Long creatorId = ThreadLocalContext.getUserId(); + Long fileId = fileMetadataInfo.getId(); + Long dirId = dirMetadataInfo != null ? dirMetadataInfo.getId() : null; + + List ancestorDirIds = dirId != null ? collectAncestorDirIds(dirId) : new ArrayList<>(); + + if (!ancestorDirIds.isEmpty()) { + fileTagRelService.lambdaUpdate() + .eq(FileTagRel::getTenantId, tenantId) + .eq(FileTagRel::getFileId, fileId) + .in(FileTagRel::getDirId, ancestorDirIds) + .remove(); + } + + List newTagRelList = new ArrayList<>(); + long fileSize = fileMetadataInfo.getFileSize() != null ? fileMetadataInfo.getFileSize() : 0L; + + for (Map.Entry> classEntry : dictIdMap.entrySet()) { + Map valueMap = classEntry.getValue(); + for (Integer dictId : valueMap.values()) { + if (dictId == null) { + continue; + } + for (Long dirIdItem : ancestorDirIds) { + FileTagRel tagRel = new FileTagRel(); + tagRel.setFileId(fileId); + tagRel.setTagId(dictId); + tagRel.setDirId(dirIdItem); + tagRel.setTenantId(tenantId); + tagRel.setCreatorId(creatorId); + tagRel.setFileSize(fileSize); + newTagRelList.add(tagRel); + } + } + } + + if (CollectionUtils.isNotEmpty(newTagRelList)) { + fileTagRelService.saveBatch(newTagRelList); + } + } + + private List buildDictTagsFromEnum(Object req) { List dictTags = new ArrayList<>(); for (FileDictTagEnum tagEnum : FileDictTagEnum.values()) { String classFieldName = tagEnum.getDictClassFieldName(); diff --git a/data/src/main/java/com/sdm/data/service/minio/MinioService.java b/data/src/main/java/com/sdm/data/service/minio/MinioService.java index da308bcd..93d4110c 100644 --- a/data/src/main/java/com/sdm/data/service/minio/MinioService.java +++ b/data/src/main/java/com/sdm/data/service/minio/MinioService.java @@ -395,6 +395,40 @@ public class MinioService implements IMinioService { } } + /** + * 将 InputStream 直接上传到 MinIO,适用于动态生成的内容(如批量打包的 ZIP 文件) + * tags 可用于触发桶生命周期规则,例如传入 {"auto-expire":"1d"} 配合规则 auto-expire-1d,1天后自动删除 + * MinIO SDK 按 partSize(10MB) 自动分片,适合大文件 + * + * @param inputStream 数据来源流 + * @param size 流的字节总数(File.length() 获取) + * @param objectName MinIO 对象路径, example: "temp/zip/2026-04-16/taskId.zip" + * @param contentType MIME 类型, example: "application/zip" + * @param bucketName 存储桶名称,为空时使用当前租户桶 + * @param tags 对象标签,可为 null,example: {"auto-expire":"1d"} + */ + public void putObjectFromStream(InputStream inputStream, long size, + String objectName, String contentType, + String bucketName, Map tags) { + try { + createBucketIfNotExists(getBucketName(bucketName)); + // 使用 .tags() 设置对象标签(x-amz-tagging),而非 userMetadata(x-amz-meta-*) + // 只有对象标签才能被桶生命周期规则的 Tag 过滤器匹配,从而触发自动过期删除 + PutObjectArgs.Builder builder = PutObjectArgs.builder() + .bucket(getBucketName(bucketName)) + .object(objectName) + .stream(inputStream, size, 10 * 1024 * 1024) + .contentType(contentType); + if (tags != null && !tags.isEmpty()) { + builder.tags(tags); + } + minioClient.putObject(builder.build()); + } catch (Exception e) { + log.error("putObjectFromStream 上传失败: {}", objectName, e); + throw new RuntimeException(e); + } + } + /** * 重命名MinIO中的文件 * diff --git a/job/src/main/resources/application-lyric.yml b/job/src/main/resources/application-lyric.yml index 0485917a..131d28bf 100644 --- a/job/src/main/resources/application-lyric.yml +++ b/job/src/main/resources/application-lyric.yml @@ -26,7 +26,7 @@ spring: nacos: discovery: server-addr: 192.168.30.146:8848 - group: DEV_GROUP + group: LYRIC_GROUP enabled: true management: endpoints: diff --git a/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java b/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java index df734e94..065e0438 100644 --- a/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java +++ b/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java @@ -3,11 +3,13 @@ package com.sdm.pbs.controller; import com.alibaba.fastjson2.JSONObject; import com.sdm.common.common.SdmResponse; import com.sdm.common.common.ThreadLocalContext; +import com.sdm.common.entity.req.pbs.DelHpcJobsReq; import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq; import com.sdm.common.feign.inter.pbs.ITaskFeignClient; import com.sdm.common.utils.FilesUtil; import com.sdm.pbs.config.PbsCommonConstant; import com.sdm.pbs.model.bo.BatchWebSubmitResp; +import com.sdm.pbs.model.entity.HpcJobStatus; import com.sdm.pbs.model.entity.SimulationJob; import com.sdm.pbs.model.req.BatchHpcTaskReq; import com.sdm.pbs.model.req.OneHpcTaskReq; @@ -30,6 +32,7 @@ import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; @Slf4j @RestController @@ -117,6 +120,113 @@ public class TaskAdapter implements ITaskFeignClient { return SdmResponse.success("提交成功"); } + @GetMapping("/stopHpcJobAdapter") + @Operation(summary = "作业停止-根据数据主键") + public SdmResponse stopHpcJob(@RequestParam Long id) { + if(Objects.isNull(id)) { + return SdmResponse.failed("id不能为空"); + } + SimulationJob jobDb = simulationJobService.lambdaQuery().eq(SimulationJob::getId,id).one(); + if(Objects.isNull(jobDb)) { + return SdmResponse.failed("数据不存在"); + } + if(jobDb.getJobStatus().equals(HpcJobStatus.Canceled.name())|| + jobDb.getJobStatus().equals(HpcJobStatus.Finished.name())|| + jobDb.getJobStatus().equals(HpcJobStatus.Failed.name())) { + return SdmResponse.failed("任务状态为完成态,不能取消"); + } + // 未入hpc节点的 + if(StringUtils.isBlank(jobDb.getJobId())){ + log.info("stopHpcJobAdapter 未入hpc节点的,更新状态为已取消jobName:{}",jobDb.getJobName()); + }else { + SdmResponse response = taskController.stopHpcJob(jobDb.getJobId()); + if(!response.isSuccess()){ + throw new RuntimeException("hpc作业取消失败"); + } + log.info("stopHpcJobAdapter 已入hpc节点的,更新状态为已取消jobName:{}",jobDb.getJobName()); + } + jobDb.setJobStatus(HpcJobStatus.Canceled.name()); + boolean b = simulationJobService.updateById(jobDb); + log.info("stopHpcJobAdapter 更新状态为已取消jobName:{},b:{}",jobDb.getJobName(),b); + return SdmResponse.success(b); + } + + @PostMapping("/delHpcJobsAdapter") + @Operation(summary = "批量删除Hpc任务") + public SdmResponse delHpcJobsAdapter(@RequestBody DelHpcJobsReq req) { + // 1. 入参校验 + if (CollectionUtils.isEmpty(req.getIds())) { + return SdmResponse.failed("删除数据id不能为空"); + } + + // 2. 查询任务列表 + List jobList = simulationJobService.lambdaQuery() + .select(SimulationJob::getId, SimulationJob::getJobId, SimulationJob::getJobStatus) + .in(SimulationJob::getId, req.getIds()) + .list(); + if (CollectionUtils.isEmpty(jobList)) { + return SdmResponse.failed("未查询到对应任务数据"); + } + + for (SimulationJob job : jobList) { + if(!HpcJobStatus.getFinishedStatusNames().contains(job.getJobStatus())) { + throw new RuntimeException("数据任务状态非完成态,不能删除"); + } + } + + // 3. 分组:空jobId / 有效jobId + List emptyJobIds = filterEmptyJobIds(jobList); + List validJobIds = filterValidJobIds(jobList); + + // 4. 处理本地标记删除 + updateDelFlagForEmptyJobs(emptyJobIds); + + // 5. 有有效ID则调用HPC删除 + if (CollectionUtils.isNotEmpty(validJobIds)) { + DelHpcJobsReq delHpcJobsReq = new DelHpcJobsReq(); + delHpcJobsReq.setHpcJobIds(validJobIds); + return taskController.delHpcJobs(delHpcJobsReq); + } + + return SdmResponse.success("批量删除HPC任务成功"); + } + + + /** + * 筛选出 jobId 为空的 主键ID + */ + private List filterEmptyJobIds(List jobList) { + return jobList.stream() + .filter(job -> StringUtils.isBlank(job.getJobId())) + .map(SimulationJob::getId) + .collect(Collectors.toList()); + } + + /** + * 筛选出 jobId 不为空的 有效HPC任务ID + */ + private List filterValidJobIds(List jobList) { + return jobList.stream() + .filter(job -> StringUtils.isNotBlank(job.getJobId())) + .map(SimulationJob::getJobId) + .collect(Collectors.toList()); + } + + /** + * 批量更新 delFlag = Y + */ + private void updateDelFlagForEmptyJobs(List emptyJobIds) { + if (CollectionUtils.isEmpty(emptyJobIds)) { + return; + } + boolean updateSuccess = simulationJobService.lambdaUpdate() + .in(SimulationJob::getId, emptyJobIds) + .set(SimulationJob::getDelFlag, "Y") + .update(); + log.info("[HPC任务删除] 本地标记删除成功,数量:{},结果:{}", emptyJobIds.size(), updateSuccess); + } + + private void getSimulationFile(SubmitHpcTaskRemoteReq req,SubmitHpcTaskReq submitHpcTaskReq,String batchFilePath ) { log.info("提交请求参数:{}", JSONObject.toJSONString(req)); String masterFilepath=""; diff --git a/pbs/src/main/java/com/sdm/pbs/model/entity/HpcJobStatus.java b/pbs/src/main/java/com/sdm/pbs/model/entity/HpcJobStatus.java index 680a45e3..c6427333 100644 --- a/pbs/src/main/java/com/sdm/pbs/model/entity/HpcJobStatus.java +++ b/pbs/src/main/java/com/sdm/pbs/model/entity/HpcJobStatus.java @@ -2,6 +2,8 @@ package com.sdm.pbs.model.entity; import lombok.AllArgsConstructor; import lombok.Getter; +import java.util.List; + @Getter @AllArgsConstructor public enum HpcJobStatus { @@ -16,4 +18,18 @@ public enum HpcJobStatus { Failed("任务失败"); private final String desc; + + /** + * 获取所有完成态的枚举名称(name)集合 + * 完成态:已取消、已完成、任务失败 + */ + public static List getFinishedStatusNames() { + return List.of( + Canceled.name(), + Finished.name(), + Failed.name() + ); + } + + } \ No newline at end of file diff --git a/project/src/main/java/com/sdm/project/service/impl/SimulationRunServiceImpl.java b/project/src/main/java/com/sdm/project/service/impl/SimulationRunServiceImpl.java index 6beb7ae9..1d0334ab 100644 --- a/project/src/main/java/com/sdm/project/service/impl/SimulationRunServiceImpl.java +++ b/project/src/main/java/com/sdm/project/service/impl/SimulationRunServiceImpl.java @@ -52,10 +52,7 @@ import com.sdm.common.feign.impl.system.SysUserFeignClientImpl; import com.sdm.common.feign.inter.system.IWsPushToolFeignClient; import com.sdm.common.service.DataFileService; import com.sdm.common.service.FileBizTypeService; -import com.sdm.common.utils.CommonUtils; -import com.sdm.common.utils.DateUtils; -import com.sdm.common.utils.PageUtils; -import com.sdm.common.utils.RandomUtil; +import com.sdm.common.utils.*; import com.sdm.project.common.*; import com.sdm.project.dao.*; import com.sdm.project.model.bo.ApprovalDeliverableContentsModel; @@ -977,6 +974,9 @@ public class SimulationRunServiceImpl extends ServiceImpl fileBaseInfoResp = dataFeignClient.getFileBaseInfo(getFileBaseInfoReq); + if (fileBaseInfoResp.getData() != null) { + FilesUtil.createRunLocalDir(fileBaseInfoResp.getData().getObjectKey()); + } + } + private SdmResponse createDir(String uuid, String parentUuid, String dirName, Integer dirType, String uuidOwnType) { CreateDirReq createDirReq = new CreateDirReq(); createDirReq.setUuId(uuid); @@ -1218,6 +1227,11 @@ public class SimulationRunServiceImpl extends ServiceImpl