// Source file: compose-analysis/src/main/java/com/keyware/composeanalysis/service/impl/AnalysisTaskServiceImpl.java
package com.keyware.composeanalysis.service.impl;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.keyware.common.constant.RedisConst;
import com.keyware.common.constant.enums.AnalysisStatusEnum;
import com.keyware.composeanalysis.constant.MongoDBConst;
import com.keyware.composeanalysis.constant.enums.FileAnalysisStatusEnum;
import com.keyware.composeanalysis.entity.AnalysisTask;
import com.keyware.composeanalysis.mapper.AnalyzeTaskMapper;
import com.keyware.composeanalysis.mongo.FileDataMongoDto;
import com.keyware.composeanalysis.mongo.MatchOpenFileMongoDto;
import com.keyware.composeanalysis.mongo.ProjectAssemblyMongoDto;
import com.keyware.composeanalysis.service.AnalysisTaskService;
import com.keyware.composeanalysis.task.*;
import com.keyware.composeanalysis.util.AnalysisLogUtil;
import com.keyware.composeanalysis.util.RedisUtil;
import com.keyware.composeanalysis.util.SolrUtils;
import com.mongodb.client.MongoClient;
import jakarta.annotation.Resource;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import static org.springframework.data.mongodb.core.query.Criteria.where;
/**
* <p>
* 成分分析服务实现类
* </p>
*
* @author liuzongren
* @since 2024-07-23
*/
@Log4j2
@Service
public class AnalysisTaskServiceImpl extends ServiceImpl<AnalyzeTaskMapper, AnalysisTask> implements AnalysisTaskService {
@Resource
private MongoClient mongoClient;
@Resource
private SolrUtils solrUtils;
@Resource
private TaskExecutor taskExecutor;
@Resource
private RedisUtil redisUtil;
@Override
@Async
public void doComposeAnalyze(AnalysisTask analysisTask) throws InterruptedException {
long startTime = System.currentTimeMillis();
log.info("开始成份分析,taskName:{}",analysisTask.getFileName());
//校验文件压缩是否完成
retryGetDecompressionFlag(analysisTask);
//开始分析前,将成分分析的状态为 进行中
LambdaUpdateWrapper<AnalysisTask> updateWrapper = new LambdaUpdateWrapper<>();
7 months ago
updateWrapper.eq(AnalysisTask::getId, analysisTask.getId())
.set(AnalysisTask::getComposeFlag, AnalysisStatusEnum.ANALYSISING.getCode());
this.update(null,updateWrapper);
7 months ago
MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, MongoDBConst.DB_NAME_PREFIX + analysisTask.getId());
AnalysisLogUtil.insert(mongoTemplate, "【成分分析】开始:" + analysisTask.getFileName());
//首先进行项目级别的分析,将所有文件的源MD5批量去solr库中匹配
PorjectAnalysisTask projectAnalysisTask = new PorjectAnalysisTask(mongoClient, analysisTask, solrUtils, this);
projectAnalysisTask.doAnalysis();
//项目级的分析完成后,没有匹配中的文件,根据分析的级别,对每个文件进行相应级别的分析
analysisFile(mongoTemplate,analysisTask);
//成份分析完成后,查询所有开源文件,判断当前项目是否开源
checkProjectIfOpen(mongoTemplate,analysisTask);
//修改成分分析状态为完成
7 months ago
updateWrapper.eq(AnalysisTask::getId, analysisTask.getId())
.set(AnalysisTask::getComposeFlag, AnalysisStatusEnum.ANALYSIS_DONE.getCode())
.set(AnalysisTask::getOpenType, analysisTask.getOpenType());
this.update(null,updateWrapper);
7 months ago
//插入分析日志
AnalysisLogUtil.insert(mongoTemplate,"【成分分析】已完成,耗时:"+ DateUtil.between(analysisTask.getAnalysisStartTime(),DateUtil.date(), DateUnit.SECOND) +"秒");
log.info("成份分析完成,taskName:{},耗时:{}",analysisTask.getFileName(),(System.currentTimeMillis()-startTime)/1000 +"秒");
}
@Override
public void stopComposeAnalysisTask(String taskId) {
//将成分分析的任务状态的标志位置为暂停,让线程池中的排队的任务队列停止分析
redisUtil.set(String.format(RedisConst.TASK_RUNNING_STATUS_KEY_PREFIX, taskId), AnalysisStatusEnum.PAUSE_ANALYSIS.getCode());
7 months ago
}
@Override
public Boolean restartComposeAnalysisTask(String taskId) {
boolean result = false;
try {
//删除匹配的开源项目信息
MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, MongoDBConst.DB_NAME_PREFIX + taskId);
mongoTemplate.remove(ProjectAssemblyMongoDto.class);
//删除项目匹配的开源文件
mongoTemplate.remove(MatchOpenFileMongoDto.class);
//将文件分析状态设置为未开始分析
mongoTemplate.update(FileDataMongoDto.class)
.apply(new Update().set("openType", false)
.set("fileAnalysisStatus", FileAnalysisStatusEnum.UN_START_ANALYSIS.getCode()))
.all();
//重新开始分析任务
doComposeAnalyze(getById(taskId));
result = true;
} catch (Exception e) {
log.error("重新分析失败", e);
}
return result;
}
@Override
@Async
public void recoveryComposeAnalysisTask(AnalysisTask analysisTask) {
/**
* todo 这里存在一个逻辑缺陷
* 项目级别的分析是无法终止的,当前任务恢复恢复的是文件级的成分分析如果文件级的没有分析完成这里可能会将所有文件进行文件级别的分析
*/
try {
//将成分分析的任务状态的标志位置改为进行中
redisUtil.set(String.format(RedisConst.TASK_RUNNING_STATUS_KEY_PREFIX, analysisTask.getId()), AnalysisStatusEnum.ANALYSISING.getCode());
MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, MongoDBConst.DB_NAME_PREFIX + analysisTask.getId());
//项目级的分析完成后
Query unAnalyzedFileQuery = new Query(where("fileAnalysisStatus").ne(FileAnalysisStatusEnum.UN_START_ANALYSIS.getCode())
.and("isDirectory").is(false));
List<FileDataMongoDto> unAnalyzedFiles = mongoTemplate.find(unAnalyzedFileQuery, FileDataMongoDto.class);
if (CollectionUtils.isNotEmpty(unAnalyzedFiles)){
//使用线程池 并行的分析文件
CountDownLatch countDownLatch = new CountDownLatch(unAnalyzedFiles.size());
unAnalyzedFiles.parallelStream().forEach(fileDataMongoDto -> {
IAnalysisTask task = AnalysisTaskFactory.createAnalysisTask(analysisTask, fileDataMongoDto, mongoTemplate, countDownLatch);
taskExecutor.execute(task);
});
countDownLatch.await();
//修改成分分析状态为完成
analysisTask.setComposeFlag(AnalysisStatusEnum.ANALYSIS_DONE.getCode());
this.updateById(analysisTask);
AnalysisLogUtil.insert(mongoTemplate,"成分分析已完成,耗时:"+ DateUtil.between(analysisTask.getCreateTime(),DateUtil.date(), DateUnit.SECOND) +"秒");
}
} catch (Exception e) {
log.error("恢复分析失败", e);
}
}
//引入解压缩有可能会很慢,这里添加重试机制,最多重试6次,60s
private boolean retryGetDecompressionFlag(AnalysisTask analysisTask) {
int retryCount = 0;
while (retryCount < 60) {
AnalysisTask latestAnalysisTask = this.getById(analysisTask.getId());
if (latestAnalysisTask.getDecompressionFlag()) {
analysisTask.setDecompressionFlag(true);
analysisTask.setFileCount(latestAnalysisTask.getFileCount());
return true;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("线程休眠异常", e);
}
retryCount++;
}
return false;
}
//开启单个文件的分析
private void analysisFile(MongoTemplate mongoTemplate,AnalysisTask analysisTask) throws InterruptedException {
Query unAnalyzedFileQuery = new Query(where("fileAnalysisStatus").is(FileAnalysisStatusEnum.UN_START_ANALYSIS.getCode())
.and("isDirectory").is(false));
List<FileDataMongoDto> unAnalyzedFiles = mongoTemplate.find(unAnalyzedFileQuery, FileDataMongoDto.class);
//使用线程池 并行的分析
CountDownLatch countDownLatch = new CountDownLatch(unAnalyzedFiles.size());
unAnalyzedFiles.parallelStream().forEach(fileDataMongoDto -> {
IAnalysisTask task = AnalysisTaskFactory.createAnalysisTask(analysisTask, fileDataMongoDto, mongoTemplate, countDownLatch);
taskExecutor.execute(task);
});
countDownLatch.await();
}
//校验当前项目是否开源
private void checkProjectIfOpen(MongoTemplate mongoTemplate,AnalysisTask analysisTask){
Query openFileQuery = new Query(where("openType").is(true));
Long openFilesCount = mongoTemplate.count(openFileQuery, FileDataMongoDto.class);
//是否开源阈值
Integer openThread = analysisTask.getOpenRateThreshold();
BigDecimal totalFileCount = new BigDecimal(analysisTask.getFileCount());
//统计开源率
BigDecimal openRate = new BigDecimal(openFilesCount).divide(totalFileCount, 4, RoundingMode.HALF_UP).multiply(new BigDecimal(100));
//超过阈值认为开源
if (openRate.compareTo(new BigDecimal(openThread)) >= 0) {
analysisTask.setOpenType(true);
}
}
}