77import htsjdk .samtools .util .IOUtil ;
88import htsjdk .samtools .util .Interval ;
99import htsjdk .samtools .util .IntervalUtil ;
10+ import htsjdk .samtools .util .SortingCollection ;
1011import htsjdk .variant .utils .SAMSequenceDictionaryExtractor ;
1112import htsjdk .variant .variantcontext .Allele ;
1213import htsjdk .variant .variantcontext .Genotype ;
1819import htsjdk .variant .vcf .VCFHeader ;
1920import htsjdk .variant .vcf .VCFHeaderLineType ;
2021import htsjdk .variant .vcf .VCFInfoHeaderLine ;
22+ import htsjdk .variant .vcf .VCFRecordCodec ;
2123import org .apache .commons .lang3 .ArrayUtils ;
2224import org .apache .commons .lang3 .StringUtils ;
2325import org .apache .commons .lang3 .math .NumberUtils ;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
8789
90+ import static picard .sam .AbstractAlignmentMerger .MAX_RECORDS_IN_RAM ;
91+
8892public class LofreqAnalysis extends AbstractCommandPipelineStep <LofreqAnalysis .LofreqWrapper > implements AnalysisStep
8993{
9094 public static final String CATEGORY = "Lowfreq VCF" ;
@@ -191,6 +195,14 @@ public static void runDepthOfCoverage(PipelineContext ctx, AnalysisOutputImpl ou
191195 }
192196 }
193197
198+ public static void addMetaLines (VCFHeader header )
199+ {
200+ header .addMetaDataLine (new VCFInfoHeaderLine ("IN_CONSENSUS" , 1 , VCFHeaderLineType .Flag , "A flag to indicate whether this variant appears in the consensus" ));
201+ header .addMetaDataLine (new VCFInfoHeaderLine ("WITHIN_PBS" , 1 , VCFHeaderLineType .Flag , "A flag to indicate whether this variant is located in primer binding sites" ));
202+ header .addMetaDataLine (new VCFInfoHeaderLine ("GATK_DP" , 1 , VCFHeaderLineType .Integer , "The depth of coverage provided by GATK DepthOfCoverage" ));
203+ header .addMetaDataLine (new VCFInfoHeaderLine ("SB_RECOVER" , 1 , VCFHeaderLineType .Flag , "Indicates this variant was strand bias filtered by LoFreq, but was recovered" ));
204+ }
205+
194206 @ Override
195207 public Output performAnalysisPerSampleRemote (Readset rs , File inputBam , ReferenceGenome referenceGenome , File outputDir ) throws PipelineJobException
196208 {
@@ -409,6 +421,42 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
409421 Set <String > variantsBcftools = runBcftools (inputBam , referenceGenome , mask , minCoverage );
410422 int variantsBcftoolsTotal = variantsBcftools .size ();
411423
424+ boolean runPindel = getProvider ().getParameterByName ("runPindel" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Boolean .class , false );
425+
426+ List <VariantContext > pindelConsensusVariants = null ;
427+ int totalPindelConsensusVariants = 0 ;
428+ if (runPindel )
429+ {
430+ Double minFraction = getProvider ().getParameterByName ("minFraction" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Double .class , 0.0 );
431+ int minDepth = getProvider ().getParameterByName ("minDepth" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Integer .class , 0 );
432+ int minInsertSize = getProvider ().getParameterByName ("minInsertSize" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Integer .class , 0 );
433+
434+ PindelAnalysis .PindelSettings settings = new PindelAnalysis .PindelSettings ();
435+
436+ File pindelOutput = PindelAnalysis .runPindel (output , getPipelineCtx (), rs , outputDir , inputBam , referenceGenome .getWorkingFastaFile (), minFraction , minDepth , true , coverageOut , minInsertSize );
437+ File pindelVcf = PindelAnalysis .createVcf (pindelOutput , new File (pindelOutput .getParentFile (), FileUtil .getBaseName (pindelOutput ) + ".vcf.gz" ), referenceGenome , settings );
438+
439+ try (VCFFileReader reader = new VCFFileReader (pindelVcf );CloseableIterator <VariantContext > it = reader .iterator ())
440+ {
441+ while (it .hasNext ())
442+ {
443+ VariantContext vc = it .next ();
444+ if (vc .hasAttribute ("IN_CONSENSUS" ))
445+ {
446+ pindelConsensusVariants .add (vc );
447+ totalPindelConsensusVariants ++;
448+ }
449+ }
450+ }
451+
452+ if (totalPindelConsensusVariants == 0 )
453+ {
454+ getPipelineCtx ().getLogger ().info ("deleting empty pindel VCF: " + pindelVcf .getPath ());
455+ pindelVcf .delete ();
456+ new File (pindelVcf .getPath () + ".tbi" ).delete ();
457+ }
458+ }
459+
412460 //Create final VCF:
413461 int totalVariants = 0 ;
414462 int totalGT1 = 0 ;
@@ -425,22 +473,21 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
425473 File loFreqAllVcf = getAllVcf (outputDir , inputBam );
426474 Double strandBiasRecoveryAF = getProvider ().getParameterByName ("strandBiasRecoveryAF" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Double .class , 1.0 );
427475 SAMSequenceDictionary dict = SAMSequenceDictionaryExtractor .extractDictionary (referenceGenome .getSequenceDictionary ().toPath ());
428- VariantContextWriterBuilder writerBuilder1 = new VariantContextWriterBuilder ().setOutputFile (loFreqConsensusVcf ).setReferenceDictionary (dict );
429- VariantContextWriterBuilder writerBuilder2 = new VariantContextWriterBuilder ().setOutputFile (loFreqAllVcf ).setReferenceDictionary (dict );
430- try (VCFFileReader reader = new VCFFileReader (outputVcfSnpEff );CloseableIterator <VariantContext > it = reader .iterator ();VariantContextWriter writer1 = writerBuilder1 .build ();VariantContextWriter writer2 = writerBuilder2 .build ())
476+ VariantContextWriterBuilder writerBuilderConsensus = new VariantContextWriterBuilder ().setOutputFile (loFreqConsensusVcf ).setReferenceDictionary (dict );
477+ VariantContextWriterBuilder writerBuilderAll = new VariantContextWriterBuilder ().setOutputFile (loFreqAllVcf ).setReferenceDictionary (dict );
478+ try (VCFFileReader reader = new VCFFileReader (outputVcfSnpEff );CloseableIterator <VariantContext > it = reader .iterator ();VariantContextWriter writerConsensus = writerBuilderConsensus .build ();VariantContextWriter writerAll = writerBuilderAll .build ())
431479 {
432480 VCFHeader header = reader .getFileHeader ();
433481
434482 //Add INFO annotations
435- header .addMetaDataLine (new VCFInfoHeaderLine ("IN_CONSENSUS" , 1 , VCFHeaderLineType .Flag , "A flag to indicate whether this variant appears in the consensus" ));
436- header .addMetaDataLine (new VCFInfoHeaderLine ("WITHIN_PBS" , 1 , VCFHeaderLineType .Flag , "A flag to indicate whether this variant is located in primer binding sites" ));
437- header .addMetaDataLine (new VCFInfoHeaderLine ("GATK_DP" , 1 , VCFHeaderLineType .Integer , "The depth of coverage provided by GATK DepthOfCoverage" ));
438- header .addMetaDataLine (new VCFInfoHeaderLine ("SB_RECOVER" , 1 , VCFHeaderLineType .Flag , "Indicates this variant was strand bias filtered by LoFreq, but was recovered" ));
483+ addMetaLines (header );
439484
440485 header .setSequenceDictionary (dict );
441- writer1 .writeHeader (header );
442- writer2 .writeHeader (header );
486+ writerConsensus .writeHeader (header );
487+ writerAll .writeHeader (header );
443488
489+ SortingCollection <VariantContext > allVariants = getVariantSorter (header );
490+ SortingCollection <VariantContext > consensusVariants = getVariantSorter (header );
444491 while (it .hasNext ())
445492 {
446493 VariantContext vc = it .next ();
@@ -473,7 +520,7 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
473520 //If still filtered, print and continue
474521 if (vcb .getFilters () != null && !vcb .getFilters ().isEmpty ())
475522 {
476- writer2 .add (vcb .make ());
523+ writerAll .add (vcb .make ());
477524 continue ;
478525 }
479526 }
@@ -539,8 +586,8 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
539586 }
540587
541588 vc = vcb .make ();
542- writer1 .add (vc );
543- writer2 .add (vc );
589+ writerConsensus .add (vc );
590+ writerAll .add (vc );
544591 }
545592 else
546593 {
@@ -549,9 +596,34 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
549596 getPipelineCtx ().getLogger ().error ("The following variant was excluded from the consensus b/c of GATK depth, but DP is above that threshold: " + getHashKey (vc ) + ", AF:" + vc .getAttribute ("AF" ) + "/" + "DP:" + vc .getAttribute ("DP" ) + "/GATK_DP:" + gDepth );
550597 }
551598
552- writer2 .add (vcb .make ());
599+ writerAll .add (vcb .make ());
553600 }
554601 }
602+
603+ //TODO: add pindel
604+ if (!pindelConsensusVariants .isEmpty ())
605+ {
606+
607+ }
608+
609+ try (CloseableIterator <VariantContext > iterator = allVariants .iterator ())
610+ {
611+ while (iterator .hasNext ())
612+ {
613+ writerAll .add (iterator .next ());
614+ }
615+ }
616+ allVariants .cleanup ();
617+
618+
619+ try (CloseableIterator <VariantContext > iterator = consensusVariants .iterator ())
620+ {
621+ while (iterator .hasNext ())
622+ {
623+ writerConsensus .add (iterator .next ());
624+ }
625+ }
626+ consensusVariants .cleanup ();
555627 }
556628
557629 NumberFormat fmt = NumberFormat .getPercentInstance ();
@@ -564,6 +636,10 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
564636 String description = String .format ("Total Variants: %s\n Total GT 1 PCT: %s\n Total GT 50 PCT: %s\n Total Indel GT 1 PCT: %s\n Positions Below Coverage: %s\n Total In LoFreq Consensus: %s\n Total Indel In LoFreq Consensus: %s\n Total Consensus Variant in PBS: %s" , totalVariants , totalGT1 , totalGT50 , totalIndelGT1 , positionsSkipped , totalGTThreshold , totalIndelGTThreshold , totalConsensusInPBS );
565637 description += "\n " + "Strand Bias Recovered: " + filteredVariantsRecovered ;
566638 description += "\n " + "Consensus Strand Bias Recovered: " + consensusFilteredVariantsRecovered ;
639+ if (totalPindelConsensusVariants > 0 )
640+ {
641+ description += "\n Pindel consensus: " + totalPindelConsensusVariants ;
642+ }
567643
568644 if (!variantsBcftools .isEmpty ())
569645 {
@@ -600,88 +676,7 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
600676 output .addSequenceOutput (coverageOut , "Depth of Coverage: " + rs .getName (), "Depth of Coverage" , rs .getReadsetId (), null , referenceGenome .getGenomeId (), null );
601677 output .addSequenceOutput (consensusFastaLoFreq , "Consensus: " + rs .getName (), "Viral Consensus Sequence" , rs .getReadsetId (), null , referenceGenome .getGenomeId (), description );
602678
603- Double minFraction = getProvider ().getParameterByName ("minFraction" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Double .class , 0.0 );
604- int minDepth = getProvider ().getParameterByName ("minDepth" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Integer .class , 0 );
605- int minInsertSize = getProvider ().getParameterByName ("minInsertSize" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Integer .class , 0 );
606-
607679 boolean runPangolinAndNextClade = getProvider ().getParameterByName ("runPangolin" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Boolean .class , false );
608- boolean runPindel = getProvider ().getParameterByName ("runPindel" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Boolean .class , false );
609-
610- Map <String , Integer > indelMap = new HashMap <>();
611- if (runPindel )
612- {
613- File pindelOutput = PindelAnalysis .runPindel (output , getPipelineCtx (), rs , outputDir , inputBam , referenceGenome .getWorkingFastaFile (), minFraction , minDepth , true , coverageOut , minInsertSize );
614- try (CSVReader reader = new CSVReader (Readers .getReader (pindelOutput ), '\t' ))
615- {
616- final int MAX_DEL_EVENT_COVERAGE = 20 ;
617- final double MIN_AF = 0.25 ;
618- final int MIN_LENGTH_TO_CONSIDER = 10 ;
619- final int MAX_DELETION_LENGTH = 5000 ;
620-
621- String [] line ;
622- while ((line = reader .readNext ()) != null )
623- {
624- if (!("D" .equals (line [0 ]) || "I" .equals (line [0 ]) || "S" .equals (line [0 ])))
625- {
626- continue ;
627- }
628-
629- int start = Integer .parseInt (line [2 ]); //1-based, coordinate prior, like VCF
630- int end = Integer .parseInt (line [3 ]); //1-based, actual coordinate, like VCF
631- String refAllele = line [11 ];
632- String altAllele = line [12 ];
633- int refLength = end - start ;
634- int altLength = altAllele .length ();
635-
636- // Assume LoFreq calls these well enough:
637- if (refLength < MIN_LENGTH_TO_CONSIDER && altLength < MIN_LENGTH_TO_CONSIDER )
638- {
639- continue ;
640- }
641-
642- if (("D" .equals (line [0 ]) || "S" .equals (line [0 ])) && refLength > MAX_DELETION_LENGTH )
643- {
644- continue ;
645- }
646-
647- if (Double .parseDouble (line [6 ]) < MIN_AF )
648- {
649- continue ;
650- }
651-
652- double eventCoverage = 0.0 ;
653- if (StringUtils .trimToNull (line [11 ]) != null )
654- {
655- eventCoverage = Double .parseDouble (line [11 ]);
656- }
657-
658- if (("D" .equals (line [0 ]) || "S" .equals (line [0 ])) && eventCoverage > MAX_DEL_EVENT_COVERAGE )
659- {
660- continue ;
661- }
662-
663- indelMap .put (line [0 ], indelMap .getOrDefault (line [0 ], 0 ) + 1 );
664-
665- VariantContextBuilder vcb = new VariantContextBuilder ();
666- vcb .start (start );
667- vcb .stop (end );
668- vcb .chr (line [1 ]);
669- vcb .alleles (Arrays .asList (Allele .create (refAllele , true ), Allele .create (altAllele )));
670- vcb .attribute ("AF" , Double .parseDouble (line [6 ]));
671- int dp = "I" .equals (line [0 ]) ? Integer .parseInt (line [4 ]) : (int )Double .parseDouble (line [10 ]);
672- vcb .attribute ("DP" , dp );
673- }
674- }
675- catch (IOException e )
676- {
677- throw new PipelineJobException (e );
678- }
679-
680- for (String type : indelMap .keySet ())
681- {
682- description += "\n Pindel " + type + ": " + indelMap .get (type );
683- }
684- }
685680
686681 output .addSequenceOutput (loFreqAllVcf , "LoFreq: " + rs .getName (), CATEGORY , rs .getReadsetId (), null , referenceGenome .getGenomeId (), description );
687682
@@ -710,14 +705,15 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
710705 writer .writeNext (new String []{"LoFreq Analysis" , "MeanCoverage" , String .valueOf (avgDepth )});
711706 writer .writeNext (new String []{"LoFreq Analysis" , "FilteredVariantsRecovered" , String .valueOf (filteredVariantsRecovered )});
712707 writer .writeNext (new String []{"LoFreq Analysis" , "ConsensusFilteredVariantsRecovered" , String .valueOf (consensusFilteredVariantsRecovered )});
713- writer .writeNext (new String []{"LoFreq Analysis" , "HighFreqPindelCalls " , String .valueOf (indelMap . isEmpty () ? 0 : indelMap . values (). stream (). mapToInt ( Integer :: intValue ). sum () )});
708+ writer .writeNext (new String []{"LoFreq Analysis" , "TotalPindelConsensusVariants " , String .valueOf (totalPindelConsensusVariants )});
714709
715710 if (pangolinData != null )
716711 {
717712 writer .writeNext (new String []{"Pangolin" , "PangolinLineage" , pangolinData [1 ]});
718713 writer .writeNext (new String []{"Pangolin" , "PangolinConflicts" , pangolinData [2 ]});
719714 writer .writeNext (new String []{"Pangolin" , "PangolinVersions" , pangolinData [3 ]});
720- writer .writeNext (new String []{"Pangolin" , "PangolinVersions" , pangolinData [4 ]});
715+ //TODO: consider parsing
716+ writer .writeNext (new String []{"Pangolin" , "PangolinComment" , pangolinData [4 ]});
721717 }
722718 else
723719 {
@@ -1195,4 +1191,18 @@ private File getExe()
11951191 return SequencePipelineService .get ().getExeForPackage ("LOFREQPATH" , "lofreq" );
11961192 }
11971193 }
1194+
1195+ private SortingCollection <VariantContext > getVariantSorter (VCFHeader outputHeader ) {
1196+ File tmpDir = IOUtil .getDefaultTmpDir ();
1197+ if (!tmpDir .exists ()) {
1198+ tmpDir .mkdirs ();
1199+ }
1200+
1201+ return SortingCollection .newInstance (
1202+ VariantContext .class ,
1203+ new VCFRecordCodec (outputHeader , true ),
1204+ outputHeader .getVCFRecordComparator (),
1205+ MAX_RECORDS_IN_RAM , tmpDir .toPath ());
1206+ }
1207+
11981208}
0 commit comments