2323import org .apache .commons .lang3 .math .NumberUtils ;
2424import org .apache .commons .math3 .stat .descriptive .rank .Median ;
2525import org .apache .logging .log4j .Logger ;
26- import org .apache .logging .log4j .LogManager ;
2726import org .biojava3 .core .sequence .DNASequence ;
2827import org .biojava3 .core .sequence .compound .AmbiguityDNACompoundSet ;
2928import org .biojava3 .core .sequence .compound .NucleotideCompound ;
4342import org .labkey .api .query .FieldKey ;
4443import org .labkey .api .reader .Readers ;
4544import org .labkey .api .sequenceanalysis .SequenceAnalysisService ;
45+ import org .labkey .api .sequenceanalysis .SequenceOutputFile ;
4646import org .labkey .api .sequenceanalysis .model .AnalysisModel ;
4747import org .labkey .api .sequenceanalysis .model .Readset ;
4848import org .labkey .api .sequenceanalysis .pipeline .AbstractAnalysisStepProvider ;
6060import org .labkey .api .util .PageFlowUtil ;
6161import org .labkey .api .writer .PrintWriters ;
6262import org .labkey .sequenceanalysis .SequenceAnalysisModule ;
63+ import org .labkey .sequenceanalysis .SequenceAnalysisSchema ;
6364import org .labkey .sequenceanalysis .run .util .DepthOfCoverageWrapper ;
6465import org .labkey .sequenceanalysis .run .variant .SNPEffStep ;
6566import org .labkey .sequenceanalysis .run .variant .SnpEffWrapper ;
67+ import org .labkey .sequenceanalysis .util .ReferenceLibraryHelperImpl ;
6668
6769import java .io .File ;
6870import java .io .IOException ;
7375import java .util .ArrayList ;
7476import java .util .Arrays ;
7577import java .util .Collections ;
78+ import java .util .Date ;
7679import java .util .HashMap ;
7780import java .util .HashSet ;
7881import java .util .LinkedHashMap ;
@@ -137,7 +140,7 @@ public Provider()
137140 {{
138141
139142 }}, false ),
140- ToolParameterDescriptor .create ("runPangolin" , "Run Pangolin" , "If selected, Pangolin will be used to score the consensus against common SARS-CoV-2 lineages." , "checkbox" , new JSONObject ()
143+ ToolParameterDescriptor .create ("runPangolin" , "Run Pangolin and NextClade " , "If selected, Pangolin and NextClade will be used to score the consensus against common SARS-CoV-2 lineages." , "checkbox" , new JSONObject ()
141144 {{
142145
143146 }}, false ),
@@ -146,8 +149,20 @@ public Provider()
146149 put ("minValue" , 0.0 );
147150 put ("maxValue" , 1.0 );
148151 put ("decimalPrecision" , 2 );
149- }}, 0.1 )
150- ), PageFlowUtil .set ("sequenceanalysis/field/GenomeFileSelectorField.js" ), "http://csb5.github.io/lofreq/" );
152+ }}, 0.1 ),
153+ ToolParameterDescriptor .create ("dbImport" , "Import SNVs" , "If selected, the LoFreq and pindel variants will be imported into the DB." , "checkbox" , new JSONObject ()
154+ {{
155+
156+ }}, false ),
157+ ToolParameterDescriptor .create ("dbImportThreshold" , "Variant Import AF Threshold" , "If DB import is selected, variants above this AF threshold will be imported." , "ldk-numberfield" , new JSONObject ()
158+ {{
159+
160+ }}, 0.1 ),
161+ ToolParameterDescriptor .create ("dbImportDepthThreshold" , "Variant Import Depth Threshold" , "If DB import is selected, variants in a site with coverage greater than or equal to this value will be imported." , "ldk-integerfield" , new JSONObject ()
162+ {{
163+
164+ }}, 20 )
165+ ), PageFlowUtil .set ("sequenceanalysis/field/GenomeFileSelectorField.js" ), "http://csb5.github.io/lofreq/" );
151166 }
152167
153168
@@ -408,7 +423,7 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
408423 int totalConsensusInPBS = 0 ;
409424
410425 File loFreqConsensusVcf = new File (outputDir , FileUtil .getBaseName (inputBam ) + ".lofreq.consensus.vcf.gz" );
411- File loFreqAllVcf = new File (outputDir , FileUtil . getBaseName ( inputBam ) + ".lofreq.all.vcf.gz" );
426+ File loFreqAllVcf = getAllVcf (outputDir , inputBam );
412427 Double strandBiasRecoveryAF = getProvider ().getParameterByName ("strandBiasRecoveryAF" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Double .class , 1.0 );
413428 SAMSequenceDictionary dict = SAMSequenceDictionaryExtractor .extractDictionary (referenceGenome .getSequenceDictionary ().toPath ());
414429 VariantContextWriterBuilder writerBuilder1 = new VariantContextWriterBuilder ().setOutputFile (loFreqConsensusVcf ).setReferenceDictionary (dict );
@@ -591,7 +606,7 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
591606 int minDepth = getProvider ().getParameterByName ("minDepth" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Integer .class , 0 );
592607 int minInsertSize = getProvider ().getParameterByName ("minInsertSize" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Integer .class , 0 );
593608
594- boolean runPangolin = getProvider ().getParameterByName ("runPangolin" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Boolean .class , false );
609+ boolean runPangolinAndNextClade = getProvider ().getParameterByName ("runPangolin" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Boolean .class , false );
595610 boolean runPindel = getProvider ().getParameterByName ("runPindel" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Boolean .class , false );
596611
597612 if (runPindel )
@@ -600,10 +615,13 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
600615 }
601616
602617 String [] pangolinData = null ;
603- if (runPangolin )
618+ if (runPangolinAndNextClade )
604619 {
605620 PangolinHandler .updatePangolinRefs (getPipelineCtx ().getLogger ());
606621 pangolinData = PangolinHandler .runPangolin (consensusFastaLoFreq , getPipelineCtx ().getLogger (), output );
622+
623+ File json = NextCladeHandler .runNextClade (consensusFastaLoFreq , getPipelineCtx ().getLogger (), output , outputDir );
624+ output .addSequenceOutput (json , "Nextclade: " + rs .getName (), "NextClade JSON" , rs .getReadsetId (), null , referenceGenome .getGenomeId (), null );
607625 }
608626
609627 //write metrics:
@@ -640,6 +658,11 @@ public Output performAnalysisPerSampleRemote(Readset rs, File inputBam, Referenc
640658 return output ;
641659 }
642660
661+ private File getAllVcf (File outputDir , File inputBam )
662+ {
663+ return new File (outputDir , FileUtil .getBaseName (inputBam ) + ".lofreq.all.vcf.gz" );
664+ }
665+
643666 private Set <String > runBcftools (File inputBam , ReferenceGenome referenceGenome , File mask , int minCoverage ) throws PipelineJobException
644667 {
645668 Set <String > variantsBcftools = new HashSet <>();
@@ -769,8 +792,6 @@ private File getMetricsFile(File outDir)
769792 @ Override
770793 public Output performAnalysisPerSampleLocal (AnalysisModel model , File inputBam , File referenceFasta , File outDir ) throws PipelineJobException
771794 {
772- Map <String , String > valueMap = new HashMap <>();
773-
774795 File metrics = getMetricsFile (outDir );
775796 if (metrics .exists ())
776797 {
@@ -819,8 +840,6 @@ public Output performAnalysisPerSampleLocal(AnalysisModel model, File inputBam,
819840
820841 Table .insert (getPipelineCtx ().getJob ().getUser (), ti , r );
821842 total ++;
822-
823- valueMap .put (line [1 ], value );
824843 }
825844
826845 getPipelineCtx ().getJob ().getLogger ().info ("total metrics: " + total );
@@ -835,9 +854,125 @@ public Output performAnalysisPerSampleLocal(AnalysisModel model, File inputBam,
835854 throw new PipelineJobException ("Unable to find metrics file: " + metrics .getPath ());
836855 }
837856
857+ boolean dbImport = getProvider ().getParameterByName ("dbImport" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Boolean .class , false );
858+ if (dbImport )
859+ {
860+ importNtSnps (model , inputBam , outDir );
861+ }
862+ else
863+ {
864+ getPipelineCtx ().getLogger ().info ("NT SNP DB Import not selected" );
865+ }
866+
867+ boolean runPangolinAndNextClade = getProvider ().getParameterByName ("runPangolin" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Boolean .class , false );
868+ if (runPangolinAndNextClade )
869+ {
870+ //Find the NextClade json:
871+ SimpleFilter filter = new SimpleFilter (FieldKey .fromString ("analysis_id" ), model .getRowId ());
872+ filter .addCondition (FieldKey .fromString ("category" ), NextCladeHandler .NEXTCLADE_JSON );
873+ SequenceOutputFile jsonFileRecord = new TableSelector (SequenceAnalysisSchema .getTable (SequenceAnalysisSchema .TABLE_OUTPUTFILES ), filter , null ).getObject (SequenceOutputFile .class );
874+ if (jsonFileRecord == null )
875+ {
876+ throw new PipelineJobException ("Unable to find NextClade JSON record" );
877+ }
878+
879+ //Find the NextClade json:
880+ SimpleFilter filter2 = new SimpleFilter (FieldKey .fromString ("analysis_id" ), model .getRowId ());
881+ filter2 .addCondition (FieldKey .fromString ("category" ), CATEGORY );
882+ SequenceOutputFile vcfFileRecord = new TableSelector (SequenceAnalysisSchema .getTable (SequenceAnalysisSchema .TABLE_OUTPUTFILES ), filter2 , null ).getObject (SequenceOutputFile .class );
883+ if (vcfFileRecord == null )
884+ {
885+ throw new PipelineJobException ("Unable to find LoFreq VCF record" );
886+ }
887+
888+ NextCladeHandler .processAndImportNextCladeAa (getPipelineCtx ().getJob (), jsonFileRecord , model .getRowId (), vcfFileRecord .getFile (), dbImport );
889+ }
890+ else
891+ {
892+ getPipelineCtx ().getLogger ().info ("NextClade was not run" );
893+ }
894+
838895 return null ;
839896 }
840897
898+ private void importNtSnps (AnalysisModel model , File inputBam , File outDir ) throws PipelineJobException
899+ {
900+ getPipelineCtx ().getLogger ().info ("Importing variants into DB" );
901+
902+ File vcf = getAllVcf (outDir , inputBam );
903+ if (!vcf .exists ())
904+ {
905+ throw new PipelineJobException ("Unable to find file: " + vcf .getPath ());
906+ }
907+
908+ ReferenceGenome genome = SequenceAnalysisService .get ().getReferenceGenome (model .getLibraryId (), getPipelineCtx ().getJob ().getUser ());
909+ ReferenceLibraryHelperImpl helper = new ReferenceLibraryHelperImpl (genome .getWorkingFastaFile (), getPipelineCtx ().getLogger ());
910+
911+ ViralSnpUtil .deleteExistingValues (getPipelineCtx ().getJob (), model .getAnalysisId (), SequenceAnalysisSchema .TABLE_NT_SNP_BY_POS , null );
912+ Double afThreshold = getProvider ().getParameterByName ("dbImportThreshold" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Double .class , 0.0 );
913+ Integer depthTheshold = getProvider ().getParameterByName ("dbImportDepthThreshold" ).extractValue (getPipelineCtx ().getJob (), getProvider (), getStepIdx (), Integer .class , 0 );
914+
915+ TableInfo ti = SequenceAnalysisSchema .getTable (SequenceAnalysisSchema .TABLE_NT_SNP_BY_POS );
916+
917+ try (VCFFileReader reader = new VCFFileReader (vcf );CloseableIterator <VariantContext > it = reader .iterator ())
918+ {
919+ while (it .hasNext ())
920+ {
921+ VariantContext vc = it .next ();
922+
923+ if (vc .isFiltered ())
924+ {
925+ return ;
926+ }
927+
928+ double pct = vc .getAttributeAsDouble ("AF" , 0.0 );
929+ if (pct < afThreshold )
930+ {
931+ continue ;
932+ }
933+
934+ int depth = vc .getAttributeAsInt ("GATK_DP" , 0 );
935+ if (depth < depthTheshold )
936+ {
937+ continue ;
938+ }
939+
940+ if (vc .getAlternateAlleles ().size () != 1 )
941+ {
942+ throw new PipelineJobException ("Expected a single ALT allele" );
943+ }
944+
945+ Map <String , Object > ntRow = new HashMap <>();
946+ ntRow .put ("analysis_id" , model .getAnalysisId ());
947+
948+ Integer refId = helper .resolveSequenceId (vc .getContig ());
949+ if (refId == null )
950+ {
951+ getPipelineCtx ().getLogger ().error ("unknown reference id: [" + vc .getContig () + "]" );
952+ }
953+
954+ ntRow .put ("ref_nt_id" , refId );
955+ ntRow .put ("ref_nt_position" , vc .getStart ()); //use 1-based
956+ ntRow .put ("ref_nt_insert_index" , 0 );
957+ ntRow .put ("ref_nt" , vc .getReference ().getBaseString ());
958+ ntRow .put ("q_nt" , vc .getAlternateAllele (0 ).getBaseString ());
959+
960+ List <Integer > depths = vc .getAttributeAsIntList ("DP4" , 0 );
961+ int alleleDepth = depths .get (2 ) + depths .get (3 );
962+ ntRow .put ("readcount" , alleleDepth );
963+ ntRow .put ("depth" , depth );
964+ ntRow .put ("adj_depth" , vc .getAttribute ("DP" ));
965+ ntRow .put ("container" , getPipelineCtx ().getJob ().getContainer ().getEntityId ());
966+ ntRow .put ("createdby" , getPipelineCtx ().getJob ().getUser ().getUserId ());
967+ ntRow .put ("modifiedby" , getPipelineCtx ().getJob ().getUser ().getUserId ());
968+ ntRow .put ("created" , new Date ());
969+ ntRow .put ("modified" , new Date ());
970+
971+ Table .insert (getPipelineCtx ().getJob ().getUser (), ti , ntRow );
972+ }
973+ }
974+ }
975+
841976 private String getHashKey (VariantContext vc )
842977 {
843978 return vc .getContig () + "<>" + vc .getStart () + vc .getAlternateAlleles ().stream ().map (Allele ::getBaseString ).collect (Collectors .joining (";" ));
0 commit comments