@@ -101,6 +101,8 @@ public void processFilesOnWebserver(PipelineJob job, SequenceAnalysisJobSupport
101101
102102 }
103103
104+ private static final double NO_DATA_VAL = -1.0 ;
105+
104106 @ Override
105107 public void processFilesRemote (List <SequenceOutputFile > inputFiles , JobContext ctx ) throws UnsupportedOperationException , PipelineJobException
106108 {
@@ -144,8 +146,11 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
144146 {
145147 Set <Allele > alleles = siteToAllele .getOrDefault (getCacheKey (vc .getContig (), vc .getStart ()), new LinkedHashSet <>());
146148 alleles .addAll (vc .getAlleles ());
149+ if (!siteToAllele .containsKey (getCacheKey (vc .getContig (), vc .getStart ())))
150+ {
151+ whitelistSites .add (Pair .of (vc .getContig (), vc .getStart ()));
152+ }
147153 siteToAllele .put (getCacheKey (vc .getContig (), vc .getStart ()), alleles );
148- whitelistSites .add (Pair .of (vc .getContig (), vc .getStart ()));
149154 }
150155 }
151156 }
@@ -162,7 +167,7 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
162167
163168 File output = new File (ctx .getOutputDir (), basename + "txt" );
164169 int idx = 0 ;
165- try (CSVWriter writer = new CSVWriter (IOUtil .openFileForBufferedUtf8Writing (output )))
170+ try (CSVWriter writer = new CSVWriter (IOUtil .openFileForBufferedUtf8Writing (output ), '\t' , CSVWriter . NO_QUOTE_CHARACTER ))
166171 {
167172 writer .writeNext (new String []{"OutputFileId" , "ReadsetId" , "Contig" , "Start" , "Ref" , "AltAlleles" , "Depth" , "RefAF" , "AltAFs" });
168173
@@ -171,6 +176,8 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
171176 for (SequenceOutputFile so : inputFiles )
172177 {
173178 VCFFileReader reader = getReader (so .getFile ());
179+
180+ //NOTE: LoFreq should output one VCF line per allele:
174181 try (CloseableIterator <VariantContext > it = reader .query (site .getLeft (), site .getRight (), site .getRight ()))
175182 {
176183 List <String > line = new ArrayList <>();
@@ -185,7 +192,9 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
185192 line .add (refAllele .getBaseString ());
186193
187194 Set <Allele > alleles = siteToAllele .get (key );
188- line .add (alleles .stream ().map (Allele ::getBaseString ).collect (Collectors .joining (";" )));
195+
196+ //Drop the ref (first) element.
197+ line .add (alleles .stream ().map (Allele ::getBaseString ).skip (1 ).collect (Collectors .joining (";" )));
189198
190199 if (!it .hasNext ())
191200 {
@@ -201,16 +210,17 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
201210 {
202211 line .add (String .valueOf (depth ));
203212 line .add ("1" );
204- line .add (", 0" .repeat (alleles .size () - 1 ).substring (1 ));
213+ line .add ("; 0" .repeat (alleles .size () - 1 ).substring (1 ));
205214 }
206215
207216 writer .writeNext (line .toArray (new String []{}));
208217 idx ++;
209218 }
210219 else
211220 {
212- List <String > toWrite = new ArrayList <>(line );
213-
221+ Integer depth = null ;
222+ Double totalAltAf = 0.0 ;
223+ Map <Allele , Double > alleleToAf = new HashMap <>();
214224 while (it .hasNext ())
215225 {
216226 VariantContext vc = it .next ();
@@ -224,50 +234,49 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
224234 throw new PipelineJobException ("Expected AF annotation on line " + key + " in file: " + so .getFile ().getPath ());
225235 }
226236
227- int depth = vc .getAttributeAsInt ("GATK_DP" , 0 );
228- toWrite .add (String .valueOf (depth ));
229-
237+ depth = vc .getAttributeAsInt ("GATK_DP" , 0 );
230238 if (depth < minDepth )
231239 {
232- toWrite .add ("ND" );
233- toWrite .add ("ND" );
240+ vc .getAlternateAlleles ().forEach (a -> {
241+ alleleToAf .put (a , NO_DATA_VAL );
242+ });
234243 }
235244 else
236245 {
237246 List <Double > afs = vc .getAttributeAsDoubleList ("AF" , 0 );
238247 if (afs .size () != vc .getAlternateAlleles ().size ())
239248 {
240- throw new PipelineJobException ("Expected AF annotation on line " + key + " in file: " + so .getFile ().getPath ());
249+ throw new PipelineJobException ("Expected AF annotation on line " + key + " in file: " + so .getFile ().getPath () + " to be of length: " + vc . getAlternateAlleles (). size () );
241250 }
242251
243- double refAf = 1 - afs .stream ().reduce (0.0 , Double ::sum );
244- toWrite .add (String .valueOf (refAf ));
245-
246- List <Double > toAdd = new ArrayList <>();
247- for (Allele a : alleles )
248- {
249- if (a .isReference ())
250- {
251- continue ;
252- }
253-
252+ totalAltAf += afs .stream ().reduce (0.0 , Double ::sum );
253+ vc .getAlternateAlleles ().forEach (a -> {
254254 int aIdx = vc .getAlternateAlleles ().indexOf (a );
255- if (aIdx == -1 )
256- {
257- toAdd .add (0.0 );
258- }
259- else
260- {
261- toAdd .add (afs .get (aIdx ));
262- }
263- }
255+ alleleToAf .put (a , afs .get (aIdx ));
256+ });
257+ }
258+ }
259+
260+ List <String > toWrite = new ArrayList <>(line );
261+ toWrite .add (String .valueOf (depth ));
262+ toWrite .add (String .valueOf (1 - totalAltAf ));
264263
265- toWrite .add (toAdd .stream ().map (String ::valueOf ).collect (Collectors .joining (";" )));
264+ //Add AFs in order:
265+ List <Object > toAdd = new ArrayList <>();
266+ for (Allele a : alleles )
267+ {
268+ if (a .isReference ())
269+ {
270+ continue ;
266271 }
267272
268- writer . writeNext ( toWrite . toArray ( new String []{}) );
269- idx ++ ;
273+ double af = alleleToAf . getOrDefault ( a , 0.0 );
274+ toAdd . add ( af == NO_DATA_VAL ? "ND" : af ) ;
270275 }
276+ toWrite .add (toAdd .stream ().map (String ::valueOf ).collect (Collectors .joining (";" )));
277+
278+ writer .writeNext (toWrite .toArray (new String []{}));
279+ idx ++;
271280 }
272281 }
273282
@@ -294,6 +303,8 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
294303 ctx .getLogger ().error ("Unable to close reader: " + e .getMessage ());
295304 }
296305 }
306+
307+ ctx .getFileManager ().addSequenceOutput (output , "Merged LoFreq Variants: " + inputFiles .size () + " VCFs" , "Merged LoFreq Variant Table" , null , null , genome .getGenomeId (), null );
297308 }
298309
299310 private String getCacheKey (String contig , int start )
0 commit comments