3333import java .util .Collections ;
3434import java .util .HashMap ;
3535import java .util .HashSet ;
36- import java .util .LinkedHashSet ;
36+ import java .util .LinkedHashMap ;
3737import java .util .List ;
3838import java .util .Map ;
3939import java .util .Set ;
@@ -103,6 +103,98 @@ public void processFilesOnWebserver(PipelineJob job, SequenceAnalysisJobSupport
103103
104104 private static final double NO_DATA_VAL = -1.0 ;
105105
106+ private class SiteAndAlleles
107+ {
108+ private final String _contig ;
109+ private final int _start ;
110+
111+ Map <Allele , List <Allele >> _encounteredAlleles = new HashMap <>();
112+
113+ Allele _ref = null ;
114+ List <Allele > _alternates ;
115+ Map <Allele , Map <Allele , Allele >> _renamedAlleles ;
116+
117+ public SiteAndAlleles (String contig , int start )
118+ {
119+ _contig = contig ;
120+ _start = start ;
121+ }
122+
123+ public void addSite (VariantContext vc )
124+ {
125+ List <Allele > alleles = _encounteredAlleles .getOrDefault (vc .getReference (), new ArrayList <>());
126+ vc .getAlternateAlleles ().forEach (a -> {
127+ if (!alleles .contains (a ))
128+ {
129+ alleles .add (a );
130+ }
131+ });
132+ _encounteredAlleles .put (vc .getReference (), alleles );
133+ }
134+
135+ public void doFinalize ()
136+ {
137+ if (_ref != null )
138+ {
139+ throw new IllegalArgumentException ("Has already been finalized!" );
140+ }
141+
142+ if (_encounteredAlleles .keySet ().size () == 1 )
143+ {
144+ _ref = _encounteredAlleles .keySet ().iterator ().next ();
145+ _alternates = new ArrayList <>(_encounteredAlleles .get (_ref ));
146+ _renamedAlleles = Collections .emptyMap ();
147+
148+ return ;
149+ }
150+
151+ Allele finalRef = null ;
152+ for (Allele ref : _encounteredAlleles .keySet ())
153+ {
154+ if (finalRef == null )
155+ {
156+ finalRef = ref ;
157+ }
158+ else
159+ {
160+ finalRef = determineReferenceAllele (finalRef , ref );
161+ }
162+ }
163+ _ref = finalRef ;
164+
165+ List <Allele > finalAlleles = new ArrayList <>();
166+ _renamedAlleles = new HashMap <>();
167+
168+ for (Allele ref : _encounteredAlleles .keySet ())
169+ {
170+ if (ref .equals (finalRef ))
171+ {
172+ finalAlleles .addAll (_encounteredAlleles .get (finalRef ));
173+ }
174+ else
175+ {
176+ Map <Allele , Allele > alleleMap = createAlleleMapping (_ref , ref , _encounteredAlleles .get (finalRef ));
177+ for (Allele a : _encounteredAlleles .get (ref ))
178+ {
179+ a = alleleMap .getOrDefault (a , a );
180+ if (!finalAlleles .contains (a ))
181+ {
182+ finalAlleles .add (a );
183+ }
184+ }
185+
186+ _renamedAlleles .put (ref , alleleMap );
187+ }
188+ }
189+ _alternates = finalAlleles ;
190+ }
191+
192+ public boolean isMergedRef ()
193+ {
194+ return _encounteredAlleles .size () > 1 ;
195+ }
196+ }
197+
106198 @ Override
107199 public void processFilesRemote (List <SequenceOutputFile > inputFiles , JobContext ctx ) throws UnsupportedOperationException , PipelineJobException
108200 {
@@ -113,7 +205,7 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
113205 final int minDepth = ctx .getParams ().optInt (MIN_COVERAGE , 0 );
114206
115207 ctx .getLogger ().info ("Building whitelist of sites and alleles" );
116- Map <String , Set < Allele > > siteToAllele = new HashMap <>();
208+ Map <String , SiteAndAlleles > siteToAllele = new HashMap <>();
117209 List <Pair <String , Integer >> whitelistSites = new ArrayList <>();
118210
119211 Set <Integer > genomeIds = new HashSet <>();
@@ -144,13 +236,15 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
144236 double af = vc .getAttributeAsDouble ("AF" , 0.0 );
145237 if (af >= minAfThreshold )
146238 {
147- Set <Allele > alleles = siteToAllele .getOrDefault (getCacheKey (vc .getContig (), vc .getStart ()), new LinkedHashSet <>());
148- alleles .addAll (vc .getAlleles ());
149- if (!siteToAllele .containsKey (getCacheKey (vc .getContig (), vc .getStart ())))
239+ String key = getCacheKey (vc .getContig (), vc .getStart ());
240+ SiteAndAlleles site = siteToAllele .containsKey (key ) ? siteToAllele .get (key ) : new SiteAndAlleles (vc .getContig (), vc .getStart ());
241+ site .addSite (vc );
242+
243+ if (!siteToAllele .containsKey (key ))
150244 {
151245 whitelistSites .add (Pair .of (vc .getContig (), vc .getStart ()));
152246 }
153- siteToAllele .put (getCacheKey ( vc . getContig (), vc . getStart ()), alleles );
247+ siteToAllele .put (key , site );
154248 }
155249 }
156250 }
@@ -159,6 +253,10 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
159253 ctx .getLogger ().info ("total sites: " + whitelistSites .size ());
160254 Collections .sort (whitelistSites );
161255
256+ siteToAllele .forEach ((x , y ) -> {
257+ y .doFinalize ();
258+ });
259+
162260 ReferenceGenome genome = ctx .getSequenceSupport ().getCachedGenome (genomeIds .iterator ().next ());
163261 SAMSequenceDictionary dict = SAMSequenceDictionaryExtractor .extractDictionary (genome .getSequenceDictionary ().toPath ());
164262 Map <String , Integer > contigToOffset = getContigToOffset (dict );
@@ -169,7 +267,7 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
169267 int idx = 0 ;
170268 try (CSVWriter writer = new CSVWriter (IOUtil .openFileForBufferedUtf8Writing (output ), '\t' , CSVWriter .NO_QUOTE_CHARACTER ))
171269 {
172- writer .writeNext (new String []{"OutputFileId" , "ReadsetId" , "Contig" , "Start" , "Ref" , "AltAlleles" , "Depth" , "RefAF" , "AltAFs" });
270+ writer .writeNext (new String []{"OutputFileId" , "ReadsetId" , "Contig" , "Start" , "End" , " Ref" , "AltAlleles" , "OrigRef" , "OrigAlts " , "Depth" , "RefAF" , "AltAFs" });
173271
174272 for (Pair <String , Integer > site : whitelistSites )
175273 {
@@ -184,33 +282,35 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
184282 line .add (String .valueOf (so .getRowid ()));
185283 line .add (String .valueOf (so .getReadset ()));
186284
187- line .add (site .getLeft ());
188- line .add (String .valueOf (site .getRight ()));
189285 String key = getCacheKey (site .getLeft (), site .getRight ());
286+ SiteAndAlleles siteDef = siteToAllele .get (key );
190287
191- Allele refAllele = siteToAllele .get (key ).iterator ().next ();
192- line .add (refAllele .getBaseString ());
193-
194- Set <Allele > alleles = siteToAllele .get (key );
288+ line .add (site .getLeft ());
289+ line .add (String .valueOf (site .getRight ()));
290+ line .add (String .valueOf (site .getRight () + siteDef ._ref .length () - 1 ));
195291
196- //Drop the ref (first) element.
197- line .add (alleles .stream ().map (Allele ::getBaseString ).skip (1 ).collect (Collectors .joining (";" )));
292+ line . add ( siteDef . _ref . getBaseString ());
293+ line .add (siteDef . _alternates .stream ().map (Allele ::getBaseString ).skip (1 ).collect (Collectors .joining (";" )));
198294
199295 if (!it .hasNext ())
200296 {
201297 //No variant was called, so this is either considered all WT, or no-call
202298 int depth = getReadDepth (so .getFile (), contigToOffset , site .getLeft (), site .getRight ());
203299 if (depth < minDepth )
204300 {
301+ line .add ("" );
302+ line .add ("" );
205303 line .add (String .valueOf (depth ));
206304 line .add ("ND" );
207305 line .add ("ND" );
208306 }
209307 else
210308 {
309+ line .add ("" );
310+ line .add ("" );
211311 line .add (String .valueOf (depth ));
212312 line .add ("1" );
213- line .add (";0" .repeat (alleles .size () - 1 ).substring (1 ));
313+ line .add (";0" .repeat (siteDef . _alternates .size () - 1 ).substring (1 ));
214314 }
215315
216316 writer .writeNext (line .toArray (new String []{}));
@@ -221,9 +321,19 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
221321 Integer depth = null ;
222322 Double totalAltAf = 0.0 ;
223323 Map <Allele , Double > alleleToAf = new HashMap <>();
324+ Set <Allele > refs = new HashSet <>();
325+
224326 while (it .hasNext ())
225327 {
226328 VariantContext vc = it .next ();
329+ if (vc .getStart () != siteDef ._start )
330+ {
331+ throw new PipelineJobException ("Iterating incorrect start: " + siteDef ._start + " / " + vc .getStart () + " / " + so .getFile ().getPath ());
332+ }
333+
334+ refs .add (vc .getReference ());
335+ Map <Allele , Allele > alleleRenameMap = siteDef ._renamedAlleles .getOrDefault (vc .getReference (), Collections .emptyMap ());
336+
227337 if (vc .getAttribute ("GATK_DP" ) == null )
228338 {
229339 throw new PipelineJobException ("Expected GATK_DP annotation on line " + key + " in file: " + so .getFile ().getPath ());
@@ -252,24 +362,46 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
252362 totalAltAf += afs .stream ().reduce (0.0 , Double ::sum );
253363 vc .getAlternateAlleles ().forEach (a -> {
254364 int aIdx = vc .getAlternateAlleles ().indexOf (a );
365+
366+ //If the alleles were renamed due to combining references of differing length, account for this:
367+ a = alleleRenameMap .getOrDefault (a , a );
255368 alleleToAf .put (a , afs .get (aIdx ));
256369 });
257370 }
258371 }
259372
260373 List <String > toWrite = new ArrayList <>(line );
374+ if (!siteDef .isMergedRef ())
375+ {
376+ toWrite .add ("" );
377+ toWrite .add ("" );
378+ }
379+ else
380+ {
381+ refs .remove (siteDef ._ref );
382+ if (refs .isEmpty ())
383+ {
384+ toWrite .add ("" );
385+ toWrite .add ("" );
386+ }
387+ else
388+ {
389+ toWrite .add (refs .stream ().map (Allele ::getBaseString ).collect (Collectors .joining (";" )));
390+ List <String > alleleSets = new ArrayList <>();
391+ refs .forEach (r -> {
392+ alleleSets .add (siteDef ._encounteredAlleles .get (r ).stream ().map (Allele ::getBaseString ).collect (Collectors .joining ("," )));
393+ });
394+ toWrite .add (alleleSets .stream ().collect (Collectors .joining (";" )));
395+ }
396+ }
397+
261398 toWrite .add (String .valueOf (depth ));
262399 toWrite .add (String .valueOf (1 - totalAltAf ));
263400
264401 //Add AFs in order:
265402 List <Object > toAdd = new ArrayList <>();
266- for (Allele a : alleles )
403+ for (Allele a : siteDef . _alternates )
267404 {
268- if (a .isReference ())
269- {
270- continue ;
271- }
272-
273405 double af = alleleToAf .getOrDefault (a , 0.0 );
274406 toAdd .add (af == NO_DATA_VAL ? "ND" : af );
275407 }
@@ -365,4 +497,55 @@ private VCFFileReader getReader(File f)
365497 return readerMap .get (f );
366498 }
367499 }
500+
501+ //Adapted from GATK's GATKVariantContextUtils
502+ private static Allele determineReferenceAllele (final Allele ref1 , final Allele ref2 )
503+ {
504+ if ( ref1 == null || ref1 .length () < ref2 .length () )
505+ {
506+ return ref2 ;
507+ }
508+ else if ( ref2 == null || ref2 .length () < ref1 .length ())
509+ {
510+ return ref1 ;
511+ }
512+ else if ( ref1 .length () == ref2 .length () && ! ref1 .equals (ref2 ) )
513+ {
514+ throw new IllegalArgumentException (String .format ("The provided reference alleles do not appear to represent the same position, %s vs. %s" , ref1 , ref2 ));
515+ }
516+ else
517+ {
518+ return ref1 ;
519+ }
520+ }
521+
522+ private static Map <Allele , Allele > createAlleleMapping (final Allele refAllele , final Allele inputRef , final List <Allele > inputAlts )
523+ {
524+ if (refAllele .length () > inputRef .length ())
525+ {
526+ throw new IllegalArgumentException ("BUG: inputRef=" + inputRef + " is longer than refAllele=" + refAllele );
527+ }
528+
529+ final byte [] extraBases = Arrays .copyOfRange (refAllele .getBases (), inputRef .length (), refAllele .length ());
530+
531+ final Map <Allele , Allele > map = new LinkedHashMap <>();
532+ for ( final Allele a : inputAlts )
533+ {
534+ if ( isNonSymbolicExtendableAllele (a ) )
535+ {
536+ Allele extended = Allele .extend (a , extraBases );
537+ map .put (a , extended );
538+ } else if (a .equals (Allele .SPAN_DEL ))
539+ {
540+ map .put (a , a );
541+ }
542+ }
543+
544+ return map ;
545+ }
546+
547+ private static boolean isNonSymbolicExtendableAllele (final Allele allele )
548+ {
549+ return ! (allele .isReference () || allele .isSymbolic () || allele .equals (Allele .SPAN_DEL ));
550+ }
368551}
0 commit comments