Commit 84087f2

Add handler to merge LoFreq VCFs, accounting for depth
1 parent e12d131 · commit 84087f2

2 files changed: +361 -0 lines changed


SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisModule.java

Lines changed: 2 additions & 0 deletions
@@ -78,6 +78,7 @@
 import org.labkey.sequenceanalysis.run.analysis.HaplotypeCallerAnalysis;
 import org.labkey.sequenceanalysis.run.analysis.ImmunoGenotypingAnalysis;
 import org.labkey.sequenceanalysis.run.analysis.LofreqAnalysis;
+import org.labkey.sequenceanalysis.run.analysis.MergeLoFreqVcfHandler;
 import org.labkey.sequenceanalysis.run.analysis.PARalyzerAnalysis;
 import org.labkey.sequenceanalysis.run.analysis.SequenceBasedTypingAnalysis;
 import org.labkey.sequenceanalysis.run.analysis.SnpCountAnalysis;

@@ -328,6 +329,7 @@ public static void registerPipelineSteps()
         SequenceAnalysisService.get().registerFileHandler(new MultiQCBamHandler());
         SequenceAnalysisService.get().registerFileHandler(new GenomicsDBImportHandler());
         SequenceAnalysisService.get().registerFileHandler(new GenomicsDBAppendHandler());
+        SequenceAnalysisService.get().registerFileHandler(new MergeLoFreqVcfHandler());
 
         SequenceAnalysisService.get().registerReadsetHandler(new MultiQCHandler());
         SequenceAnalysisService.get().registerReadsetHandler(new CellHashingHandler());
SequenceAnalysis/src/org/labkey/sequenceanalysis/run/analysis/MergeLoFreqVcfHandler.java

Lines changed: 359 additions & 0 deletions

@@ -0,0 +1,359 @@
package org.labkey.sequenceanalysis.run.analysis;

import au.com.bytecode.opencsv.CSVWriter;
import htsjdk.samtools.SAMSequenceDictionary;
import htsjdk.samtools.SAMSequenceRecord;
import htsjdk.samtools.util.CloseableIterator;
import htsjdk.samtools.util.IOUtil;
import htsjdk.variant.utils.SAMSequenceDictionaryExtractor;
import htsjdk.variant.variantcontext.Allele;
import htsjdk.variant.variantcontext.VariantContext;
import htsjdk.variant.vcf.VCFFileReader;
import htsjdk.variant.vcf.VCFHeader;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;
import org.labkey.api.module.ModuleLoader;
import org.labkey.api.pipeline.PipelineJob;
import org.labkey.api.pipeline.PipelineJobException;
import org.labkey.api.pipeline.RecordedAction;
import org.labkey.api.sequenceanalysis.SequenceAnalysisService;
import org.labkey.api.sequenceanalysis.SequenceOutputFile;
import org.labkey.api.sequenceanalysis.pipeline.AbstractParameterizedOutputHandler;
import org.labkey.api.sequenceanalysis.pipeline.ReferenceGenome;
import org.labkey.api.sequenceanalysis.pipeline.SequenceAnalysisJobSupport;
import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
import org.labkey.api.sequenceanalysis.pipeline.ToolParameterDescriptor;
import org.labkey.sequenceanalysis.SequenceAnalysisModule;
import org.labkey.sequenceanalysis.util.SequenceUtil;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MergeLoFreqVcfHandler extends AbstractParameterizedOutputHandler<SequenceOutputHandler.SequenceOutputProcessor>
{
    private static final String MIN_AF = "minAfThreshold";
    private static final String MIN_COVERAGE = "minCoverage";

    public MergeLoFreqVcfHandler()
    {
        super(ModuleLoader.getInstance().getModule(SequenceAnalysisModule.class), "Merge LoFreq VCFs", "This will merge VCFs generated by LoFreq, considering the DepthOfCoverage data generated during those jobs.", null, Arrays.asList(
                ToolParameterDescriptor.create("basename", "Output Name", "The basename of the output file.", "textfield", new JSONObject(){{
                    put("allowBlank", false);
                }}, null),
                ToolParameterDescriptor.create(MIN_AF, "Min AF to Include", "A site will be included if there is a passing variant above this AF in at least one sample.", "ldk-numberfield", new JSONObject(){{
                    put("minValue", 0);
                    put("maxValue", 1);
                    put("decimalPrecision", 2);
                }}, 0.01),
                ToolParameterDescriptor.create(MIN_COVERAGE, "Min Coverage To Include", "A site will be reported as ND unless coverage is above this threshold.", "ldk-integerfield", new JSONObject(){{
                    put("minValue", 0);
                }}, 10)
            )
        );
    }

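    // Only LoFreq VCF outputs (LofreqAnalysis.CATEGORY) are eligible inputs for this handler.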
    @Override
    public boolean canProcess(SequenceOutputFile o)
    {
        return LofreqAnalysis.CATEGORY.equals(o.getCategory()) && o.getFile() != null && SequenceUtil.FILETYPE.vcf.getFileType().isType(o.getFile());
    }

    @Override
    public boolean doRunRemote()
    {
        return true;
    }

    @Override
    public boolean doRunLocal()
    {
        return false;
    }

    @Override
    public SequenceOutputProcessor getProcessor()
    {
        return new Processor();
    }

    public static class Processor implements SequenceOutputHandler.SequenceOutputProcessor
    {
        @Override
        public void init(PipelineJob job, SequenceAnalysisJobSupport support, List<SequenceOutputFile> inputFiles, JSONObject params, File outputDir, List<RecordedAction> actions, List<SequenceOutputFile> outputsToCreate) throws UnsupportedOperationException, PipelineJobException
        {

        }

        @Override
        public void processFilesOnWebserver(PipelineJob job, SequenceAnalysisJobSupport support, List<SequenceOutputFile> inputFiles, JSONObject params, File outputDir, List<RecordedAction> actions, List<SequenceOutputFile> outputsToCreate) throws UnsupportedOperationException, PipelineJobException
        {

        }

        @Override
        public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext ctx) throws UnsupportedOperationException, PipelineJobException
        {
            String basename = ctx.getParams().getString("basename");
            basename = basename + (basename.endsWith(".") ? "" : ".");

            final double minAfThreshold = ctx.getParams().optDouble(MIN_AF, 0.0);
            final int minDepth = ctx.getParams().optInt(MIN_COVERAGE, 0);

            ctx.getLogger().info("Building whitelist of sites and alleles");
            Map<String, Set<Allele>> siteToAllele = new HashMap<>();
            List<Pair<String, Integer>> whitelistSites = new ArrayList<>();

            Set<Integer> genomeIds = new HashSet<>();

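            // Pass 1: scan every input VCF and record each site with at least one passing variant at or above the AF threshold, along with the union of alleles seen there.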
            for (SequenceOutputFile so : inputFiles)
            {
                if (so.getLibrary_id() == null)
                {
                    throw new PipelineJobException("VCF lacks library id: " + so.getRowid());
                }

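                // All inputs must be aligned to the same reference genome; abort if a second genome ID is encountered.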
                genomeIds.add(so.getLibrary_id());
                if (genomeIds.size() > 1)
                {
                    throw new PipelineJobException("Samples use more than one genome. Genome IDs: " + StringUtils.join(genomeIds, ","));
                }

                try (VCFFileReader reader = new VCFFileReader(so.getFile()); CloseableIterator<VariantContext> it = reader.iterator())
                {
                    while (it.hasNext())
                    {
                        VariantContext vc = it.next();
                        if (vc.getAttribute("AF") == null)
                        {
                            continue;
                        }

                        double af = vc.getAttributeAsDouble("AF", 0.0);
                        if (af >= minAfThreshold)
                        {
                            Set<Allele> alleles = siteToAllele.getOrDefault(getCacheKey(vc.getContig(), vc.getStart()), new LinkedHashSet<>());
                            alleles.addAll(vc.getAlleles());
                            siteToAllele.put(getCacheKey(vc.getContig(), vc.getStart()), alleles);
                            whitelistSites.add(Pair.of(vc.getContig(), vc.getStart()));
                        }
                    }
                }
            }

            ctx.getLogger().info("total sites: " + whitelistSites.size());
            Collections.sort(whitelistSites);

            ReferenceGenome genome = ctx.getSequenceSupport().getCachedGenome(genomeIds.iterator().next());
            SAMSequenceDictionary dict = SAMSequenceDictionaryExtractor.extractDictionary(genome.getSequenceDictionary().toPath());
            Map<String, Integer> contigToOffset = getContigToOffset(dict);

            ctx.getLogger().info("Building merged table");

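            // Pass 2: write one row per (whitelisted site, input sample) pair, reporting depth and allele frequencies.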
            File output = new File(ctx.getOutputDir(), basename + "txt");
            int idx = 0;
            try (CSVWriter writer = new CSVWriter(IOUtil.openFileForBufferedUtf8Writing(output)))
            {
                writer.writeNext(new String[]{"OutputFileId", "ReadsetId", "Contig", "Start", "Ref", "AltAlleles", "Depth", "RefAF", "AltAFs"});

                for (Pair<String, Integer> site : whitelistSites)
                {
                    for (SequenceOutputFile so : inputFiles)
                    {
                        VCFFileReader reader = getReader(so.getFile());
                        try (CloseableIterator<VariantContext> it = reader.query(site.getLeft(), site.getRight(), site.getRight()))
                        {
                            List<String> line = new ArrayList<>();
                            line.add(String.valueOf(so.getRowid()));
                            line.add(String.valueOf(so.getReadset()));

                            line.add(site.getLeft());
                            line.add(String.valueOf(site.getRight()));
                            String key = getCacheKey(site.getLeft(), site.getRight());

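                            // The LinkedHashSet preserves insertion order, so the first allele is the REF allele (VariantContext.getAlleles() lists the reference first).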
                            Allele refAllele = siteToAllele.get(key).iterator().next();
                            line.add(refAllele.getBaseString());

                            Set<Allele> alleles = siteToAllele.get(key);
                            line.add(alleles.stream().map(Allele::getBaseString).collect(Collectors.joining(";")));

                            if (!it.hasNext())
                            {
                                //No variant was called, so this is either considered all WT, or no-call
                                int depth = getReadDepth(so.getFile(), contigToOffset, site.getLeft(), site.getRight());
                                if (depth < minDepth)
                                {
                                    line.add(String.valueOf(depth));
                                    line.add("ND");
                                    line.add("ND");
                                }
                                else
                                {
                                    line.add(String.valueOf(depth));
                                    line.add("1");
                                    line.add(",0".repeat(alleles.size() - 1).substring(1));
                                }

                                writer.writeNext(line.toArray(new String[]{}));
                                idx++;
                            }
                            else
                            {
                                List<String> toWrite = new ArrayList<>(line);

                                while (it.hasNext())
                                {
                                    VariantContext vc = it.next();
                                    if (vc.getAttribute("GATK_DP") == null)
                                    {
                                        throw new PipelineJobException("Expected GATK_DP annotation on line " + key + " in file: " + so.getFile().getPath());
                                    }

                                    if (vc.getAttribute("AF") == null)
                                    {
                                        throw new PipelineJobException("Expected AF annotation on line " + key + " in file: " + so.getFile().getPath());
                                    }

                                    int depth = vc.getAttributeAsInt("GATK_DP", 0);
                                    toWrite.add(String.valueOf(depth));

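                                    // Below the minimum coverage, the row is reported as ND (no data) rather than as allele frequencies.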
                                    if (depth < minDepth)
                                    {
                                        toWrite.add("ND");
                                        toWrite.add("ND");
                                    }
                                    else
                                    {
                                        List<Double> afs = vc.getAttributeAsDoubleList("AF", 0);
                                        if (afs.size() != vc.getAlternateAlleles().size())
                                        {
                                            throw new PipelineJobException("Expected AF annotation on line " + key + " in file: " + so.getFile().getPath());
                                        }

                                        double refAf = 1 - afs.stream().reduce(0.0, Double::sum);
                                        toWrite.add(String.valueOf(refAf));

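                                        // Report alt AFs in whitelist-allele order; alleles not called in this sample get 0.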
                                        List<Double> toAdd = new ArrayList<>();
                                        for (Allele a : alleles)
                                        {
                                            if (a.isReference())
                                            {
                                                continue;
                                            }

                                            int aIdx = vc.getAlternateAlleles().indexOf(a);
                                            if (aIdx == -1)
                                            {
                                                toAdd.add(0.0);
                                            }
                                            else
                                            {
                                                toAdd.add(afs.get(aIdx));
                                            }
                                        }

                                        toWrite.add(toAdd.stream().map(String::valueOf).collect(Collectors.joining(";")));
                                    }

                                    writer.writeNext(toWrite.toArray(new String[]{}));
                                    idx++;
                                }
                            }
                        }

                        if (idx % 500 == 0)
                        {
                            ctx.getLogger().info("Total sites written: " + idx);
                        }
                    }
                }
            }
            catch (IOException e)
            {
                throw new PipelineJobException(e);
            }

            for (VCFFileReader reader : readerMap.values())
            {
                try
                {
                    reader.close();
                }
                catch (Throwable e)
                {
                    ctx.getLogger().error("Unable to close reader: " + e.getMessage());
                }
            }
        }

        private String getCacheKey(String contig, int start)
        {
            return contig + "<>" + start;
        }

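        // Maps each contig to its starting line offset in the per-position coverage file; the code assumes a single header line followed by one line per reference position, in dictionary order.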
        private Map<String, Integer> getContigToOffset(SAMSequenceDictionary dict)
        {
            Map<String, Integer> ret = new HashMap<>();

            //Account for the header line:
            int offset = 1;
            for (SAMSequenceRecord rec : dict.getSequences())
            {
                ret.put(rec.getSequenceName(), offset);
                offset += rec.getSequenceLength();
            }

            return ret;
        }

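        // Reads the depth for a single position by seeking to the expected line of the sample's .coverage file, verifying the locus label, and parsing the second column.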
        private int getReadDepth(File vcf, Map<String, Integer> contigToOffset, String contig, int position) throws PipelineJobException
        {
            File gatkDepth = new File(vcf.getParentFile(), SequenceAnalysisService.get().getUnzippedBaseName(vcf.getName()) + ".coverage");
            if (!gatkDepth.exists())
            {
                throw new PipelineJobException("File not found: " + gatkDepth.getPath());
            }

            try (Stream<String> lines = Files.lines(gatkDepth.toPath()))
            {
                int lineNo = contigToOffset.get(contig) + position;
                String[] line = lines.skip(lineNo).findFirst().get().split("\t");

                if (!line[0].equals(contig + ":" + position))
                {
                    throw new PipelineJobException("Incorrect line at " + lineNo + ", expected " + contig + ":" + position + ", but was: " + line[0]);
                }

                return Integer.parseInt(line[1]);
            }
            catch (IOException e)
            {
                throw new PipelineJobException(e);
            }
        }

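        // VCF readers are opened lazily, cached per file, and closed at the end of processFilesRemote().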
        private Map<File, VCFFileReader> readerMap = new HashMap<>();

        private VCFFileReader getReader(File f)
        {
            if (!readerMap.containsKey(f))
            {
                readerMap.put(f, new VCFFileReader(f));
            }

            return readerMap.get(f);
        }
    }
}
