55import htsjdk .samtools .util .CloseableIterator ;
66import htsjdk .samtools .util .IOUtil ;
77import htsjdk .samtools .util .Interval ;
8+ import htsjdk .variant .utils .SAMSequenceDictionaryExtractor ;
89import htsjdk .variant .variantcontext .Allele ;
910import htsjdk .variant .variantcontext .VariantContext ;
11+ import htsjdk .variant .variantcontext .writer .VariantContextWriter ;
12+ import htsjdk .variant .variantcontext .writer .VariantContextWriterBuilder ;
1013import htsjdk .variant .vcf .VCFFileReader ;
14+ import org .apache .commons .lang3 .StringUtils ;
1115import org .apache .log4j .Logger ;
1216import org .biojava3 .core .sequence .DNASequence ;
1317import org .biojava3 .core .sequence .compound .AmbiguityDNACompoundSet ;
@@ -78,7 +82,12 @@ public Provider()
7882 }}, null ),
7983 ToolParameterDescriptor .create ("minCoverage" , "Min Coverage For Consensus" , "If provided, a consensus will only be called over regions with at least this depth" , "ldk-integerfield" , new JSONObject (){{
8084 put ("minValue" , 0 );
81- }}, 25 )
85+ }}, 25 ),
86+ ToolParameterDescriptor .create ("minFractionForConsensus" , "Min AF For Consensus" , "Any LoFreq variant greater than this threshold will be used as the consensus base." , "ldk-numberfield" , new JSONObject (){{
87+ put ("minValue" , 0.5 );
88+ put ("maxValue" , 1.0 );
89+ put ("decimalPrecision" , 2 );
90+ }}, 0.5 )
8291
8392 ), null , "http://csb5.github.io/lofreq/" );
8493 }
@@ -132,51 +141,53 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
132141 output .addIntermediateFile (outputVcf );
133142 output .addIntermediateFile (new File (outputVcf .getPath () + ".tbi" ));
134143
144+ double minFractionForConsensus = getProvider ().getParameterByName ("minFractionForConsensus" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Double .class , 0.0 );
145+
135146 int totalVariants = 0 ;
136147 int totalGT1 = 0 ;
137148 int totalGT50 = 0 ;
138- Map <String , Double > alleleToAF = new HashMap <>();
139- int totalIndelGT1 = 0 ;
140- try (VCFFileReader reader = new VCFFileReader (outputVcfSnpEff );CloseableIterator <VariantContext > it = reader .iterator ())
149+ int totalGTThreshold = 0 ;
150+
151+ Map <String , List <String >> expectedLoFreq = new HashMap <>();
152+ int totalIndelGT2 = 0 ;
153+
154+ File loFreqConsensusVcf = new File (outputDir , FileUtil .getBaseName (inputBam ) + ".lofreq.consensus.vcf.gz" );
155+ VariantContextWriterBuilder writerBuiler = new VariantContextWriterBuilder ().setOutputFile (loFreqConsensusVcf ).setReferenceDictionary (SAMSequenceDictionaryExtractor .extractDictionary (referenceGenome .getSequenceDictionary ().toPath ()));
156+ try (VCFFileReader reader = new VCFFileReader (outputVcfSnpEff );CloseableIterator <VariantContext > it = reader .iterator ();VariantContextWriter writer = writerBuiler .build ())
141157 {
158+ writer .writeHeader (reader .getFileHeader ());
159+
142160 while (it .hasNext ())
143161 {
144162 VariantContext vc = it .next ();
145163 totalVariants ++;
146- if (vc .hasAttribute ("AF" ) && vc .getAttributeAsDouble ("AF" , 0.0 ) > 0.01 )
164+ if (vc .hasAttribute ("AF" ) && vc .getAttributeAsDouble ("AF" , 0.0 ) > 0.02 )
147165 {
148166 totalGT1 ++;
149167 if (vc .hasAttribute ("INDEL" ))
150168 {
151- totalIndelGT1 ++;
169+ totalIndelGT2 ++;
152170 }
153171 }
154172
155173 if (vc .hasAttribute ("AF" ) && vc .getAttributeAsDouble ("AF" , 0.0 ) > 0.5 )
156174 {
157175 totalGT50 ++;
158- String key = getHashKey (vc );
159- Double af = alleleToAF .getOrDefault (key , 0.0 );
160- af = Math .max (af , vc .getAttributeAsDouble ("AF" , 0.0 ));
161- alleleToAF .put (key , af );
162176 }
163- }
164- }
165177
166- //generate consensus
167- File script = new File (SequenceAnalysisService .get ().getScriptPath (SequenceAnalysisModule .NAME , "external/viral_consensus.sh" ));
168- if (!script .exists ())
169- {
170- throw new PipelineJobException ("Unable to find script: " + script .getPath ());
171- }
178+ if (vc .hasAttribute ("AF" ) && vc .getAttributeAsDouble ("AF" , 0.0 ) > minFractionForConsensus )
179+ {
180+ totalGTThreshold ++;
181+ String key = getHashKey (vc );
182+ List <String > line = expectedLoFreq .getOrDefault (key , new ArrayList <>());
172183
173- SimpleScriptWrapper consensusWrapper = new SimpleScriptWrapper (getPipelineCtx ().getLogger ());
174- consensusWrapper .setWorkingDir (inputBam .getParentFile ());
184+ line .add ("AF:" + vc .getAttribute ("AF" ) + "/" + "DP:" + vc .getAttribute ("DP" ));
175185
176- Integer maxThreads = SequencePipelineService .get ().getMaxThreads (getPipelineCtx ().getLogger ());
177- if (maxThreads != null )
178- {
179- consensusWrapper .addToEnvironment ("SEQUENCEANALYSIS_MAX_THREADS" , maxThreads .toString ());
186+ expectedLoFreq .put (key , line );
187+
188+ writer .add (vc );
189+ }
190+ }
180191 }
181192
182193 //Create a BED file with all regions of coverage below MIN_COVERAGE:
@@ -249,20 +260,38 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
249260 getPipelineCtx ().getLogger ().info ("Total positions with coverage below threshold (" + minCoverage + "): " + positionsSkipped );
250261 getPipelineCtx ().getLogger ().info ("Total intervals of these gaps: " + gapIntervals );
251262
263+ //generate bcftools consensus
264+ File script = new File (SequenceAnalysisService .get ().getScriptPath (SequenceAnalysisModule .NAME , "external/viral_consensus.sh" ));
265+ if (!script .exists ())
266+ {
267+ throw new PipelineJobException ("Unable to find script: " + script .getPath ());
268+ }
269+
270+ SimpleScriptWrapper consensusWrapper = new SimpleScriptWrapper (getPipelineCtx ().getLogger ());
271+ consensusWrapper .setWorkingDir (inputBam .getParentFile ());
272+
273+ Integer maxThreads = SequencePipelineService .get ().getMaxThreads (getPipelineCtx ().getLogger ());
274+ if (maxThreads != null )
275+ {
276+ consensusWrapper .addToEnvironment ("SEQUENCEANALYSIS_MAX_THREADS" , maxThreads .toString ());
277+ }
252278 consensusWrapper .execute (Arrays .asList ("/bin/bash" , script .getPath (), inputBam .getPath (), referenceGenome .getWorkingFastaFile ().getPath (), mask .getPath (), String .valueOf (minCoverage )));
253279 File calls = new File (inputBam .getParentFile (), FileUtil .getBaseName (inputBam ) + ".calls.vcf.gz" );
254280
281+ int totalBcfToolsVariants = 0 ;
255282 Set <VariantContext > variantsBcftoolsOnly = new HashSet <>();
256283 try (VCFFileReader reader = new VCFFileReader (calls );CloseableIterator <VariantContext > it = reader .iterator ())
257284 {
258285 while (it .hasNext ())
259286 {
260287 VariantContext vc = it .next ();
288+ totalBcfToolsVariants ++;
289+
261290 String key = getHashKey (vc );
262- if (alleleToAF .containsKey (key ))
291+ if (expectedLoFreq .containsKey (key ))
263292 {
264293 //Variant shared
265- alleleToAF .remove (key );
294+ expectedLoFreq .remove (key );
266295 }
267296 else
268297 {
@@ -271,45 +300,89 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
271300 }
272301 }
273302
274- String description = String .format ("Total Variants: %s\n Total GT 1 PCT: %s\n Total GT 50 PCT: %s\n Total Indel GT 1 PCT: %s\n Positions Below Coverage: %s" , totalVariants , totalGT1 , totalGT50 , totalIndelGT1 , positionsSkipped );
303+ String description = String .format ("Total Variants: %s\n Total GT 1 PCT: %s\n Total GT 50 PCT: %s\n Total Indel GT 1 PCT: %s\n Positions Below Coverage: %s\n Total In LoFreq Consensus: %s " , totalVariants , totalGT1 , totalGT50 , totalIndelGT2 , positionsSkipped , totalGTThreshold );
275304
276305 if (!variantsBcftoolsOnly .isEmpty ())
277306 {
278- getPipelineCtx ().getLogger ().error ("The following variants were in bcftools, but not GT50% in lofreq: " );
307+ getPipelineCtx ().getLogger ().error ("The following variants were in bcftools, but not above AF threshold (" + minFractionForConsensus + ") in lofreq: " );
279308 variantsBcftoolsOnly .forEach (vc -> getPipelineCtx ().getLogger ().error (getHashKey (vc ) + ", DP=" + vc .getAttribute ("DP" )));
280309
281- description += "\n " + "WARNING: " + variantsBcftoolsOnly .size () + " variants detected in bcftools and not lofreq" ;
310+ description += "\n " + "WARNING: " + variantsBcftoolsOnly .size () + " variants detected in bcftools consensus and not lofreq" ;
282311 }
283312
284- if (!alleleToAF .isEmpty ())
313+ if (!expectedLoFreq .isEmpty ())
285314 {
286315 getPipelineCtx ().getLogger ().error ("The following variants were GT50% in lofreq, but not in bcftools: " );
287- alleleToAF .keySet ().forEach (vc -> getPipelineCtx ().getLogger ().error (vc + ", AF=" + alleleToAF .get (vc )));
316+ expectedLoFreq .keySet ().forEach (vc -> getPipelineCtx ().getLogger ().error (vc + ", " + StringUtils .join (expectedLoFreq .get (vc ), "," )));
317+
318+ description += "\n " + "WARNING: " + expectedLoFreq .size () + " variants detected in lofreq consensus but not bcftools" ;
319+ }
320+
321+ File consensusFastaBcfTools = new File (inputBam .getParentFile (), FileUtil .getBaseName (inputBam .getName ()) + ".bcftools.consensus.fasta" );
322+ if (!consensusFastaBcfTools .exists ())
323+ {
324+ throw new PipelineJobException ("Expected file not found: " + consensusFastaBcfTools .getPath ());
325+ }
326+
327+ int bcfToolsConsensusNs = replaceFastHeader (consensusFastaBcfTools , rs , "bcftools|Variants:" + totalBcfToolsVariants );
328+
329+ //Make consensus using lofreq:
330+ File consensusFastaLoFreq = generateConsensus (loFreqConsensusVcf , referenceGenome .getWorkingFastaFile (), mask );
331+ int lofreqConsensusNs = replaceFastHeader (consensusFastaLoFreq , rs , "Lofreq|Variants:" + totalGTThreshold );
332+ description += "\n Consensus Ns: " + lofreqConsensusNs ;
288333
289- description += "\n " + "WARNING: " + alleleToAF .size () + " variants detected in lofreq and not bcftools" ;
334+ if (bcfToolsConsensusNs != lofreqConsensusNs )
335+ {
336+ getPipelineCtx ().getLogger ().warn ("Consensus ambiguities from bcftools and lofreq did not match: " + bcfToolsConsensusNs + " / " + lofreqConsensusNs );
290337 }
291338
292- File consensusFasta = new File (inputBam .getParentFile (), FileUtil .getBaseName (inputBam .getName ()) + ".consensus.fasta" );
293- if (!consensusFasta .exists ())
339+ output .addSequenceOutput (outputVcfSnpEff , "LoFreq: " + rs .getName (), CATEGORY , rs .getReadsetId (), null , referenceGenome .getGenomeId (), description );
340+ output .addSequenceOutput (coverageOut , "Depth of Coverage: " + rs .getName (), "Depth of Coverage" , rs .getReadsetId (), null , referenceGenome .getGenomeId (), null );
341+ output .addSequenceOutput (consensusFastaBcfTools , "Consensus: " + rs .getName (), "Viral Consensus Sequence" , rs .getReadsetId (), null , referenceGenome .getGenomeId (), description );
342+
343+ return output ;
344+ }
345+
346+ private File generateConsensus (File loFreqConsensusVcf , File fasta , File maskBed ) throws PipelineJobException
347+ {
348+ File ret = new File (loFreqConsensusVcf .getParentFile (), SequenceAnalysisService .get ().getUnzippedBaseName (loFreqConsensusVcf .getName ()) + ".lofreq.consensus.fasta" );
349+ List <String > args = new ArrayList <>();
350+
351+ args .add (SequencePipelineService .get ().getExeForPackage ("BCFTOOLS" , "bcftools" ).getPath ());
352+ args .add ("consensus" );
353+ args .add ("-f" );
354+ args .add (fasta .getPath ());
355+ args .add ("-m" );
356+ args .add (maskBed .getPath ());
357+ args .add ("-o" );
358+ args .add (ret .getPath ());
359+ args .add (loFreqConsensusVcf .getPath ());
360+
361+ new SimpleScriptWrapper (getPipelineCtx ().getLogger ()).execute (args );
362+
363+ if (!ret .exists ())
294364 {
295- throw new PipelineJobException ("Expected file not found : " + consensusFasta .getPath ());
365+ throw new PipelineJobException ("Unable to find expected file : " + ret .getPath ());
296366 }
297367
368+ return ret ;
369+ }
370+
371+ private int replaceFastHeader (File consensusFasta , Readset rs , String suffix ) throws PipelineJobException
372+ {
373+ AtomicInteger totalN = new AtomicInteger ();
298374 DNASequence seq ;
299375 try (InputStream is = IOUtil .openFileForReading (consensusFasta ))
300376 {
301377 FastaReader <DNASequence , NucleotideCompound > fastaReader = new FastaReader <>(is , new GenericFastaHeaderParser <>(), new DNASequenceCreator (AmbiguityDNACompoundSet .getDNACompoundSet ()));
302378 LinkedHashMap <String , DNASequence > fastaData = fastaReader .process ();
303379
304- AtomicInteger totalN = new AtomicInteger ();
305380 seq = fastaData .values ().iterator ().next ();
306381 seq .forEach (nt -> {
307382 if (nt .getUpperedBase ().equals ("N" )) {
308383 totalN .getAndIncrement ();
309384 }
310385 });
311-
312- description += "\n Consensus Ns: " + totalN .get ();
313386 }
314387 catch (IOException e )
315388 {
@@ -331,6 +404,11 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
331404
332405 header .append (rs .getLibraryType () == null ? rs .getApplication () : rs .getLibraryType ());
333406
407+ if (suffix != null )
408+ {
409+ header .append ("|" ).append (suffix );
410+ }
411+
334412 writer .println (">" + header );
335413 writer .println (seq .getSequenceAsString ());
336414 }
@@ -339,12 +417,7 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
339417 throw new PipelineJobException (e );
340418 }
341419
342- output .addSequenceOutput (outputVcfSnpEff , "LoFreq: " + rs .getName (), CATEGORY , rs .getReadsetId (), null , referenceGenome .getGenomeId (), description );
343- output .addSequenceOutput (coverageOut , "Depth of Coverage: " + rs .getName (), "Depth of Coverage" , rs .getReadsetId (), null , referenceGenome .getGenomeId (), null );
344-
345- output .addSequenceOutput (consensusFasta , "Consensus: " + rs .getName (), "Viral Consensus Sequence" , rs .getReadsetId (), null , referenceGenome .getGenomeId (), description );
346-
347- return output ;
420+ return totalN .get ();
348421 }
349422
350423 @ Override
0 commit comments