修改:flowable工作流发起任务,数据上行状态优化

This commit is contained in:
2026-03-27 16:07:52 +08:00
parent 11b509872e
commit 6d7c8c1bc7
9 changed files with 173 additions and 59 deletions

View File

@@ -0,0 +1,23 @@
package com.sdm.pbs.config;
/**
* PBS HPC 通用常量类
* 统一管理状态、枚举值、固定配置
*/
public class PbsCommonConstant {
private PbsCommonConstant() {
// 私有构造,禁止实例化
}
// ======================== 任务类型 type ========================
public static final String TASK_TYPE_SINGLE = "single"; // 单独任务
public static final String TASK_TYPE_BATCH = "batch"; // 批量任务
public static final String TASK_TYPE_FLOWABLE = "flowable"; // 流程任务
public static final String EXECUTE_MODE_WEB_SINGLE = "WEB_SINGLE";
public static final String EXECUTE_MODE_WEB_BATCH = "WEB_BATCH";
// ======================== 删除标识 delFlag ========================
public static final String DEL_FLAG_NO = "N"; // 未删除
public static final String DEL_FLAG_YES = "Y"; // 已删除
}

View File

@@ -6,11 +6,13 @@ import com.sdm.common.common.ThreadLocalContext;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq;
import com.sdm.common.feign.inter.pbs.ITaskFeignClient;
import com.sdm.common.utils.FilesUtil;
import com.sdm.pbs.config.PbsCommonConstant;
import com.sdm.pbs.model.bo.BatchWebSubmitResp;
import com.sdm.pbs.model.entity.SimulationJob;
import com.sdm.pbs.model.req.BatchHpcTaskReq;
import com.sdm.pbs.model.req.OneHpcTaskReq;
import com.sdm.pbs.model.req.SubmitHpcTaskReq;
import com.sdm.pbs.model.req.WebSubmitReq;
import com.sdm.pbs.service.ISimulationJobService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
@@ -37,8 +39,6 @@ public class TaskAdapter implements ITaskFeignClient {
private static final String EXECUTE_MODE_AUTO = "AUTO";
private static final String EXECUTE_MODE_MANUAL = "MANUAL";
private static final String EXECUTE_MODE_WEB_SINGLE = "WEB_SINGLE";
private static final String EXECUTE_MODE_WEB_BATCH = "WEB_BATCH";
private static final String FLOWABLE_SIMULATION_BASEDIR = "/home/simulation/";
@Autowired
@@ -50,30 +50,17 @@ public class TaskAdapter implements ITaskFeignClient {
@Value("${hpc.webSubmit.basePath:/home/simulation/hpc}")
private String BASE_STORAGE_PATH ;
@Value("${testEnStr:}")
private String enStr;
@Value("${testEnStr2:}")
private String testEnStr2;
@Value("${pbs.task.impl}")
private String pbsImpl;
@GetMapping("/testEn")
@Operation(summary = "作业提交")
public SdmResponse<Map<String,Object>> testEn() {
Map<String, Object> map = new HashMap<>();
map.put("enStr", enStr);
map.put("pbsImpl", pbsImpl);
map.put("testEnStr2", testEnStr2);
return SdmResponse.success(map);
}
@PostMapping("/adapterSubmitHpcJob")
@Operation(summary = "作业提交")
public SdmResponse<String> adapterSubmitHpcJob(@RequestBody SubmitHpcTaskRemoteReq req) {
// 从flowable 过来的请求
// 预插入数据
WebSubmitReq webSubmitReq = buildWebSubmitReq(req,"flowable发起任务","flowable");
SdmResponse<SimulationJob> preResp = taskController.preSingleHpcJobSubmit(webSubmitReq,false);
if(!preResp.isSuccess()) {
log.error("flowable作业提交失败插入任务数据异常:{}",JSONObject.toJSONString(req));
throw new RuntimeException("hpc作业提交失败插入任务数据异常");
}
// spdm 回传路径
// 求解文件获取,可以在这一层分片上传,然后拿到对应的路径
SubmitHpcTaskReq submitHpcTaskReq = new SubmitHpcTaskReq();
@@ -86,7 +73,7 @@ public class TaskAdapter implements ITaskFeignClient {
@Operation(summary = "独立网页hpc作业提交")
public SdmResponse<String> webSubmit(@RequestBody OneHpcTaskReq req) {
// 预插入数据
SdmResponse<SimulationJob> preResp = taskController.preSingleHpcJobSubmit(req);
SdmResponse<SimulationJob> preResp = taskController.preSingleHpcJobSubmit(req,false);
if(!preResp.isSuccess()) {
log.error("hpc作业提交失败插入任务数据异常:{}",JSONObject.toJSONString(req));
throw new RuntimeException("hpc作业提交失败插入任务数据异常");
@@ -98,7 +85,7 @@ public class TaskAdapter implements ITaskFeignClient {
getSimulationFile(remoteReq,submitHpcTaskReq,"");
// 公共提交接口
BeanUtils.copyProperties(remoteReq,submitHpcTaskReq);
submitHpcTaskReq.setFrom(EXECUTE_MODE_WEB_SINGLE);
submitHpcTaskReq.setFrom(PbsCommonConstant.EXECUTE_MODE_WEB_SINGLE);
Long userId = ThreadLocalContext.getUserId();
Long tenantId = ThreadLocalContext.getTenantId();
// 异步发起失败后数据更改todo
@@ -109,8 +96,8 @@ public class TaskAdapter implements ITaskFeignClient {
@PostMapping("/batchWebSubmit")
@Operation(summary = "批量网页hpc作业提交")
public SdmResponse<String> batchWebSubmit(@RequestBody BatchHpcTaskReq req) {
// 预插入数据
SdmResponse<SimulationJob> preResp = taskController.preSingleHpcJobSubmit(req);
// 预插入数据
SdmResponse<SimulationJob> preResp = taskController.preSingleHpcJobSubmit(req,true);
if(!preResp.isSuccess()) {
log.error("batchWebSubmit作业提交失败插入任务数据异常:{}",JSONObject.toJSONString(req));
throw new RuntimeException("hpc作业提交失败插入任务数据异常");
@@ -174,7 +161,7 @@ public class TaskAdapter implements ITaskFeignClient {
submitHpcTaskReq.setInputFilePaths(inputFilePaths);
}
// 网页单一任务
if (Objects.equals(req.getExecuteMode(),EXECUTE_MODE_WEB_SINGLE)) {
if (Objects.equals(req.getExecuteMode(), PbsCommonConstant.EXECUTE_MODE_WEB_SINGLE)) {
// 文件的基础路径,就是本次任务的总目录
// /hpc/{userId}/{jobName}/{uuid}
Long userId = ThreadLocalContext.getUserId();
@@ -188,7 +175,7 @@ public class TaskAdapter implements ITaskFeignClient {
req.setStdoutSpdmNasFilePath(fileBasePath);
}
// 网页批量任务
if(Objects.equals(req.getExecuteMode(),EXECUTE_MODE_WEB_BATCH)){
if(Objects.equals(req.getExecuteMode(),PbsCommonConstant.EXECUTE_MODE_WEB_BATCH)){
Pair<String, List<String>> pair = FilesUtil.getSingleSubmitFiles(batchFilePath);
// 本地主文件
submitHpcTaskReq.setMasterFilePath(pair.getLeft());
@@ -215,7 +202,7 @@ public class TaskAdapter implements ITaskFeignClient {
remoteReq.setIndependence(jobDb.getIndependence());
remoteReq.setUuid(req.getUuid());
// 网页单一任务
remoteReq.setExecuteMode(EXECUTE_MODE_WEB_SINGLE);
remoteReq.setExecuteMode(PbsCommonConstant.EXECUTE_MODE_WEB_SINGLE);
return remoteReq;
}
@@ -237,7 +224,7 @@ public class TaskAdapter implements ITaskFeignClient {
remoteReq.setSoftware(jobDb.getSoftware());
remoteReq.setJobType(jobDb.getJobType());
// 网页批量任务
remoteReq.setExecuteMode(EXECUTE_MODE_WEB_BATCH);
remoteReq.setExecuteMode(PbsCommonConstant.EXECUTE_MODE_WEB_BATCH);
return remoteReq;
}
@@ -335,10 +322,17 @@ public class TaskAdapter implements ITaskFeignClient {
String lastDirName = FilesUtil.getLastDirectoryName(taskDirAllPath);
// 构建远程请求参数
SubmitHpcTaskRemoteReq remoteReq = batchBuildCommonRemoteReq(req, jobDb, lastDirName);
// 批量任务的子任务预插入数据
WebSubmitReq prewebSubmitReq = buildWebSubmitReq(remoteReq, remoteReq.getJobDesc(), remoteReq.getJobType());
SdmResponse<SimulationJob> preResp = taskController.preSingleHpcJobSubmit(prewebSubmitReq,false);
if(!preResp.isSuccess()) {
log.error("批量子作业提交失败,插入任务数据异常:{}",JSONObject.toJSONString(req));
throw new RuntimeException("批量子作业提交失败,插入任务数据异常");
}
// 构建任务提交请求
SubmitHpcTaskReq submitHpcTaskReq = buildSubmitTaskReq(remoteReq,taskDirAllPath);
// 设置批量提交来源
submitHpcTaskReq.setFrom(EXECUTE_MODE_WEB_BATCH);
submitHpcTaskReq.setFrom(PbsCommonConstant.EXECUTE_MODE_WEB_BATCH);
// 调用提交接口
SdmResponse<String> response = taskController.submitHpcJob(submitHpcTaskReq);
// 处理成功结果
@@ -395,6 +389,27 @@ public class TaskAdapter implements ITaskFeignClient {
log.info("doBatchSubmitHpcTasks back:{}",JSONObject.toJSONString(results));
}
/**
* 封装Flowable请求为WebSubmitReq对象
* @param req 前端/Flowable传入的请求
* @return 封装好的WebSubmitReq
*/
private WebSubmitReq buildWebSubmitReq(SubmitHpcTaskRemoteReq req,String jobDesc,String type) {
WebSubmitReq webSubmitReq = new WebSubmitReq();
// 生成无横杠UUID
String uuid = UUID.randomUUID().toString().replace("-", "");
req.setUuid(uuid);
// 赋值
webSubmitReq.setUuid(uuid);
webSubmitReq.setSoftwareId(req.getSoftwareId());
webSubmitReq.setSoftware(req.getSoftware());
webSubmitReq.setJobName(req.getJobName());
webSubmitReq.setCoreNum(req.getCoreNum());
webSubmitReq.setJobDesc(jobDesc);
webSubmitReq.setHpcGroup("HPC_PACK");
webSubmitReq.setType(type);
return webSubmitReq;
}
}

View File

@@ -289,8 +289,9 @@ public class TaskController {
return pbsService.downloadFile(req.getJobId(),req.getFileName());
}
public SdmResponse<SimulationJob> preSingleHpcJobSubmit(WebSubmitReq req) {
return pbsService.preSingleHpcJobSubmit(req);
// firstBatch 批量任务总数据是true,其他false
public SdmResponse<SimulationJob> preSingleHpcJobSubmit(WebSubmitReq req,boolean firstBatch) {
return pbsService.preSingleHpcJobSubmit(req,firstBatch);
}
}

View File

@@ -0,0 +1,17 @@
package com.sdm.pbs.model.entity;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum HpcFileStatus {
uplinking("文件上传中-上行"),
upsuccend("文件上行成功"),
upfailed("文件上行异常"),
generating("任务运行文件生成中"),
uploading("文件回传中-下行"),
finished("文件回传完成"),
failed("文件回传失败");
private final String desc;
}

View File

@@ -0,0 +1,19 @@
package com.sdm.pbs.model.entity;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum HpcJobStatus {
Preparing("准备"),
Configuring("配置中"),
Queued("队列中"),
// 表示:调度器已为任务分配好计算资源,正在与目标节点通信以启动任务执行
Dispatching("调度中"),
Running("执行中"),
Canceled("已取消"),
Finished("已完成"),
Failed("任务失败");
private final String desc;
}

View File

@@ -47,7 +47,7 @@ public class WebSubmitReq implements Serializable {
private String hpcGroup;
/**
* single :单独任务 or batch :批量任务
* single :单独任务 or batch :批量任务flowable 工作流发起的任务
*/
private String type;

View File

@@ -62,6 +62,6 @@ public interface IPbsService {
SdmResponse<String> stopStreamTaskLog(String clientToken);
SdmResponse<SimulationJob> preSingleHpcJobSubmit(WebSubmitReq req);
SdmResponse<SimulationJob> preSingleHpcJobSubmit(WebSubmitReq req,boolean firstBatch);
}

View File

@@ -148,7 +148,7 @@ public class IPbsHpcServiceImpl implements IPbsService {
}
@Override
public SdmResponse<SimulationJob> preSingleHpcJobSubmit(WebSubmitReq req) {
public SdmResponse<SimulationJob> preSingleHpcJobSubmit(WebSubmitReq req,boolean firstBatch) {
return SdmResponse.success();
}

View File

@@ -25,10 +25,11 @@ import com.sdm.common.utils.PageUtils;
import com.sdm.pbs.model.bo.CommandResult;
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
import com.sdm.pbs.model.bo.HpcResouceInfo;
import com.sdm.pbs.model.entity.SimulationHpcCommand;
import com.sdm.pbs.model.entity.SimulationHpcCommandPlaceholder;
import com.sdm.pbs.model.entity.SimulationJob;
import com.sdm.pbs.model.req.*;
import com.sdm.pbs.model.entity.*;
import com.sdm.pbs.model.req.JobFileCallBackReq;
import com.sdm.pbs.model.req.QueryJobReq;
import com.sdm.pbs.model.req.SubmitHpcTaskReq;
import com.sdm.pbs.model.req.WebSubmitReq;
import com.sdm.pbs.service.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
@@ -66,8 +67,6 @@ import java.util.stream.Collectors;
@ConditionalOnProperty(name = "pbs.task.impl", havingValue = "hpc")
public class PbsServiceDecorator implements IPbsServiceDecorator {
private static final String EXECUTE_MODE_WEB_SINGLE = "WEB_SINGLE";
@Autowired
private HpcCommandExcuteUtil hpcCommandExcuteUtil;
@@ -156,7 +155,13 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
String subDir = generateHpcSubDir(req);
// 0.处理hpc文件
String masterFilePath = handleHpcFileUpload(req, subDir);
// 重新赋值用于command拼接
// 更新上行文件状态
String fileStatus=StringUtils.isNotBlank(masterFilePath)? HpcFileStatus.upsuccend.name():HpcFileStatus.upfailed.name();
updateFileStatusByUuid(req.getUuid(),fileStatus);
if(StringUtils.isBlank(masterFilePath)){
throw new RuntimeException("hpc文件上行失败");
}
// 重新赋值用于command拼接
req.setMasterFilePath(masterFilePath);
// 任务输出的文件夹
Pair<String, String> pair = extractDirectoryAndFileName(masterFilePath);
@@ -347,7 +352,7 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
simulationJob.setTotalElapsedTime(null);
// 标识及状态
// simulationJob.setUuid(null);
simulationJob.setFileStatus("generating");
simulationJob.setFileStatus(HpcFileStatus.generating.name());
// 审计字段
Long userId = ThreadLocalContext.getUserId();
Long tenantId = ThreadLocalContext.getTenantId();
@@ -369,15 +374,18 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
if(StringUtils.isNotBlank(req.getJobDesc())){
simulationJob.setJobDesc(req.getJobDesc());
}
// 独立单一任务
if(Objects.equals(req.getFrom(),EXECUTE_MODE_WEB_SINGLE)){
SimulationJob jobDb = simulationJobService.lambdaQuery().eq(SimulationJob::getUuid, req.uuid).one();
simulationJob.setId(jobDb.getId());
simulationJobService.updateById(simulationJob);
}else {
simulationJobService.save(simulationJob);
}
SimulationJob jobDb = simulationJobService.lambdaQuery().eq(SimulationJob::getUuid, req.uuid).one();
simulationJob.setId(jobDb.getId());
boolean b = simulationJobService.updateById(simulationJob);
log.info("hpc submit end,update :{},jobId:{}",b,jobId);
// // 批量的任务
// if(Objects.equals(req.getFrom(), PbsCommonConstant.EXECUTE_MODE_WEB_BATCH)){
// simulationJobService.save(simulationJob);
// }else {
// SimulationJob jobDb = simulationJobService.lambdaQuery().eq(SimulationJob::getUuid, req.uuid).one();
// simulationJob.setId(jobDb.getId());
// simulationJobService.updateById(simulationJob);
// }
}
}
@@ -622,7 +630,7 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
}
@Override
public SdmResponse<SimulationJob> preSingleHpcJobSubmit(WebSubmitReq req) {
public SdmResponse<SimulationJob> preSingleHpcJobSubmit(WebSubmitReq req,boolean firstBatch) {
// 预提交数据入库
SimulationJob simulationJob = new SimulationJob();
// 基础字段
@@ -633,16 +641,18 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
// job 的类型
simulationJob.setJobType(req.getHpcGroup());
// 0flow 任务 1独立提交单一任务2独立提交批量任务
int independence = Objects.equals(req.getType(),"single") ? 1:2;
int independence = getIndependenceByType(req.getType());
simulationJob.setIndependence(independence);
// 软件及文件关联
simulationJob.setSoftwareId(req.getSoftwareId());
// 任务装备状态
simulationJob.setJobStatus("Preparing");
simulationJob.setJobStatus(HpcJobStatus.Preparing.name());
// 求解器名称
simulationJob.setSolverName(req.getSoftware());
// 文件上行处理中
simulationJob.setFileStatus("uplinking");
// 文件上行处理中,批量的总数据,不存文件状态
if(!firstBatch){
simulationJob.setFileStatus(HpcFileStatus.uplinking.name());
}
// 文件详情
simulationJob.setJobDesc(req.getJobDesc());
simulationJob.setUuid(req.getUuid());
@@ -1080,5 +1090,34 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
}
return resultMap;
}
/**
* 根据任务类型获取独立性标识
* @param type 任务类型 single/batch/flowable
* @return independence 1/2/0
*/
private int getIndependenceByType(String type) {
if (Objects.equals(type, "single")) {
return 1;
}
if (Objects.equals(type, "batch")) {
return 2;
}
// flowable 或其他未知类型,默认返回 0
return 0;
}
/**
* 根据UUID更新文件状态
* @param uuid 唯一标识
* @param fileStatus 状态值generating、uploading、finished、failed
*/
private boolean updateFileStatusByUuid(String uuid, String fileStatus) {
boolean update = simulationJobService.lambdaUpdate()
.eq(SimulationJob::getUuid, uuid)
.set(SimulationJob::getFileStatus, fileStatus)
.update();
log.info("submit hpc file up end:{}", update);
return update;
}
}