新增:hpc任务批量删除
This commit is contained in:
@@ -0,0 +1,17 @@
|
|||||||
|
package com.sdm.common.entity.resp.pbs.hpc;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public class DelHpcJobsResult implements Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
private List<String> succJobIds;
|
||||||
|
|
||||||
|
private List<String> failedJobIds;
|
||||||
|
|
||||||
|
}
|
||||||
@@ -4,12 +4,15 @@ import com.alibaba.fastjson2.JSON;
|
|||||||
import com.alibaba.fastjson2.JSONObject;
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
import com.alibaba.fastjson2.TypeReference;
|
import com.alibaba.fastjson2.TypeReference;
|
||||||
import com.sdm.common.common.SdmResponse;
|
import com.sdm.common.common.SdmResponse;
|
||||||
|
import com.sdm.common.entity.req.pbs.DelHpcJobsFileToolReq;
|
||||||
|
import com.sdm.common.entity.resp.pbs.hpc.DelHpcJobsResult;
|
||||||
import com.sdm.common.entity.resp.pbs.hpc.FileNodeInfo;
|
import com.sdm.common.entity.resp.pbs.hpc.FileNodeInfo;
|
||||||
import com.sdm.common.log.CoreLogger;
|
import com.sdm.common.log.CoreLogger;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.core.ParameterizedTypeReference;
|
||||||
import org.springframework.core.io.ByteArrayResource;
|
import org.springframework.core.io.ByteArrayResource;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||||
@@ -55,6 +58,9 @@ public class HpcCommandExcuteUtil {
|
|||||||
@Value("${hpc.callHpcUpload:}")
|
@Value("${hpc.callHpcUpload:}")
|
||||||
private String callHpcUpload;
|
private String callHpcUpload;
|
||||||
|
|
||||||
|
@Value("${hpc.delHpcJobsUrl:}")
|
||||||
|
private String delHpcJobsUrl;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private HttpClientUtil httpClientUtil;
|
private HttpClientUtil httpClientUtil;
|
||||||
|
|
||||||
@@ -239,4 +245,20 @@ public class HpcCommandExcuteUtil {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SdmResponse<DelHpcJobsResult> batchDeleteHpcJobs(List<DelHpcJobsFileToolReq> reqs,WebClient pbsWebClient) {
|
||||||
|
// 发起POST请求,同步阻塞获取响应
|
||||||
|
try {
|
||||||
|
SdmResponse<DelHpcJobsResult> response = pbsWebClient.post()
|
||||||
|
.uri(delHpcJobsUrl)
|
||||||
|
.bodyValue(reqs) // 传入请求体对象
|
||||||
|
.retrieve()
|
||||||
|
.bodyToMono(new ParameterizedTypeReference<SdmResponse<DelHpcJobsResult>>() {}) // 泛型类型指定
|
||||||
|
.block(); // 阻塞等待响应
|
||||||
|
return response;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("batchDeleteHpcJobs post error:{}",e.getMessage());
|
||||||
|
return SdmResponse.failed("发起批量删除hpc节点工作目录异常");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
44
flowable/src/main/resources/application-yang.yml
Normal file
44
flowable/src/main/resources/application-yang.yml
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
server:
|
||||||
|
port: 7106
|
||||||
|
spring:
|
||||||
|
application:
|
||||||
|
name: flowable
|
||||||
|
datasource:
|
||||||
|
url: jdbc:mysql://127.0.0.1:3306/flowable?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
|
||||||
|
username: root
|
||||||
|
password: mysql
|
||||||
|
driver-class-name: com.mysql.cj.jdbc.Driver
|
||||||
|
flowable:
|
||||||
|
# ?????????
|
||||||
|
database-schema-update: true
|
||||||
|
# ??????JOB
|
||||||
|
async-executor-activate: true
|
||||||
|
cloud:
|
||||||
|
nacos:
|
||||||
|
discovery:
|
||||||
|
server-addr: 127.0.0.1:8848
|
||||||
|
group: YANG_GROUP
|
||||||
|
enabled: true
|
||||||
|
username: nacos
|
||||||
|
password: nacos
|
||||||
|
|
||||||
|
logging:
|
||||||
|
level:
|
||||||
|
org:
|
||||||
|
flowable: INFO
|
||||||
|
|
||||||
|
mybatis-plus:
|
||||||
|
mapper-locations: classpath*:/mapper/**/*.xml
|
||||||
|
type-aliases-package: com.sdm.flowable.model.entity
|
||||||
|
configuration:
|
||||||
|
map-underscore-to-camel-case: true
|
||||||
|
global-config:
|
||||||
|
db-config:
|
||||||
|
id-type: auto
|
||||||
|
|
||||||
|
security:
|
||||||
|
whitelist:
|
||||||
|
paths:
|
||||||
|
- /process/testHpc
|
||||||
|
- /process/testHpc2
|
||||||
|
- /process/asyncCallback
|
||||||
@@ -10,9 +10,11 @@ import com.sdm.common.common.SdmResponse;
|
|||||||
import com.sdm.common.common.ThreadLocalContext;
|
import com.sdm.common.common.ThreadLocalContext;
|
||||||
import com.sdm.common.entity.enums.MessageTemplateEnum;
|
import com.sdm.common.entity.enums.MessageTemplateEnum;
|
||||||
import com.sdm.common.entity.req.flowable.AsyncCallbackRequest;
|
import com.sdm.common.entity.req.flowable.AsyncCallbackRequest;
|
||||||
|
import com.sdm.common.entity.req.pbs.DelHpcJobsFileToolReq;
|
||||||
import com.sdm.common.entity.req.pbs.DelHpcJobsReq;
|
import com.sdm.common.entity.req.pbs.DelHpcJobsReq;
|
||||||
import com.sdm.common.entity.req.system.SendMsgReq;
|
import com.sdm.common.entity.req.system.SendMsgReq;
|
||||||
import com.sdm.common.entity.resp.PageDataResp;
|
import com.sdm.common.entity.resp.PageDataResp;
|
||||||
|
import com.sdm.common.entity.resp.pbs.hpc.DelHpcJobsResult;
|
||||||
import com.sdm.common.entity.resp.pbs.hpc.FileNodeInfo;
|
import com.sdm.common.entity.resp.pbs.hpc.FileNodeInfo;
|
||||||
import com.sdm.common.feign.impl.system.MessageFeignClientImpl;
|
import com.sdm.common.feign.impl.system.MessageFeignClientImpl;
|
||||||
import com.sdm.common.feign.inter.flowable.IFlowableFeignClient;
|
import com.sdm.common.feign.inter.flowable.IFlowableFeignClient;
|
||||||
@@ -55,6 +57,7 @@ import java.time.LocalDateTime;
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
@@ -579,7 +582,7 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
|
|||||||
List<SimulationJob> jobList = simulationJobService.lambdaQuery()
|
List<SimulationJob> jobList = simulationJobService.lambdaQuery()
|
||||||
.in(SimulationJob::getJobId, req.getHpcJobIds())
|
.in(SimulationJob::getJobId, req.getHpcJobIds())
|
||||||
.list();
|
.list();
|
||||||
if (jobList.isEmpty()) {
|
if (CollectionUtils.isEmpty(jobList)) {
|
||||||
throw new RuntimeException("未查询到待删除的HPC任务");
|
throw new RuntimeException("未查询到待删除的HPC任务");
|
||||||
}
|
}
|
||||||
// 2. 校验任务状态(非进行中、文件非上传中)
|
// 2. 校验任务状态(非进行中、文件非上传中)
|
||||||
@@ -589,20 +592,59 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
|
|||||||
throw new RuntimeException("删除的任务状态和文件状态不能是未完成");
|
throw new RuntimeException("删除的任务状态和文件状态不能是未完成");
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. 调用HPC批量删除接口 todo
|
// 3. 调用HPC批量删除接口
|
||||||
// boolean hpcDelSuccess = batchDeleteHpcJobs(req.getHpcJobIds());
|
SdmResponse<DelHpcJobsResult> response = batchDeleteHpcJobs(jobList);
|
||||||
// if (!hpcDelSuccess) {
|
if (!response.isSuccess()) {
|
||||||
// return SdmResponse.fail("调用HPC批量删除接口失败");
|
return SdmResponse.failed("调用HPC文件工具服务批量删除工作目录失败:{}",JSONObject.toJSONString(response));
|
||||||
// }
|
}
|
||||||
|
DelHpcJobsResult data = response.getData();
|
||||||
|
if(!Objects.isNull(data)&&CollectionUtils.isNotEmpty(data.getSuccJobIds())){
|
||||||
|
List<String> succJobIds = data.getSuccJobIds();
|
||||||
|
List<SimulationJob> newJobs = filterSuccessJobs(jobList, succJobIds);
|
||||||
// 5. 逻辑删除simulation_job表数据
|
// 5. 逻辑删除simulation_job表数据
|
||||||
boolean logicDelSuccess = logicDeleteSimulationJob(jobList);
|
boolean logicDelSuccess = logicDeleteSimulationJob(newJobs);
|
||||||
if (!logicDelSuccess) {
|
if (!logicDelSuccess) {
|
||||||
return SdmResponse.failed("逻辑删除任务数据失败");
|
return SdmResponse.failed("逻辑删除任务数据失败");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return SdmResponse.success("批量删除HPC任务成功");
|
return SdmResponse.success("批量删除HPC任务成功");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 筛选出jobId在succJobIds中的SimulationJob集合
|
||||||
|
* @param jobList 原始任务集合
|
||||||
|
* @param succJobIds 成功的jobId集合
|
||||||
|
* @return 筛选后的新任务集合
|
||||||
|
*/
|
||||||
|
private List<SimulationJob> filterSuccessJobs(List<SimulationJob> jobList, List<String> succJobIds) {
|
||||||
|
// 2. 使用Stream流筛选:jobId存在于succJobIds中的元素
|
||||||
|
List<SimulationJob> newJobList = jobList.stream()
|
||||||
|
// 核心筛选逻辑:判断当前job的jobId是否在succJobIds中
|
||||||
|
.filter(job -> succJobIds.contains(job.getJobId()))
|
||||||
|
// 收集筛选结果到新集合
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
return newJobList;
|
||||||
|
}
|
||||||
|
|
||||||
|
private SdmResponse<DelHpcJobsResult> batchDeleteHpcJobs(List<SimulationJob> jobList) {
|
||||||
|
// 远程请求filetool工具
|
||||||
|
List<DelHpcJobsFileToolReq> reqs = new ArrayList<>();
|
||||||
|
for (SimulationJob job : jobList) {
|
||||||
|
if(StringUtils.isNotBlank(job.getJobId())&&StringUtils.isNotBlank(job.getStdoutHpcFilePath())){
|
||||||
|
DelHpcJobsFileToolReq req = new DelHpcJobsFileToolReq();
|
||||||
|
req.setJobId(job.getJobId());
|
||||||
|
req.setStdoutHpcFilePath(job.getStdoutHpcFilePath());
|
||||||
|
reqs.add(req);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(CollectionUtils.isEmpty(reqs)){
|
||||||
|
log.error("batchDeleteHpcJobs get reqs null");
|
||||||
|
return SdmResponse.failed("请求参数是null");
|
||||||
|
}
|
||||||
|
SdmResponse<DelHpcJobsResult> response = hpcCommandExcuteUtil.batchDeleteHpcJobs(reqs, pbsWebClient);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
private boolean logicDeleteSimulationJob(List<SimulationJob> jobList) {
|
private boolean logicDeleteSimulationJob(List<SimulationJob> jobList) {
|
||||||
for (SimulationJob job : jobList) {
|
for (SimulationJob job : jobList) {
|
||||||
job.setDelFlag("Y");
|
job.setDelFlag("Y");
|
||||||
|
|||||||
@@ -121,6 +121,7 @@ hpc:
|
|||||||
remoteDownLoadFileUrl: http://192.168.190.164:9098/hpcDownload
|
remoteDownLoadFileUrl: http://192.168.190.164:9098/hpcDownload
|
||||||
remoteUploadFileUrl: http://192.168.190.164:9098/uploadHpcFile
|
remoteUploadFileUrl: http://192.168.190.164:9098/uploadHpcFile
|
||||||
callHpcUpload: http://192.168.190.164:9098/addJobQueue
|
callHpcUpload: http://192.168.190.164:9098/addJobQueue
|
||||||
|
delHpcJobsUrl: http://192.168.190.164:9098/delHpcJobs
|
||||||
# 上传头节点文件相关的配置
|
# 上传头节点文件相关的配置
|
||||||
fileToHpc:
|
fileToHpc:
|
||||||
http:
|
http:
|
||||||
|
|||||||
@@ -122,6 +122,7 @@ hpc:
|
|||||||
# remoteDownLoadFileUrl: http://127.0.0.1:9097/hpcDownload
|
# remoteDownLoadFileUrl: http://127.0.0.1:9097/hpcDownload
|
||||||
remoteUploadFileUrl: http://192.168.65.55:9097/uploadHpcFile
|
remoteUploadFileUrl: http://192.168.65.55:9097/uploadHpcFile
|
||||||
callHpcUpload: http://192.168.65.55:9097/addJobQueue
|
callHpcUpload: http://192.168.65.55:9097/addJobQueue
|
||||||
|
delHpcJobsUrl: http://192.168.65.55:9098/delHpcJobs
|
||||||
# 上传头节点文件相关的配置
|
# 上传头节点文件相关的配置
|
||||||
fileToHpc:
|
fileToHpc:
|
||||||
http:
|
http:
|
||||||
|
|||||||
@@ -122,6 +122,7 @@ hpc:
|
|||||||
# remoteDownLoadFileUrl: http://127.0.0.1:9097/hpcDownload
|
# remoteDownLoadFileUrl: http://127.0.0.1:9097/hpcDownload
|
||||||
remoteUploadFileUrl: http://10.122.38.200:9098/uploadHpcFile
|
remoteUploadFileUrl: http://10.122.38.200:9098/uploadHpcFile
|
||||||
callHpcUpload: http://10.122.38.200:9098/addJobQueue
|
callHpcUpload: http://10.122.38.200:9098/addJobQueue
|
||||||
|
delHpcJobsUrl: http://10.122.38.200:9098/delHpcJobs
|
||||||
# 上传头节点文件相关的配置
|
# 上传头节点文件相关的配置
|
||||||
fileToHpc:
|
fileToHpc:
|
||||||
http:
|
http:
|
||||||
|
|||||||
Reference in New Issue
Block a user