flowable各节点初始化本地文件夹

This commit is contained in:
2025-12-05 14:14:22 +08:00
parent fd43685c8a
commit ec22441662
12 changed files with 225 additions and 42 deletions

View File

@@ -10,5 +10,5 @@ public class GetFileBaseInfoReq {
private String path; private String path;
@Schema(description = "文件id") @Schema(description = "文件id")
private Integer fileId; private Long fileId;
} }

View File

@@ -96,8 +96,8 @@ public class DataClientFeignClientImpl implements IDataFeignClient {
} }
@Override @Override
public SdmResponse getFileBaseInfo(GetFileBaseInfoReq req) { public SdmResponse<FileMetadataInfoResp> getFileBaseInfo(GetFileBaseInfoReq req) {
SdmResponse response; SdmResponse<FileMetadataInfoResp> response;
try { try {
response = dataClient.getFileBaseInfo(req); response = dataClient.getFileBaseInfo(req);
log.info("查询文件基本信息响应:"+ response); log.info("查询文件基本信息响应:"+ response);

View File

@@ -47,7 +47,7 @@ public interface IDataFeignClient {
* @return * @return
*/ */
@PostMapping("data/getFileBaseInfo") @PostMapping("data/getFileBaseInfo")
SdmResponse getFileBaseInfo(@RequestBody @Validated GetFileBaseInfoReq req); SdmResponse<FileMetadataInfoResp> getFileBaseInfo(@RequestBody @Validated GetFileBaseInfoReq req);
@PostMapping("/data/approveDataFile") @PostMapping("/data/approveDataFile")
SdmResponse approveDataFile(@RequestBody LaunchApproveReq req); SdmResponse approveDataFile(@RequestBody LaunchApproveReq req);

View File

@@ -116,7 +116,7 @@ public class DataFileController implements IDataFeignClient {
*/ */
@PostMapping("/getFileBaseInfo") @PostMapping("/getFileBaseInfo")
@Operation(summary = "获取文件基本信息", description = "根据请求参数获取指定文件的基本信息") @Operation(summary = "获取文件基本信息", description = "根据请求参数获取指定文件的基本信息")
public SdmResponse getFileBaseInfo(@RequestBody @Validated GetFileBaseInfoReq req) { public SdmResponse<FileMetadataInfoResp> getFileBaseInfo(@RequestBody @Validated GetFileBaseInfoReq req) {
return IDataFileService.getFileBaseInfo(req); return IDataFileService.getFileBaseInfo(req);
} }

View File

@@ -250,7 +250,7 @@ public interface IDataFileService {
* @param req 获取文件基本信息请求参数 * @param req 获取文件基本信息请求参数
* @return 文件基本信息响应 * @return 文件基本信息响应
*/ */
SdmResponse getFileBaseInfo(GetFileBaseInfoReq req); SdmResponse<FileMetadataInfoResp> getFileBaseInfo(GetFileBaseInfoReq req);
/** /**
@@ -322,9 +322,16 @@ public interface IDataFileService {
/** /**
* 下载文件到本地临时目录 * 下载文件到本地临时目录
* @param fileId 文件id * @param fileId 文件id
* @param path 临时目录路径 * @param basePath 临时目录路径
*/ */
void downloadFileToLocal(Long fileId,String path); void downloadFileToLocal(Long fileId,String basePath);
/**
* 批量下载指定文件夹到本地目录
* @param downloadDirId
* @param basePath
*/
void downloadFolderToLocal(Long downloadDirId, String basePath) throws Exception;
/** /**
* 导出知识库 * 导出知识库

View File

@@ -1,5 +1,7 @@
package com.sdm.data.service; package com.sdm.data.service;
import io.minio.Result;
import io.minio.messages.Item;
import jakarta.servlet.http.HttpServletResponse; import jakarta.servlet.http.HttpServletResponse;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
@@ -46,6 +48,12 @@ public interface IMinioService {
public void deleteDirectoryRecursively2(String directoryName,String bucketName); public void deleteDirectoryRecursively2(String directoryName,String bucketName);
/**
* 递归获取目录下的所有对象。
*
* @param directoryName 目录名称objectKey
*/
public Iterable<Result<Item>> listObjects(String directoryName);
/** /**
* 递归删除指定目录下的所有对象。 * 递归删除指定目录下的所有对象。
* *

View File

@@ -40,6 +40,8 @@ import com.sdm.data.service.*;
import com.sdm.data.service.impl.dataFileHandle.ApproveContext; import com.sdm.data.service.impl.dataFileHandle.ApproveContext;
import com.sdm.data.service.impl.dataFileHandle.ApproveStrategy; import com.sdm.data.service.impl.dataFileHandle.ApproveStrategy;
import com.sdm.data.service.impl.dataFileHandle.ApproveStrategyFactory; import com.sdm.data.service.impl.dataFileHandle.ApproveStrategyFactory;
import io.minio.Result;
import io.minio.messages.Item;
import jakarta.servlet.http.HttpServletResponse; import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
@@ -950,12 +952,15 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
} }
@Override @Override
public SdmResponse getFileBaseInfo(GetFileBaseInfoReq req) { public SdmResponse<FileMetadataInfoResp> getFileBaseInfo(GetFileBaseInfoReq req) {
FileMetadataInfo fileMetadataInfo = fileMetadataInfoService.lambdaQuery().eq(FileMetadataInfo::getId, req.getFileId()).one(); FileMetadataInfo fileMetadataInfo = fileMetadataInfoService.lambdaQuery().eq(FileMetadataInfo::getId, req.getFileId()).one();
if (fileMetadataInfo == null) { if (fileMetadataInfo == null) {
return SdmResponse.failed("文件不存在"); return SdmResponse.failed("文件不存在");
} }
return SdmResponse.success(fileMetadataInfo); FileMetadataInfoResp fileMetadataInfoResp = new FileMetadataInfoResp();
BeanUtils.copyProperties(fileMetadataInfo, fileMetadataInfoResp);
return SdmResponse.success(fileMetadataInfoResp);
} }
@Override @Override
@@ -1268,7 +1273,7 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
FileMetadataInfo fileMetadataInfo = fileMetadataInfoService.lambdaQuery().eq(FileMetadataInfo::getId, fileId).one(); FileMetadataInfo fileMetadataInfo = fileMetadataInfoService.lambdaQuery().eq(FileMetadataInfo::getId, fileId).one();
if (ObjectUtils.isEmpty(fileMetadataInfo)) { if (ObjectUtils.isEmpty(fileMetadataInfo)) {
log.error("文件不存在"); log.error("文件不存在");
return null; throw new RuntimeException("文件不存在");
} }
String fileObjectKey = fileMetadataInfo.getObjectKey(); String fileObjectKey = fileMetadataInfo.getObjectKey();
@@ -2416,28 +2421,135 @@ public class MinioFileIDataFileServiceImpl implements IDataFileService {
} }
@Override @Override
public void downloadFileToLocal(Long fileId,String path) { public void downloadFileToLocal(Long fileId, String basePath) {
if (fileId == null || basePath == null) {
throw new RuntimeException("参数不能为空");
}
FileMetadataInfo fileMetadataInfo = fileMetadataInfoService.lambdaQuery()
.eq(FileMetadataInfo::getId, fileId)
.one();
if (ObjectUtils.isEmpty(fileMetadataInfo)) {
throw new RuntimeException("文件元数据不存在fileId=" + fileId);
}
String originalName = fileMetadataInfo.getOriginalName();
// 安全校验文件名
if (originalName == null || originalName.contains("..") || originalName.startsWith("/")) {
throw new RuntimeException("非法文件名: " + originalName);
}
try { try {
FileMetadataInfo fileMetadataInfo = fileMetadataInfoService.lambdaQuery().eq(FileMetadataInfo::getId, fileId).one(); // 构建安全的目标路径
if (ObjectUtils.isEmpty(fileMetadataInfo)) { Path baseDir = Paths.get(basePath).toAbsolutePath().normalize();
return; Files.createDirectories(baseDir); // 自动创建多级目录
Path targetFile = baseDir.resolve(originalName).normalize();
// baseDir 可能包含 ../ 或绝对路径(如 ../../../etc/passwd防止路径穿越
if (!targetFile.startsWith(baseDir)) {
throw new RuntimeException("非法文件路径: " + targetFile);
} }
String fileObjectKey = fileMetadataInfo.getObjectKey();
boolean hasDownloadPermission = fileUserPermissionService.hasFilePermission(fileMetadataInfo.getId(), ThreadLocalContext.getUserId(), FilePermissionEnum.DOWNLOAD); // 流式下载并写入 处理空文件或大文件问题
if (!hasDownloadPermission) { try (InputStream inputStream = getMinioInputStream(fileId);
return; FileOutputStream outputStream = new FileOutputStream(targetFile.toFile())) {
byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
} }
// 从MinIO下载文件
byte[] fileData = minioService.downloadFile(fileObjectKey);
// 写入响应流
File folder = new File(path);
folder.mkdir();
FileOutputStream outputStream = new FileOutputStream(path + File.separator + fileMetadataInfo.getOriginalName());
outputStream.write(fileData);
outputStream.flush();
outputStream.close();
} catch (Exception e) { } catch (Exception e) {
log.error("下载文件失败", e); log.error("下载文件失败fileId={}, basePath={}, fileName={}",
fileId, basePath, originalName, e);
throw new RuntimeException("文件下载失败", e);
}
}
public void downloadFolderToLocal(Long downloadDirId, String basePath) throws Exception {
if (downloadDirId == null || basePath == null) {
throw new IllegalArgumentException("downloadDirId 和 basePath 不能为空");
}
// 1. 查询文件夹元数据
FileMetadataInfo folderInfo = fileMetadataInfoService.lambdaQuery()
.eq(FileMetadataInfo::getId, downloadDirId)
.one();
if (folderInfo == null) {
throw new RuntimeException("文件夹不存在ID: " + downloadDirId);
}
if(!Objects.equals(DataTypeEnum.DIRECTORY.getValue(),folderInfo.getFileType())){
throw new RuntimeException("指定 ID 不是文件夹类型: " + downloadDirId);
}
String folderObjectKey = folderInfo.getObjectKey();
if (folderObjectKey == null || folderObjectKey.isEmpty()) {
throw new RuntimeException("文件夹 objectKey 为空ID: " + downloadDirId);
}
// 确保以 / 结尾,便于作为前缀匹配
if (!folderObjectKey.endsWith("/")) {
folderObjectKey += "/";
}
// 2. 构建本地基础路径(直接使用 basePath + folderObjectKey
Path localBaseDir = Paths.get(basePath).toAbsolutePath().normalize();
Path fullLocalBase = localBaseDir.resolve(folderObjectKey).normalize();
// 安全校验:确保 fullLocalBase 确实在 basePath 下
if (!fullLocalBase.startsWith(localBaseDir)) {
throw new RuntimeException("非法文件夹路径,可能包含路径穿越: " + folderObjectKey);
}
try {
Files.createDirectories(fullLocalBase);
} catch (Exception e) {
throw new RuntimeException("无法创建本地目录: " + fullLocalBase, e);
}
// 3. 列出 MinIO 中该前缀下的所有对象(递归)
Iterable<Result<Item>> results = minioService.listObjects(folderObjectKey);
// 4. 遍历并下载每个对象(任一失败立即抛出 RuntimeException
for (Result<Item> result : results) {
Item item = result.get();
String objectKey = item.objectName();
// 跳过目录占位符(以 / 结尾的对象)
if (objectKey == null || objectKey.endsWith("/")) {
continue;
}
// 构建本地文件路径basePath + objectKey
Path localFilePath = localBaseDir.resolve(objectKey).normalize();
// 二次安全校验:防止 objectKey 含 ../ 导致越界
if (!localFilePath.startsWith(localBaseDir)) {
throw new RuntimeException("检测到非法对象路径,拒绝下载: " + objectKey);
}
try {
// 创建父目录
Files.createDirectories(localFilePath.getParent());
// 流式下载
try (InputStream inputStream = minioService.getMinioInputStream(objectKey);
FileOutputStream outputStream = new FileOutputStream(localFilePath.toFile())) {
byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
}
} catch (Exception e) {
throw new RuntimeException("下载对象失败: " + objectKey, e);
}
} }
} }

View File

@@ -192,6 +192,18 @@ public class MinioService implements IMinioService {
deleteDirectoryRecursively(directoryName, bucketName); deleteDirectoryRecursively(directoryName, bucketName);
} }
@Override
public Iterable<Result<Item>> listObjects(String directoryName) {
// 列出目录下的所有对象
return minioClient.listObjects(
ListObjectsArgs.builder()
.bucket(minioConfig.getSpdmBucket())
.prefix(directoryName)
.recursive(true)
.build());
}
/** /**
* 从MinIO删除文件 * 从MinIO删除文件
* @param minioObjectKey 文件名 * @param minioObjectKey 文件名

View File

@@ -29,4 +29,9 @@ public interface FlowableConfig {
*/ */
String ASYNC_TASK_SUFFIX = "_wait"; String ASYNC_TASK_SUFFIX = "_wait";
String WAIT_USER_SUFFIX = "_waitUser"; String WAIT_USER_SUFFIX = "_waitUser";
/**
* 流程的节点本地文件夹基础路径
*/
String FLOWABLE_SIMULATION_BASEDIR ="/home/simulation/";
} }

View File

@@ -1,7 +1,12 @@
package com.sdm.flowable.delegate; package com.sdm.flowable.delegate;
import com.alibaba.fastjson2.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.flowable.executeConfig.BaseExecuteConfig; import com.sdm.common.entity.flowable.executeConfig.BaseExecuteConfig;
import com.sdm.common.entity.req.data.GetFileBaseInfoReq;
import com.sdm.common.entity.resp.data.FileMetadataInfoResp;
import com.sdm.common.feign.inter.data.IDataFeignClient;
import com.sdm.flowable.constants.FlowableConfig; import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.flowable.delegate.handler.ExecutionHandler; import com.sdm.flowable.delegate.handler.ExecutionHandler;
import com.sdm.flowable.dto.req.AsyncCallbackRequest; import com.sdm.flowable.dto.req.AsyncCallbackRequest;
@@ -15,6 +20,9 @@ import org.flowable.engine.delegate.JavaDelegate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
@@ -39,6 +47,9 @@ public class UniversalDelegate implements JavaDelegate {
@Autowired @Autowired
private RuntimeService runtimeService; private RuntimeService runtimeService;
@Autowired
private IDataFeignClient dataFeignClient;
@Override @Override
public void execute(DelegateExecution execution) { public void execute(DelegateExecution execution) {
try { try {
@@ -55,6 +66,17 @@ public class UniversalDelegate implements JavaDelegate {
log.info("节点执行参数, 流程实例ID: {}, 节点ID: {}, 参数: {}", procInstId, nodeId, params); log.info("节点执行参数, 流程实例ID: {}, 节点ID: {}, 参数: {}", procInstId, nodeId, params);
// 3、创建本地文件夹用于后续节点计算直接从本地读取不需要再从minio中获取数据
Long currentNodeOutputDirId = (Long)params.getOrDefault("outputDirId", null);
GetFileBaseInfoReq getFileBaseInfoReq = new GetFileBaseInfoReq();
getFileBaseInfoReq.setFileId(currentNodeOutputDirId);
SdmResponse<FileMetadataInfoResp> fileBaseInfoResp = dataFeignClient.getFileBaseInfo(getFileBaseInfoReq);
if(!fileBaseInfoResp.isSuccess()||fileBaseInfoResp.getData()==null){
throw new RuntimeException("当前节点未查询到输入文件夹");
}
String objectKey = fileBaseInfoResp.getData().getObjectKey();
createLocalDir(objectKey);
// 检查是否有扩展元素配置 // 检查是否有扩展元素配置
if (execution.getCurrentFlowElement().getExtensionElements() != null && if (execution.getCurrentFlowElement().getExtensionElements() != null &&
execution.getCurrentFlowElement().getExtensionElements().get(FlowableConfig.EXECUTECONFIG) != null) { execution.getCurrentFlowElement().getExtensionElements().get(FlowableConfig.EXECUTECONFIG) != null) {
@@ -92,6 +114,29 @@ public class UniversalDelegate implements JavaDelegate {
} }
} }
/**
* 基于objectkey创建本地文件夹
* @param objectKey minio的文件对象路径
*/
private void createLocalDir(String objectKey) {
String simulationBaseDir = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR;
// 构建本地基础路径(直接使用 simulationBaseDir + objectKey
Path localBaseDir = Paths.get(simulationBaseDir).toAbsolutePath().normalize();
Path fullLocalBase = localBaseDir.resolve(objectKey).normalize();
// 安全校验:确保 fullLocalBase 确实在 basePath 下
if (!fullLocalBase.startsWith(localBaseDir)) {
throw new RuntimeException("非法文件夹路径,可能包含路径穿越: " + objectKey);
}
try {
Files.createDirectories(fullLocalBase);
} catch (Exception e) {
throw new RuntimeException("无法创建本地目录: " + fullLocalBase, e);
}
}
/** /**
* 处理任务执行失败的情况 * 处理任务执行失败的情况
* @param execution 当前执行上下文 * @param execution 当前执行上下文

View File

@@ -5,10 +5,12 @@ import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.flowable.executeConfig.HPCExecuteConfig; import com.sdm.common.entity.flowable.executeConfig.HPCExecuteConfig;
import com.sdm.common.entity.req.data.GetFileBaseInfoReq; import com.sdm.common.entity.req.data.GetFileBaseInfoReq;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq; import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq;
import com.sdm.common.entity.resp.data.FileMetadataInfoResp;
import com.sdm.common.feign.inter.data.IDataFeignClient; import com.sdm.common.feign.inter.data.IDataFeignClient;
import com.sdm.common.feign.inter.pbs.ITaskFeignClient; import com.sdm.common.feign.inter.pbs.ITaskFeignClient;
import com.sdm.common.log.CoreLogger; import com.sdm.common.log.CoreLogger;
import com.sdm.common.utils.FilesUtil; import com.sdm.common.utils.FilesUtil;
import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.flowable.entity.ProcessNodeParam; import com.sdm.flowable.entity.ProcessNodeParam;
import com.sdm.flowable.service.IAsyncTaskRecordService; import com.sdm.flowable.service.IAsyncTaskRecordService;
import com.sdm.flowable.service.IProcessNodeParamService; import com.sdm.flowable.service.IProcessNodeParamService;
@@ -42,10 +44,6 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
@Autowired @Autowired
private IDataFeignClient dataFeignClient; private IDataFeignClient dataFeignClient;
// @Value("${flowable.simulationBaseDir:F:\\flowable\\}")
@Value("${flowable.simulationBaseDir:}")
private String simulationBaseDir;
/* /*
* params:业务参数 * params:业务参数
* config框架属性 * config框架属性
@@ -94,6 +92,7 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
private void dealHpcFile(SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq, String beforeNodeId, String masterFileRegularStr, private void dealHpcFile(SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq, String beforeNodeId, String masterFileRegularStr,
String inputFilesRegularStr,String processDefinitionId,String processInstanceId) { String inputFilesRegularStr,String processDefinitionId,String processInstanceId) {
String simulationBaseDir = FlowableConfig.FLOWABLE_SIMULATION_BASEDIR;
// 查询前节点的工作目录---》本地磁盘对应目录 // 查询前节点的工作目录---》本地磁盘对应目录
ProcessNodeParam processNodeParam = processNodeParamService.lambdaQuery(). ProcessNodeParam processNodeParam = processNodeParamService.lambdaQuery().
eq(ProcessNodeParam::getNodeId, beforeNodeId). eq(ProcessNodeParam::getNodeId, beforeNodeId).
@@ -103,18 +102,16 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
String paramJson = processNodeParam.getParamJson(); String paramJson = processNodeParam.getParamJson();
JSONObject paramJsonObject = JSONObject.parseObject(paramJson); JSONObject paramJsonObject = JSONObject.parseObject(paramJson);
// outputDirId // outputDirId
Integer outputDirId = paramJsonObject.getInteger("outputDirId"); Long outputDirId = paramJsonObject.getLong("outputDirId");
// 查data表 // 查data表
GetFileBaseInfoReq getFileBaseInfoReq = new GetFileBaseInfoReq(); GetFileBaseInfoReq getFileBaseInfoReq = new GetFileBaseInfoReq();
getFileBaseInfoReq.setFileId(outputDirId); getFileBaseInfoReq.setFileId(outputDirId);
SdmResponse fileBaseInfoResp = dataFeignClient.getFileBaseInfo(getFileBaseInfoReq); SdmResponse<FileMetadataInfoResp> fileBaseInfoResp = dataFeignClient.getFileBaseInfo(getFileBaseInfoReq);
if(!fileBaseInfoResp.isSuccess()||fileBaseInfoResp.getData()==null){ if(!fileBaseInfoResp.isSuccess()||fileBaseInfoResp.getData()==null){
throw new RuntimeException("上一节点信息查询失败"); throw new RuntimeException("上一节点信息查询失败");
} }
Object data = fileBaseInfoResp.getData(); FileMetadataInfoResp fileMetadataInfoResp = fileBaseInfoResp.getData();
// 直接强转为 Map推荐指定泛型避免后续取值强转 String objectKey = fileMetadataInfoResp.getObjectKey();
Map<String, Object> dataMap = (Map<String, Object>) data;
String objectKey = dataMap.get("objectKey") == null ? "": dataMap.get("objectKey").toString();
// 本地文件路径 taskLocalBaseDir // 本地文件路径 taskLocalBaseDir
String localSaveDir = simulationBaseDir + objectKey; String localSaveDir = simulationBaseDir + objectKey;
CoreLogger.info("beforeNode localSaveDir:{}",localSaveDir); CoreLogger.info("beforeNode localSaveDir:{}",localSaveDir);

View File

@@ -33,7 +33,4 @@ mybatis-plus:
global-config: global-config:
db-config: db-config:
id-type: auto id-type: auto
# 仿真任务本地nas盘基础目录
flowable:
simulationBaseDir: /home/simulation/