package com.keyware.composeanalysis.service.impl;

import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.keyware.common.constant.RedisConst;
import com.keyware.common.constant.enums.AnalysisStatusEnum;
import com.keyware.composeanalysis.constant.MongoDBConst;
import com.keyware.composeanalysis.constant.enums.FileAnalysisStatusEnum;
import com.keyware.composeanalysis.entity.AnalysisTask;
import com.keyware.composeanalysis.mapper.AnalyzeTaskMapper;
import com.keyware.composeanalysis.mongo.FileDataMongoDto;
import com.keyware.composeanalysis.mongo.MatchOpenFileMongoDto;
import com.keyware.composeanalysis.mongo.ProjectAssemblyMongoDto;
import com.keyware.composeanalysis.service.AnalysisTaskService;
import com.keyware.composeanalysis.task.*;
import com.keyware.composeanalysis.util.AnalysisLogUtil;
import com.keyware.composeanalysis.util.RedisUtil;
import com.keyware.composeanalysis.util.SolrUtils;
import com.mongodb.client.MongoClient;
import jakarta.annotation.Resource;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import static org.springframework.data.mongodb.core.query.Criteria.where;

/**
 * Compose (component) analysis service implementation.
 *
 * <p>Orchestrates the composition analysis of an uploaded project: waits for
 * decompression, runs a project-level Solr match, then fans out per-file
 * analysis tasks on a thread pool, and finally decides whether the whole
 * project counts as open source.
 *
 * @author liuzongren
 * @since 2024-07-23
 */
@Log4j2
@Service
public class AnalysisTaskServiceImpl extends ServiceImpl<AnalyzeTaskMapper, AnalysisTask> implements AnalysisTaskService {

    /** Max number of 1-second polls while waiting for decompression (~60s total). */
    private static final int MAX_DECOMPRESSION_RETRIES = 60;

    @Resource
    private MongoClient mongoClient;

    @Resource
    private SolrUtils solrUtils;

    @Resource
    private TaskExecutor taskExecutor;

    @Resource
    private RedisUtil redisUtil;

    /**
     * Runs the full composition analysis for one task, asynchronously.
     *
     * @param analysisTask the task to analyze; its openType may be mutated by
     *                     {@link #checkProjectIfOpen(MongoTemplate, AnalysisTask)}
     * @throws InterruptedException if interrupted while waiting for the per-file tasks
     */
    @Override
    @Async
    public void doComposeAnalyze(AnalysisTask analysisTask) throws InterruptedException {
        long startTime = System.currentTimeMillis();
        log.info("开始成份分析,taskName:{}", analysisTask.getFileName());

        // Decompression runs upstream; poll until it reports completion.
        // BUGFIX: the original ignored this result and analyzed a possibly
        // half-decompressed project.
        if (!retryGetDecompressionFlag(analysisTask)) {
            log.error("解压缩未完成,成份分析终止,taskName:{}", analysisTask.getFileName());
            return;
        }

        // Mark the task as "analysing".
        // BUGFIX: the original used set(AnalysisTask::getId, ...) which leaves the
        // update without a WHERE clause (updating every row); eq() is the condition.
        LambdaUpdateWrapper<AnalysisTask> runningWrapper = new LambdaUpdateWrapper<>();
        runningWrapper.eq(AnalysisTask::getId, analysisTask.getId())
                .set(AnalysisTask::getComposeFlag, AnalysisStatusEnum.ANALYSISING.getCode());
        this.update(null, runningWrapper);

        MongoTemplate mongoTemplate =
                new MongoTemplate(mongoClient, MongoDBConst.DB_NAME_PREFIX + analysisTask.getId());
        AnalysisLogUtil.insert(mongoTemplate, "【成分分析】开始:" + analysisTask.getFileName());

        // Project-level pass first: batch-match every file's source MD5 against Solr.
        PorjectAnalysisTask projectAnalysisTask =
                new PorjectAnalysisTask(mongoClient, analysisTask, solrUtils, this);
        projectAnalysisTask.doAnalysis();

        // Files not matched at project level get the per-file analysis for the
        // task's configured analysis level.
        analysisFile(mongoTemplate, analysisTask);

        // Decide whether the project as a whole is considered open source.
        checkProjectIfOpen(mongoTemplate, analysisTask);

        // Mark the task as done. A fresh wrapper is used so the earlier
        // "analysing" set clause is not carried over.
        LambdaUpdateWrapper<AnalysisTask> doneWrapper = new LambdaUpdateWrapper<>();
        doneWrapper.eq(AnalysisTask::getId, analysisTask.getId())
                .set(AnalysisTask::getComposeFlag, AnalysisStatusEnum.ANALYSIS_DONE.getCode())
                .set(AnalysisTask::getOpenType, analysisTask.getOpenType());
        this.update(null, doneWrapper);

        AnalysisLogUtil.insert(mongoTemplate, "【成分分析】已完成,耗时:"
                + DateUtil.between(analysisTask.getAnalysisStartTime(), DateUtil.date(), DateUnit.SECOND) + "秒");
        log.info("成份分析完成,taskName:{},耗时:{}", analysisTask.getFileName(),
                (System.currentTimeMillis() - startTime) / 1000 + "秒");
    }

    /**
     * Requests a pause of a running analysis.
     *
     * <p>Flips the task's Redis status flag to PAUSE so queued per-file tasks in
     * the thread pool stop picking up work.
     *
     * @param taskId id of the task to pause
     */
    @Override
    public void stopComposeAnalysisTask(String taskId) {
        redisUtil.set(String.format(RedisConst.TASK_RUNNING_STATUS_KEY_PREFIX, taskId),
                AnalysisStatusEnum.PAUSE_ANALYSIS.getCode());
    }

    /**
     * Discards all previous results for a task and re-runs the analysis.
     *
     * @param taskId id of the task to restart
     * @return true if the restart was triggered, false if cleanup failed
     */
    @Override
    public Boolean restartComposeAnalysisTask(String taskId) {
        boolean result = false;
        try {
            MongoTemplate mongoTemplate =
                    new MongoTemplate(mongoClient, MongoDBConst.DB_NAME_PREFIX + taskId);

            // Drop previously matched open-source project info.
            // BUGFIX: MongoTemplate has no remove(Class) overload — the original
            // passed the Class object to remove(Object), which fails at runtime.
            // remove(Query, Class) with an empty query deletes all documents.
            mongoTemplate.remove(new Query(), ProjectAssemblyMongoDto.class);

            // Drop previously matched open-source files.
            mongoTemplate.remove(new Query(), MatchOpenFileMongoDto.class);

            // Reset every file back to "not yet analyzed".
            mongoTemplate.update(FileDataMongoDto.class)
                    .apply(new Update().set("openType", false)
                            .set("fileAnalysisStatus", FileAnalysisStatusEnum.UN_START_ANALYSIS.getCode()))
                    .all();

            // Kick the analysis off again from scratch.
            doComposeAnalyze(getById(taskId));
            result = true;
        } catch (Exception e) {
            log.error("重新分析失败", e);
        }
        return result;
    }

    /**
     * Resumes a paused analysis, asynchronously.
     *
     * <p>TODO (pre-existing): the project-level pass cannot be interrupted, so
     * resuming only re-drives the file-level analysis; if the file-level pass
     * never ran, every file may be re-analyzed at file level here.
     *
     * @param analysisTask the task to resume
     */
    @Override
    @Async
    public void recoveryComposeAnalysisTask(AnalysisTask analysisTask) {
        try {
            // Flip the Redis status flag back to "analysing" so tasks run again.
            redisUtil.set(String.format(RedisConst.TASK_RUNNING_STATUS_KEY_PREFIX, analysisTask.getId()),
                    AnalysisStatusEnum.ANALYSISING.getCode());

            MongoTemplate mongoTemplate =
                    new MongoTemplate(mongoClient, MongoDBConst.DB_NAME_PREFIX + analysisTask.getId());

            // NOTE(review): "ne UN_START_ANALYSIS" selects files whose analysis has
            // STARTED, yet the variable is named "unAnalyzedFiles" — the predicate
            // looks inverted (should this be ne ANALYSIS_DONE?). Kept as-is; confirm
            // against FileAnalysisStatusEnum semantics before changing.
            Query unAnalyzedFileQuery = new Query(
                    where("fileAnalysisStatus").ne(FileAnalysisStatusEnum.UN_START_ANALYSIS.getCode())
                            .and("isDirectory").is(false));
            List<FileDataMongoDto> unAnalyzedFiles =
                    mongoTemplate.find(unAnalyzedFileQuery, FileDataMongoDto.class);

            if (CollectionUtils.isNotEmpty(unAnalyzedFiles)) {
                // Fan the files out onto the thread pool and wait for all of them.
                CountDownLatch countDownLatch = new CountDownLatch(unAnalyzedFiles.size());
                for (FileDataMongoDto fileDataMongoDto : unAnalyzedFiles) {
                    IAnalysisTask task = AnalysisTaskFactory.createAnalysisTask(
                            analysisTask, fileDataMongoDto, mongoTemplate, countDownLatch);
                    taskExecutor.execute(task);
                }
                countDownLatch.await();

                // Mark the task as done.
                analysisTask.setComposeFlag(AnalysisStatusEnum.ANALYSIS_DONE.getCode());
                this.updateById(analysisTask);
                AnalysisLogUtil.insert(mongoTemplate, "成分分析已完成,耗时:"
                        + DateUtil.between(analysisTask.getCreateTime(), DateUtil.date(), DateUnit.SECOND) + "秒");
            }
        } catch (Exception e) {
            log.error("恢复分析失败", e);
        }
    }

    /**
     * Polls the DB until the upstream decompression step reports completion.
     *
     * <p>Decompression can be slow, so this retries up to
     * {@link #MAX_DECOMPRESSION_RETRIES} times at 1s intervals (~60s total).
     * (The original comment claimed 6 retries; the loop has always run 60.)
     *
     * @param analysisTask task whose decompression flag/file count are refreshed in place
     * @return true once decompression finished, false on timeout or interruption
     */
    private boolean retryGetDecompressionFlag(AnalysisTask analysisTask) {
        int retryCount = 0;
        while (retryCount < MAX_DECOMPRESSION_RETRIES) {
            AnalysisTask latestAnalysisTask = this.getById(analysisTask.getId());
            if (latestAnalysisTask.getDecompressionFlag()) {
                analysisTask.setDecompressionFlag(true);
                analysisTask.setFileCount(latestAnalysisTask.getFileCount());
                return true;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up instead of swallowing it.
                Thread.currentThread().interrupt();
                log.error("线程休眠异常", e);
                return false;
            }
            retryCount++;
        }
        return false;
    }

    /**
     * Runs per-file analysis for every file not yet analyzed, blocking until all finish.
     *
     * @param mongoTemplate task-scoped Mongo template
     * @param analysisTask  the owning task (carries the analysis level)
     * @throws InterruptedException if interrupted while waiting on the latch
     */
    private void analysisFile(MongoTemplate mongoTemplate, AnalysisTask analysisTask) throws InterruptedException {
        Query unAnalyzedFileQuery = new Query(
                where("fileAnalysisStatus").is(FileAnalysisStatusEnum.UN_START_ANALYSIS.getCode())
                        .and("isDirectory").is(false));
        List<FileDataMongoDto> unAnalyzedFiles =
                mongoTemplate.find(unAnalyzedFileQuery, FileDataMongoDto.class);

        // Submit one analysis task per file; the TaskExecutor provides the
        // parallelism (a parallel stream over submissions added nothing).
        CountDownLatch countDownLatch = new CountDownLatch(unAnalyzedFiles.size());
        for (FileDataMongoDto fileDataMongoDto : unAnalyzedFiles) {
            IAnalysisTask task = AnalysisTaskFactory.createAnalysisTask(
                    analysisTask, fileDataMongoDto, mongoTemplate, countDownLatch);
            taskExecutor.execute(task);
        }
        countDownLatch.await();
    }

    /**
     * Marks the task's project as open source when the open-file rate reaches the
     * task's configured threshold.
     *
     * <p>Open rate = openFiles / totalFiles * 100, rounded HALF_UP to 4 places.
     * Mutates {@code analysisTask.openType} only; persisting is the caller's job.
     *
     * @param mongoTemplate task-scoped Mongo template
     * @param analysisTask  task carrying the threshold and total file count
     */
    private void checkProjectIfOpen(MongoTemplate mongoTemplate, AnalysisTask analysisTask) {
        Query openFileQuery = new Query(where("openType").is(true));
        Long openFilesCount = mongoTemplate.count(openFileQuery, FileDataMongoDto.class);

        // Open-source threshold (percent).
        Integer openThreshold = analysisTask.getOpenRateThreshold();
        Integer fileCount = analysisTask.getFileCount();

        // BUGFIX: guard against divide-by-zero for an empty project.
        if (fileCount == null || fileCount == 0) {
            return;
        }

        BigDecimal openRate = new BigDecimal(openFilesCount)
                .divide(new BigDecimal(fileCount), 4, RoundingMode.HALF_UP)
                .multiply(new BigDecimal(100));

        // At or above the threshold, the project counts as open source.
        if (openRate.compareTo(new BigDecimal(openThreshold)) >= 0) {
            analysisTask.setOpenType(true);
        }
    }
}