@@ -1,25 +1,29 @@
package com.sdm.flowable.delegate.handler ;
import com.alibaba.fastjson2.JSONObject ;
import com.fasterxml.jackson.core.type.TypeReference ;
import com.fasterxml.jackson.databind.ObjectMapper ;
import com.sdm.common.common.SdmResponse ;
import com.sdm.common.entity.flowable.executeConfig.HPCExecuteConfig ;
import com.sdm.common.entity.req.pbs.SimulationCommandPlaceholder Req ;
import com.sdm.common.entity.req.data.GetFileBaseInfo Req ;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq ;
import com.sdm.common.feign.inter.data.IDataFeignClient ;
import com.sdm.common.feign.inter.pbs.ITaskFeignClient ;
import com.sdm.common.log.CoreLogger ;
import com.sdm.common.utils.FilesUtil ;
import com.sdm.flowable.entity.ProcessNodeParam ;
import com.sdm.flowable.service.IAsyncTaskRecordService ;
import com.sdm.flowable.service.IProcessNodeParamService ;
import lombok.extern.slf4j.Slf4j ;
import org.apache.commons.collections4.CollectionUtils ;
import org.apache.commons.lang3.StringUtils ;
import org.flowable.engine.delegate.DelegateExecution ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.beans.factory.annotation.Value ;
import org.springframework.stereotype.Component ;
import org.springframework.web.multipart.MultipartFile ;
import java.text.SimpleDateFormat ;
import java.util.Date ;
import java.util.HashMap ;
import java.util.Map ;
import java.util.* ;
import java.util.concurrent.atomic.AtomicReference ;
// HPC(executeType=HPC)
@Slf4j
@@ -32,14 +36,29 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
@Autowired
private ITaskFeignClient taskFeignClient ;
@Autowired
private IProcessNodeParamService processNodeParamService ;
@Autowired
private IDataFeignClient dataFeignClient ;
// @Value("${flowable.simulationBaseDir:F:\\flowable\\}")
@Value ( " ${flowable.simulationBaseDir:} " )
private String simulationBaseDir ;
/*
* params:业务参数
* config: 框架属性
* */
@Override
public void execute ( DelegateExecution execution , Map < String , Object > params , HPCExecuteConfig config ) {
CoreLogger . info ( " hpc process excite,params:{},config:{} " , JSONObject . toJSONString ( params ) , JSONObject . toJSONString ( config ) ) ;
SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq = convertParamsToReq ( params ) ;
// submitHpcTaskRemoteReq.setBeforeNodeId( config. getBeforeNodeId()) ;
String beforeNodeId = config. getBeforeNodeId( ) ;
String masterFileRegularStr = config . getMasterFileRegularStr ( ) ;
String inputFilesRegularStr = config . getInputFilesRegularStr ( ) ;
dealHpcFile ( submitHpcTaskRemoteReq , beforeNodeId , masterFileRegularStr , inputFilesRegularStr ) ;
// 实现HPC处理逻辑...
// INIT(初始化)/RUNNING(执行中)/SUCCESS(执行成功)/FAIL(执行失败)
String status = " INIT " ;
@@ -50,7 +69,7 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
status = " FAIL " ;
}
String hpcTaskId = submitResp . getData ( ) ;
CoreLogger . info ( " hpc task submit succ jobId:{} " , hpcTaskId ) ;
// 2. 存数据库(提交状态 + 外部任务ID)
asyncTaskRecordService . registerAsyncTask (
execution ,
@@ -60,7 +79,56 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
status ,
hpcTaskId
) ;
log . info ( " HPC 任务 {} 已提交 " , hpcTaskId ) ;
log . info ( " HPC 任务 {} 已提交 " , " hpcTaskId " ) ;
}
private void dealHpcFile ( SubmitHpcTaskRemoteReq submitHpcTaskRemoteReq , String beforeNodeId ,
String masterFileRegularStr , String inputFilesRegularStr ) {
// 查询前节点的工作目录---》本地磁盘对应目录
ProcessNodeParam processNodeParam = processNodeParamService . lambdaQuery ( ) .
eq ( ProcessNodeParam : : getNodeId , beforeNodeId ) . one ( ) ;
String paramJson = processNodeParam . getParamJson ( ) ;
JSONObject paramJsonObject = JSONObject . parseObject ( paramJson ) ;
// outputDirId
Integer outputDirId = paramJsonObject . getInteger ( " outputDirId " ) ;
// 查data表
GetFileBaseInfoReq getFileBaseInfoReq = new GetFileBaseInfoReq ( ) ;
getFileBaseInfoReq . setFileId ( outputDirId ) ;
SdmResponse fileBaseInfoResp = dataFeignClient . getFileBaseInfo ( getFileBaseInfoReq ) ;
if ( ! fileBaseInfoResp . isSuccess ( ) | | fileBaseInfoResp . getData ( ) = = null ) {
throw new RuntimeException ( " 上一节点信息查询失败 " ) ;
}
Object data = fileBaseInfoResp . getData ( ) ;
// 直接强转为 Map( 推荐指定泛型, 避免后续取值强转)
Map < String , Object > dataMap = ( Map < String , Object > ) data ;
String objectKey = dataMap . get ( " objectKey " ) = = null ? " " : dataMap . get ( " objectKey " ) . toString ( ) ;
// 本地文件路径 taskLocalBaseDir
String localSaveDir = simulationBaseDir + objectKey ;
CoreLogger . info ( " beforeNode localSaveDir:{} " , localSaveDir ) ;
// 获取所有文件名字
// 符合正则的从文件
AtomicReference < String > masterFilePath = new AtomicReference < > ( ) ;
// 符合正则的主文件
List < String > inputFilePaths = new ArrayList < > ( ) ;
FilesUtil . collectFiles ( localSaveDir , masterFileRegularStr , inputFilesRegularStr , masterFilePath , inputFilePaths ) ;
try {
MultipartFile masterMultipartFile = FilesUtil . toMultipartFile ( masterFilePath . get ( ) ) ;
submitHpcTaskRemoteReq . setMasterFile ( masterMultipartFile ) ;
if ( CollectionUtils . isNotEmpty ( inputFilePaths ) ) {
List < MultipartFile > inputFiles = new ArrayList < > ( ) ;
for ( String inputFilePath : inputFilePaths ) {
MultipartFile inputFile = FilesUtil . toMultipartFile ( inputFilePath ) ;
inputFiles . add ( inputFile ) ;
}
submitHpcTaskRemoteReq . setInputFiles ( inputFiles ) ;
}
} catch ( Exception e ) {
log . error ( " dealHpcFile error " , e ) ;
throw new RuntimeException ( " 求解文件处理失败 " ) ;
}
}
/**
@@ -71,7 +139,7 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
if ( params = = null ) {
return req ;
}
ObjectMapper objectMapper = new ObjectMapper ( ) ; // 需确保ObjectMapper已配置或注入
// ObjectMapper objectMapper = new ObjectMapper(); // 需确保ObjectMapper已配置或注入
// 基础字段映射
req . setTimesmap ( params . get ( " timesmap " ) . toString ( ) ) ;
req . setJobName ( params . get ( " jobName " ) . toString ( ) ) ;
@@ -93,26 +161,30 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
req . setTaskName ( params . get ( " taskName " ) . toString ( ) ) ;
req . setRunId ( params . get ( " runId " ) . toString ( ) ) ;
req . setRunName ( params . get ( " runName " ) . toString ( ) ) ;
// mock 时暂时自己传递,后面根据软件名称查询命令
req . setCommand ( params . get ( " command " ) . toString ( ) ) ;
req . setProjectname ( params . get ( " projectname " ) . toString ( ) ) ;
// req.setFeatchFileType(params.get("featchFileType").toString());
// req.setBeforeNodeId(params.get("beforeNodeId").toString());
// 处理commandExpand字段( JSON字符串转Map)
String commandExpandJson = params . get ( " commandExpand " ) . toString ( ) ;
if ( StringUtils . isNotBlank ( commandExpandJson ) ) {
try {
// 将JSON字符串转换为Map<String, SimulationCommandPlaceholderReq>
Map < String , SimulationCommandPlaceholderReq > commandExpand = objectMapper . readValue (
commandExpandJson ,
new TypeReference < Map < String , SimulationCommandPlaceholderReq > > ( ) { }
) ;
// req.setCommandExpand(commandExpand);
} catch ( Exception e ) {
CoreLogger . error ( " convertParamsToReq error:{},params:{} " , e . getMessage ( ) , JSONObject . toJSONString ( params ) ) ;
// 如设为null或空Map
// req.setCommandExpand(new HashMap<>( ));
}
}
// 动态命令
// String commandExpandJson = params.get("commandExpand").toString();
// if (StringUtils.isNotBlank(commandExpandJson)) {
// try {
// // 将JSON字符串转换为Map<String, SimulationCommandPlaceholderReq>
// Map<String, SimulationCommandPlaceholderReq> commandExpand = objectMapper.readValue(
// commandExpandJson,
// new TypeReference<Map<String, SimulationCommandPlaceholderReq>>() {}
// );
//// req.setCommandExpand(commandExpand) ;
// } catch (Exception e) {
// CoreLogger.error("convertParamsToReq error:{},params:{}",e.getMessage(), JSONObject.toJSONString(params ));
// // 如设为null或空Map
//// req.setCommandExpand(new HashMap<>());
// }
// }
return req ;
}