Skip to content

Commit e52dea1

Browse files
committed
Better integration of OutputVariantsStartingInIntervals into scatter/gather
1 parent 9c2e268 commit e52dea1

File tree

5 files changed

+154
-50
lines changed

5 files changed

+154
-50
lines changed

SequenceAnalysis/src/org/labkey/sequenceanalysis/analysis/GenotypeGVCFHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ else if (genomeIds.isEmpty())
266266
}
267267

268268
//run post processing, if needed
269-
File processed = ProcessVariantsHandler.processVCF(outputVcf, genomeId, ctx, resumer);
269+
File processed = ProcessVariantsHandler.processVCF(outputVcf, genomeId, ctx, resumer, false);
270270
if (processed == null)
271271
{
272272
ctx.getLogger().debug("adding GenotypeGVCFs output because no processing was selected");

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.labkey.sequenceanalysis.SequenceAnalysisModule;
4747
import org.labkey.sequenceanalysis.run.util.AbstractGenomicsDBImportHandler;
4848
import org.labkey.sequenceanalysis.run.util.MergeVcfsAndGenotypesWrapper;
49+
import org.labkey.sequenceanalysis.run.variant.OutputVariantsStartingInIntervalsStep;
4950
import org.labkey.sequenceanalysis.util.SequenceUtil;
5051

5152
import java.io.File;
@@ -359,7 +360,7 @@ public static List<Interval> getIntervals(JobContext ctx)
359360
return null;
360361
}
361362

362-
public static File processVCF(File input, Integer libraryId, JobContext ctx, Resumer resumer) throws PipelineJobException
363+
public static File processVCF(File input, Integer libraryId, JobContext ctx, Resumer resumer, boolean subsetToIntervals) throws PipelineJobException
363364
{
364365
try
365366
{
@@ -382,10 +383,37 @@ public static File processVCF(File input, Integer libraryId, JobContext ctx, Res
382383
return null;
383384
}
384385

386+
boolean useScatterGather = getVariantPipelineJob(ctx.getJob()) != null && getVariantPipelineJob(ctx.getJob()).isScatterJob();
387+
if (useScatterGather && subsetToIntervals)
388+
{
389+
if (getIntervals(ctx) == null)
390+
{
391+
throw new PipelineJobException("Did not expect intervals to be null on a scatter/gather job");
392+
}
393+
394+
ctx.getLogger().info("Subsetting input VCF to job intervals");
395+
ctx.getJob().setStatus(PipelineJob.TaskStatus.running, "Subsetting input VCF to job intervals");
396+
397+
File outputFile = new File(ctx.getOutputDir(), SequenceAnalysisService.get().getUnzippedBaseName(currentVCF.getName()) + ".subset.vcf.gz");
398+
File outputFileIdx = new File(outputFile.getPath() + ".tbi");
399+
if (outputFileIdx.exists())
400+
{
401+
ctx.getLogger().debug("Index exists, will not re-subset VCF");
402+
}
403+
else
404+
{
405+
OutputVariantsStartingInIntervalsStep.Wrapper wrapper = new OutputVariantsStartingInIntervalsStep.Wrapper(ctx.getLogger());
406+
wrapper.execute(input, outputFile, getIntervals(ctx));
407+
}
408+
409+
currentVCF = outputFile;
410+
resumer.getFileManager().addIntermediateFile(currentVCF);
411+
resumer.getFileManager().addIntermediateFile(outputFileIdx);
412+
}
413+
385414
for (PipelineStepCtx<VariantProcessingStep> stepCtx : providers)
386415
{
387416
ctx.getLogger().info("Starting to run: " + stepCtx.getProvider().getLabel());
388-
389417
ctx.getJob().setStatus(PipelineJob.TaskStatus.running, "Running: " + stepCtx.getProvider().getLabel());
390418
stepIdx++;
391419

@@ -622,7 +650,7 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
622650

623651
private void processFile(File input, Integer libraryId, Integer readsetId, JobContext ctx) throws PipelineJobException
624652
{
625-
File processed = processVCF(input, libraryId, ctx, _resumer);
653+
File processed = processVCF(input, libraryId, ctx, _resumer, true);
626654
if (processed != null && processed.exists())
627655
{
628656
ctx.getLogger().debug("adding sequence output: " + processed.getPath());

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/VariantProcessingRemoteMergeTask.java

Lines changed: 3 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.labkey.api.sequenceanalysis.run.AbstractDiscvrSeqWrapper;
1919
import org.labkey.api.util.FileType;
2020
import org.labkey.api.writer.PrintWriters;
21+
import org.labkey.sequenceanalysis.run.variant.OutputVariantsStartingInIntervalsStep;
2122

2223
import java.io.File;
2324
import java.io.IOException;
@@ -180,12 +181,9 @@ private File runDefaultVariantMerge(TaskFileManagerImpl manager, RecordedAction
180181
if (ensureOutputsWithinIntervals)
181182
{
182183
getJob().getLogger().debug("Ensuring ensure scatter outputs respect intervals");
183-
List<Interval> expectedIntervals = jobToIntervalMap.get(name);
184184

185-
File intervalFile = new File(vcf.getParentFile(), "scatterIntervals.list");
186185
File subsetVcf = new File(vcf.getParentFile(), SequenceAnalysisService.get().getUnzippedBaseName(vcf.getName()) + ".subset.vcf.gz");
187186
File subsetVcfIdx = new File(subsetVcf.getPath() + ".tbi");
188-
manager.addIntermediateFile(intervalFile);
189187
manager.addIntermediateFile(subsetVcf);
190188
manager.addIntermediateFile(subsetVcfIdx);
191189

@@ -195,19 +193,8 @@ private File runDefaultVariantMerge(TaskFileManagerImpl manager, RecordedAction
195193
}
196194
else
197195
{
198-
try (PrintWriter writer = PrintWriters.getPrintWriter(intervalFile))
199-
{
200-
expectedIntervals.forEach(interval -> {
201-
writer.println(interval.getContig() + ":" + interval.getStart() + "-" + interval.getEnd());
202-
});
203-
}
204-
catch (IOException e)
205-
{
206-
throw new PipelineJobException(e);
207-
}
208-
209-
Wrapper wrapper = new Wrapper(getJob().getLogger());
210-
wrapper.execute(vcf, subsetVcf, intervalFile);
196+
OutputVariantsStartingInIntervalsStep.Wrapper wrapper = new OutputVariantsStartingInIntervalsStep.Wrapper(getJob().getLogger());
197+
wrapper.execute(vcf, subsetVcf, getPipelineJob().getIntervalsForTask());
211198
}
212199

213200
toConcat.add(subsetVcf);
@@ -250,33 +237,4 @@ private File runDefaultVariantMerge(TaskFileManagerImpl manager, RecordedAction
250237

251238
return combined;
252239
}
253-
254-
public static class Wrapper extends AbstractDiscvrSeqWrapper
255-
{
256-
public Wrapper(Logger log)
257-
{
258-
super(log);
259-
}
260-
261-
public void execute(File inputVcf, File outputVcf, File intervalFile) throws PipelineJobException
262-
{
263-
List<String> args = new ArrayList<>(getBaseArgs());
264-
args.add("OutputVariantsStartingInIntervals");
265-
266-
args.add("-V");
267-
args.add(inputVcf.getPath());
268-
269-
args.add("-O");
270-
args.add(outputVcf.getPath());
271-
272-
args.add("-L");
273-
args.add(intervalFile.getPath());
274-
275-
execute(args);
276-
if (!outputVcf.exists())
277-
{
278-
throw new PipelineJobException("Missing file: " + outputVcf.getPath());
279-
}
280-
}
281-
}
282240
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package org.labkey.sequenceanalysis.run.variant;
2+
3+
import htsjdk.samtools.util.Interval;
4+
import org.apache.logging.log4j.Logger;
5+
import org.labkey.api.pipeline.PipelineJobException;
6+
import org.labkey.api.sequenceanalysis.SequenceAnalysisService;
7+
import org.labkey.api.sequenceanalysis.pipeline.AbstractPipelineStep;
8+
import org.labkey.api.sequenceanalysis.pipeline.AbstractVariantProcessingStepProvider;
9+
import org.labkey.api.sequenceanalysis.pipeline.PipelineContext;
10+
import org.labkey.api.sequenceanalysis.pipeline.PipelineStepProvider;
11+
import org.labkey.api.sequenceanalysis.pipeline.ReferenceGenome;
12+
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStep;
13+
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStepOutputImpl;
14+
import org.labkey.api.sequenceanalysis.run.AbstractDiscvrSeqWrapper;
15+
import org.labkey.api.writer.PrintWriters;
16+
17+
import javax.annotation.Nullable;
18+
import java.io.File;
19+
import java.io.IOException;
20+
import java.io.PrintWriter;
21+
import java.util.ArrayList;
22+
import java.util.Arrays;
23+
import java.util.List;
24+
25+
/**
26+
* User: bimber
27+
* Date: 6/15/2014
28+
* Time: 12:39 PM
29+
*/
30+
public class OutputVariantsStartingInIntervalsStep extends AbstractPipelineStep implements VariantProcessingStep
31+
{
32+
public OutputVariantsStartingInIntervalsStep(PipelineStepProvider<?> provider, PipelineContext ctx)
33+
{
34+
super(provider, ctx);
35+
}
36+
37+
public static class Provider extends AbstractVariantProcessingStepProvider<OutputVariantsStartingInIntervalsStep> implements RequiresPedigree
38+
{
39+
public Provider()
40+
{
41+
super("OutputVariantsStartingInIntervals", "Output Variants Starting In Intervals", "DISCVRseq", "This will subset the VCF to include only variants the start within the target intervals", Arrays.asList(
42+
43+
), null, "https://bimberlab.github.io/DISCVRSeq/");
44+
}
45+
46+
@Override
47+
public OutputVariantsStartingInIntervalsStep create(PipelineContext ctx)
48+
{
49+
return new OutputVariantsStartingInIntervalsStep(this, ctx);
50+
}
51+
}
52+
53+
@Override
54+
public Output processVariants(File inputVCF, File outputDirectory, ReferenceGenome genome, @Nullable List<Interval> intervals) throws PipelineJobException
55+
{
56+
VariantProcessingStepOutputImpl output = new VariantProcessingStepOutputImpl();
57+
if (intervals == null)
58+
{
59+
throw new PipelineJobException("This step requires intervals");
60+
}
61+
62+
File outputFile = new File(outputDirectory, SequenceAnalysisService.get().getUnzippedBaseName(inputVCF.getName()) + ".subset.vcf.gz");
63+
Wrapper wrapper = new Wrapper(getPipelineCtx().getLogger());
64+
wrapper.execute(inputVCF, outputFile, intervals);
65+
66+
output.addInput(inputVCF, "Input VCF");
67+
output.addInput(genome.getWorkingFastaFile(), "Reference Genome");
68+
output.addOutput(outputFile, "Subset VCF");
69+
70+
output.setVcf(outputFile);
71+
72+
return output;
73+
}
74+
75+
public static class Wrapper extends AbstractDiscvrSeqWrapper
76+
{
77+
public Wrapper(Logger log)
78+
{
79+
super(log);
80+
}
81+
82+
public void execute(File inputVcf, File outputVcf, List<Interval> intervals) throws PipelineJobException
83+
{
84+
File intervalFile = new File(outputVcf.getParentFile(), "scatterIntervals.list");
85+
try (PrintWriter writer = PrintWriters.getPrintWriter(intervalFile))
86+
{
87+
intervals.forEach(interval -> {
88+
writer.println(interval.getContig() + ":" + interval.getStart() + "-" + interval.getEnd());
89+
});
90+
}
91+
catch (IOException e)
92+
{
93+
throw new PipelineJobException(e);
94+
}
95+
96+
List<String> args = new ArrayList<>(getBaseArgs());
97+
args.add("OutputVariantsStartingInIntervals");
98+
99+
args.add("-V");
100+
args.add(inputVcf.getPath());
101+
102+
args.add("-O");
103+
args.add(outputVcf.getPath());
104+
105+
args.add("-L");
106+
args.add(intervalFile.getPath());
107+
108+
execute(args);
109+
if (!outputVcf.exists())
110+
{
111+
throw new PipelineJobException("Missing file: " + outputVcf.getPath());
112+
}
113+
114+
intervalFile.delete();
115+
}
116+
}
117+
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/variant/SelectVariantsStep.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class SelectVariantsStep extends AbstractCommandPipelineStep<SelectVarian
3939
{
4040
public static String INTERVALS = "intervals";
4141

42-
public SelectVariantsStep(PipelineStepProvider provider, PipelineContext ctx)
42+
public SelectVariantsStep(PipelineStepProvider<?> provider, PipelineContext ctx)
4343
{
4444
super(provider, ctx, new SelectVariantsWrapper(ctx.getLogger()));
4545
}
@@ -77,6 +77,7 @@ public Provider()
7777
), Arrays.asList("sequenceanalysis/panel/VariantFilterPanel.js", "sequenceanalysis/panel/IntervalPanel.js", "/sequenceanalysis/field/TrimmingTextArea.js"), "");
7878
}
7979

80+
@Override
8081
public SelectVariantsStep create(PipelineContext ctx)
8182
{
8283
return new SelectVariantsStep(this, ctx);

0 commit comments

Comments
 (0)