@@ -71,6 +71,7 @@ import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock ;
import java.util.function.Function ;
import java.util.stream.Collectors ;
import java.util.stream.Stream ;
import java.util.zip.ZipEntry ;
import java.util.zip.ZipOutputStream ;
@@ -163,6 +164,10 @@ public class LyricInternalServiceImpl implements ILyricInternalService {
@Autowired
private SimulationTaskExtraMapper simulationTaskExtraMapper ;
// 每批次处理的任务数量,可根据实际业务调整
@Value ( " ${lyric.syncException.batchSize:100} " )
private Integer batchSize ;
/**
* 判断字符串是否可以安全转换为Long类型
*
@@ -1743,54 +1748,87 @@ public class LyricInternalServiceImpl implements ILyricInternalService {
}
/**
* 同步异常任务信息方法
* 同步异常任务信息方法 batchSize
*/
@Override
public SdmResponse syncException ( ) {
try {
// 1. 查询未同步异常的 任务
List < SimulationTaskSyncExBo > unsyncedExceptionTasks = queryUnsyncedExceptionTasks( ) ;
if ( CollectionUtils . isEmpty ( unsyncedExceptionTasks ) ) {
// 1. 查询未同步异常任务的总数量
int total = simulationTaskMapper . queryUnsyncedExceptionTasksTotal ( ) ;
if ( total < = 0 ) {
log . warn ( " 同步异常任务:暂无需要同步的任务 " ) ;
return SdmResponse . success ( " 同步异常任务:暂无需要同步的任务 " ) ;
}
log . info ( " 同步异常任务:待处理总任务数为{}条,将分批次处理,每批次{}条 " , total , batchSize ) ;
// 2. 初始化批次参数,循环分批处理
int processedCount = 0 ; // 已处理的任务总数
int syncTotalCount = 0 ; // 最终同步成功的异常数据总数
int pageNum = 1 ; // 当前批次页码
while ( processedCount < total ) {
log . info ( " 同步异常任务:开始处理第{}批次,已处理{}条,剩余{}条 " ,
pageNum , processedCount , total - processedCount ) ;
// 2. 构建标签对应的节点信息映射
Map < String , SimulationNode > tag1NodeMap = buildTagNodeMap ( unsyncedExceptionTasks , " tag1 " , " project " ) ;
Map < String , SimulationNode > tag5NodeMap = buildTagNodeMap ( unsyncedExceptionTasks , " tag5 " , " workspace " ) ;
// 计算当前批次的起始和结束索引(分页查询)
int fromIndex = processedCount ;
int endIndex = Math . min ( processedCount + batchSize , total ) ;
// 3. 分批查询未同步异常任务
List < SimulationTaskSyncExBo > unsyncedExceptionTasks = queryUnsyncedExceptionTasks ( fromIndex , endIndex ) ;
if ( CollectionUtils . isEmpty ( unsyncedExceptionTasks ) ) {
log . warn ( " 同步异常任务:第{}批次查询结果为空,跳过该批次 " , pageNum ) ;
processedCount + = batchSize ;
pageNum + + ;
continue ;
}
log . info ( " 同步异常任务:第{}批次查询到{}条待处理任务 " , pageNum , unsyncedExceptionTasks . size ( ) ) ;
// 4. 构建标签对应的节点信息映射
Map < String , SimulationNode > tag1NodeMap = buildTagNodeMap ( unsyncedExceptionTasks , " tag1 " , " project " ) ;
Map < String , SimulationNode > tag5NodeMap = buildTagNodeMap ( unsyncedExceptionTasks , " tag5 " , " workspace " ) ;
if ( MapUtils . isEmpty ( tag1NodeMap ) & & MapUtils . isEmpty ( tag5NodeMap ) ) {
log . warn ( " 同步异常任务:第{}批次标签对应的节点信息为空,跳过该批次 " , pageNum ) ;
processedCount + = unsyncedExceptionTasks . size ( ) ;
pageNum + + ;
continue ;
}
if ( MapUtils . isEmpty ( tag1NodeMap ) & & MapUtils . isEmpty ( tag5NodeMap ) ) {
log . warn ( " 同步异常任务:标签对应的节点信息为空 " ) ;
return SdmResponse . success ( " 同步异常任务:标签对应的节点信息为空 " ) ;
// 5. 为任务赋值对应的节点编码
assignNodeCodeToTasks ( unsyncedExceptionTasks , tag1NodeMap , tag5NodeMap ) ;
// 6. 过滤出编码完整的有效任务
List < SimulationTaskSyncExBo > validTasks = filterValidTasks ( unsyncedExceptionTasks ) ;
if ( CollectionUtils . isEmpty ( validTasks ) ) {
log . warn ( " 同步异常任务:第{}批次过滤后无有效任务,跳过该批次 " , pageNum ) ;
processedCount + = unsyncedExceptionTasks . size ( ) ;
pageNum + + ;
continue ;
}
log . info ( " 同步异常任务:第{}批次过滤后剩余{}条有效任务 " , pageNum , validTasks . size ( ) ) ;
// 7. 查询异常数据并匹配任务
List < LyricExceptionModel > newExceptionModels = queryAndMatchExceptions ( validTasks ) ;
if ( CollectionUtils . isEmpty ( newExceptionModels ) ) {
log . info ( " 同步异常任务:第{}批次无新增异常数据需要入库 " , pageNum ) ;
processedCount + = unsyncedExceptionTasks . size ( ) ;
pageNum + + ;
continue ;
}
// 8. 批量入库异常数据,累加同步成功数量
int syncCount = batchSaveExceptionData ( newExceptionModels ) ;
syncTotalCount + = syncCount ;
log . info ( " 同步异常任务:第{}批次同步完成,本次同步{}条,累计同步{}条 " ,
pageNum , syncCount , syncTotalCount ) ;
// 更新已处理数量和页码
processedCount + = unsyncedExceptionTasks . size ( ) ;
pageNum + + ;
}
// 3. 为任务赋值对应的节点编码
assignNodeCodeToTasks ( unsyncedExceptionTasks , tag1NodeMap , tag5NodeMap ) ;
// 4. 过滤出编码完整的任务( tag1Code和tag5Code都不为空)
List < SimulationTaskSyncExBo > validTasks = filterValidTasks ( unsyncedExceptionTasks ) ;
if ( CollectionUtils . isEmpty ( validTasks ) ) {
log . warn ( " 同步异常任务: 过滤后无有效任务( tag1Code/tag5Code为空) " ) ;
return SdmResponse . success ( " 同步异常任务:过滤后无有效任务 " ) ;
}
// 5. 查询异常数据并匹配任务
List < LyricExceptionModel > newExceptionModels = queryAndMatchExceptions ( validTasks ) ;
if ( CollectionUtils . isEmpty ( newExceptionModels ) ) {
log . info ( " 同步异常任务:无新增异常数据需要入库 " ) ;
return SdmResponse . success ( " 同步异常任务:无新增异常数据需要入库 " ) ;
}
// 6. 批量入库异常数据
int syncCount = batchSaveExceptionData ( newExceptionModels ) ;
log . info ( " 同步异常任务完成,本次同步{}条数据 " , syncCount ) ;
return SdmResponse . success ( " 同步成功,本次同步了 " + syncCount + " 条数据 " ) ;
log . info ( " 同步异常任务全部完成,总任务数{}条,累计同步异常数据{}条 " , total , syncTotalCount ) ;
return SdmResponse . success ( " 同步成功,总任务数 " + total + " 条,累计同步了 " + syncTotalCount + " 条异常数据 " ) ;
} catch ( Exception e ) {
log . error ( " 同步异常任务失败 " , e ) ;
log . error ( " 同步异常任务执行失败,已处理部分数据 " , e ) ;
return SdmResponse . failed ( " 同步异常任务失败: " + e . getMessage ( ) ) ;
}
}
@Override
@@ -1944,8 +1982,8 @@ public class LyricInternalServiceImpl implements ILyricInternalService {
/**
* 查询未同步异常的任务列表
*/
private List < SimulationTaskSyncExBo > queryUnsyncedExceptionTasks ( ) {
return simulationTaskMapper . queryLyricHasNotSyncExceptionDatas ( ) ;
private List < SimulationTaskSyncExBo > queryUnsyncedExceptionTasks ( int fromIndex , int endIndex ) {
return simulationTaskMapper . queryLyricHasNotSyncExceptionDatas ( fromIndex , endIndex ) ;
}
/**
@@ -2041,39 +2079,37 @@ public class LyricInternalServiceImpl implements ILyricInternalService {
private int batchSaveExceptionData ( List < LyricExceptionModel > exceptionModels ) {
// 构建扩展表数据
List < SpdmNodeExtraReq > abnormalExtraList = new ArrayList < > ( ) ;
List< SpdmNodeExtraReq> abnormalDetailExtraList = new ArrayList < > ( ) ;
// List< SpdmNodeExtraReq> abnormalDetailExtraList = new ArrayList<>() ;
for ( LyricExceptionModel model : exceptionModels ) {
if ( model = = null | | StringUtils . isBlank ( model . getUuid ( ) ) ) {
log . warn ( " 同步异常任务: 异常模型数据无效, 跳过处理, model={} " , model ) ;
continue ;
}
// 构建异常标识扩展记录
SpdmNodeExtraReq abnormalExtra = new SpdmNodeExtraReq ( ) ;
abnormalExtra . setNodeId ( model . getUuid ( ) ) ;
abnormalExtra . setPropertyName ( " abnormal " ) ;
abnormalExtra . setPropertyValue ( " 1 " ) ;
abnormalExtraList . add ( abnormalExtra ) ;
// // 构建异常详情扩展记录 异常的详情暂时不需要 项目号+工位号 查出来的异常非常多,如果存储的话需要注意
// SpdmNodeExtraReq detailExtra = new SpdmNodeExtraReq();
// detailExtra.setNodeId(model.getUuid());
// detailExtra.setPropertyName("abnormalDetail");
// detailExtra.setPropertyValue(JSONObject.toJSONString(model));
// abnormalDetailExtraList.add(detailExtra);
// 构建异常详情扩展记录(修复原代码空指针问题)
SpdmNodeExtraReq detailExtra = new SpdmNodeExtraReq ( ) ;
detailExtra . setNodeId ( model . getUuid ( ) ) ;
detailExtra . setPropertyName ( " abnormalDetail " ) ;
detailExtra . setPropertyValue ( JSONObject . toJSONString ( model ) ) ;
abnormalDetailExtraList . add ( detailExtra ) ;
}
// 批量插入异常标识
if ( CollectionUtils . isNotEmpty ( abnormalExtraList ) ) {
int abnormalCount = simulationTaskExtraMapper . addTaskExtraBatch ( abnormalExtraList ) ;
log . info ( " 同步异常任务:批量插入异常标识{}条,成功{}条 " , abnormalExtraList . size ( ) , abnormalCount ) ;
}
// 批量插入异常详情
if ( CollectionUtils. isNotEmpty( abnormalDetailExtraList) ) {
int detailCount = simulationTaskExtraMapper. addTaskExtraBatch( abnormalDetailExtraList) ;
log. info ( " 同步异常任务:批量插入异常详情{}条,成功{}条" , abnormalDetailExtraList. size ( ) , detailCount) ;
}
// 批量插入异常详情
// if ( CollectionUtils. isNotEmpty( abnormalDetailExtraList)) {
// int detailCount = simulationTaskExtraMapper. addTaskExtraBatch( abnormalDetailExtraList) ;
// log.info(" 同步异常任务:批量插入异常详情{}条,成功{}条", abnormalDetailExtraList.size(), detailCount) ;
// }
return exceptionModels . size ( ) ;
}
@@ -2086,44 +2122,44 @@ public class LyricInternalServiceImpl implements ILyricInternalService {
*/
public List < LyricExceptionModel > matchExceptionAndTask ( List < LyricVProjectStationExcepTionToDM > exceptionList ,
List < SimulationTaskSyncExBo > noSyncLists ) {
// 步骤1: 预处理 noSyncLists, 转换为 Map( key: tag1Code_tag5Code, value: SimulationTaskSyncExBo)
// 先过滤掉 tag1Code/tag5Code 为空的无效数据
Map < String , SimulationTaskSyncExBo > taskMap = noSyncLists . stream ( )
// 步骤1: 预处理 noSyncLists, 转换为 Map( key: tag1Code_tag5Code, value: List< SimulationTaskSyncExBo> )
Map < String , List < SimulationTaskSyncExBo > > taskMap = noSyncLists . stream ( )
. filter ( bo - > bo ! = null
& & bo . getTag1Code ( ) ! = null & & ! bo . getTag1Code ( ) . trim ( ) . isEmpty ( )
& & bo . getTag5Code ( ) ! = null & & ! bo . getTag5Code ( ) . trim ( ) . isEmpty ( ) )
. collect ( Collectors . toMap (
// 组合键: tag1Code + "_" + tag5Code( 确保唯一性)
bo - > bo . getTag1Code ( ) . trim ( ) + " _ " + bo . getTag5Code ( ) . trim ( ) ,
// value 为当前 bo 对象
bo - > bo ,
// 若存在重复组合键,保留第一个(避免报错)
( existing , replacement ) - > existing
. collect ( Collectors . groupingBy (
bo - > bo . getTag1Code ( ) . trim ( ) + " _ " + bo . getTag5Code ( ) . trim ( )
) ) ;
// 用于记录已经处理过的 matchKey, 实现去重
Set < String > processedKeys = new HashSet < > ( ) ;
// 步骤2: 遍历 exceptionList, 匹配并封装结果
List < LyricExceptionModel > resultList = exceptionList . stream ( )
// 过滤掉异常列表中的无效数据( null 或关键字段为空)
// 过滤掉异常列表中的无效数据
. filter ( exception - > exception ! = null
& & exception . getProjectNum ( ) ! = null & & ! exception . getProjectNum ( ) . trim ( ) . isEmpty ( )
& & exception . getStationCode ( ) ! = null & & ! exception . getStationCode ( ) . trim ( ) . isEmpty ( ) )
. map ( exception - > {
// 生成匹配键: projectNum + "_" + stationNum
// 使用 flatMap 实现一对多转换
. flatMap ( exception - > {
String matchKey = exception . getProjectNum ( ) . trim ( ) + " _ " + exception . getStationCode ( ) . trim ( ) ;
// 从 taskMap 中查找匹配的任务对象
SimulationTaskSyncExBo matchTask = taskMap . get ( matchKey ) ;
if ( matchTask ! = null ) {
// 匹配成功,创建 LyricExceptionModel 对象
LyricExceptionModel model = new LyricExceptionModel ( ) ;
model . setUuid ( matchTask . getUuid ( ) ) ; // 设置任务的 uuid
model . setExceptionModel ( exception ) ; // 设置异常信息对象
return model ;
// 如果该 key 已经处理过,直接返回空流,跳过
if ( processedKeys . contains ( matchKey ) ) {
return Stream . empty ( ) ;
}
// 匹配失败返回 null
return null ;
List < SimulationTaskSyncExBo > matchTasks = taskMap . get ( matchKey ) ;
// 如果没有匹配到任务,返回空流
if ( CollectionUtils . isEmpty ( matchTasks ) ) {
return Stream . empty ( ) ;
}
// 标记该 key 已处理
processedKeys . add ( matchKey ) ;
// 遍历 matchTasks, 为每个任务创建一个 LyricExceptionModel
return matchTasks . stream ( ) . map ( task - > {
LyricExceptionModel model = new LyricExceptionModel ( ) ;
model . setUuid ( task . getUuid ( ) ) ;
// model.setExceptionModel(exception); // 项目编号+工位号在ep视图可能有多条数据, 具体的异常, 暂时不处理
return model ;
} ) ;
} )
// 过滤掉匹配失败的 null 元素
. filter ( model - > model ! = null )
. collect ( Collectors . toList ( ) ) ;
return resultList ;
}