Skip to content

Commit dbc01a9

Browse files
committed
Allow VariantProcessingStep to supply custom merge code for scatter/gather
1 parent 5c19405 commit dbc01a9

File tree

4 files changed

+39
-15
lines changed

4 files changed

+39
-15
lines changed

SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/VariantProcessingStep.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ default void validateScatter(ScatterGatherMethod method, PipelineJob job) throws
6262
{
6363

6464
}
65+
66+
/**
 * Optional hook that lets an implementation run its own merge logic for a scatter/gather job.
 * The default implementation does nothing, so implementations only override it when they
 * have extra work to perform.
 *
 * @param ctx the job context supplied by the caller
 * @param job the pipeline job being merged
 * @param manager the file manager supplied by the caller
 * @param genome the reference genome supplied by the caller
 * @param orderedScatterOutputs the output files of the scatter jobs
 *        (NOTE(review): the name implies these are in scatter order; confirm against the caller)
 * @throws PipelineJobException declared so implementations can report a failure
 */
default void performAdditionalMergeTasks(SequenceOutputHandler.JobContext ctx, PipelineJob job, TaskFileManager manager, ReferenceGenome genome, List<File> orderedScatterOutputs) throws PipelineJobException
67+
{
68+
69+
}
6570
}
6671

6772
public static interface MayRequirePrepareTask

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.labkey.api.sequenceanalysis.pipeline.SequenceAnalysisJobSupport;
3636
import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
3737
import org.labkey.api.sequenceanalysis.pipeline.SequencePipelineService;
38+
import org.labkey.api.sequenceanalysis.pipeline.TaskFileManager;
3839
import org.labkey.api.sequenceanalysis.pipeline.ToolParameterDescriptor;
3940
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStep;
4041
import org.labkey.api.sequenceanalysis.run.SimpleScriptWrapper;
@@ -851,4 +852,18 @@ else if (AbstractGenomicsDBImportHandler.TILE_DB_FILETYPE.isType(input))
851852
throw new PipelineJobException("Unknown file type: " + input.getPath());
852853
}
853854
}
855+
856+
@Override
857+
public void performAdditionalMergeTasks(JobContext ctx, PipelineJob job, TaskFileManager manager, ReferenceGenome genome, List<File> orderedScatterOutputs) throws PipelineJobException
858+
{
859+
List<PipelineStepCtx<VariantProcessingStep>> providers = SequencePipelineService.get().getSteps(job, VariantProcessingStep.class);
860+
for (PipelineStepCtx<VariantProcessingStep> stepCtx : providers)
861+
{
862+
VariantProcessingStep step = stepCtx.getProvider().create(ctx);
863+
if (step instanceof VariantProcessingStep.SupportsScatterGather)
864+
{
865+
((VariantProcessingStep.SupportsScatterGather)step).performAdditionalMergeTasks(ctx, job, manager, genome, orderedScatterOutputs);
866+
}
867+
}
868+
}
854869
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/VariantProcessingRemoteMergeTask.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import htsjdk.samtools.util.Interval;
44
import org.apache.commons.lang3.StringUtils;
5-
import org.apache.logging.log4j.Logger;
65
import org.jetbrains.annotations.NotNull;
76
import org.labkey.api.pipeline.AbstractTaskFactory;
87
import org.labkey.api.pipeline.AbstractTaskFactorySettings;
@@ -15,14 +14,12 @@
1514
import org.labkey.api.sequenceanalysis.SequenceOutputFile;
1615
import org.labkey.api.sequenceanalysis.pipeline.ReferenceGenome;
1716
import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
18-
import org.labkey.api.sequenceanalysis.run.AbstractDiscvrSeqWrapper;
17+
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStep;
1918
import org.labkey.api.util.FileType;
20-
import org.labkey.api.writer.PrintWriters;
2119
import org.labkey.sequenceanalysis.run.variant.OutputVariantsStartingInIntervalsStep;
2220

2321
import java.io.File;
2422
import java.io.IOException;
25-
import java.io.PrintWriter;
2623
import java.util.ArrayList;
2724
import java.util.Collections;
2825
import java.util.HashSet;
@@ -113,6 +110,7 @@ private VariantProcessingJob getPipelineJob()
113110
SequenceTaskHelper.logModuleVersions(getJob().getLogger());
114111
RecordedAction action = new RecordedAction(ACTION_NAME);
115112
TaskFileManagerImpl manager = new TaskFileManagerImpl(getPipelineJob(), _wd.getDir(), _wd);
113+
JobContextImpl ctx = new JobContextImpl(getPipelineJob(), getPipelineJob().getSequenceSupport(), getPipelineJob().getParameterJson(), _wd.getDir(), new TaskFileManagerImpl(getPipelineJob(), _wd.getDir(), _wd), _wd);
116114

117115
File finalOut;
118116
SequenceOutputHandler<SequenceOutputHandler.SequenceOutputProcessor> handler = getPipelineJob().getHandler();
@@ -122,7 +120,7 @@ private VariantProcessingJob getPipelineJob()
122120
}
123121
else
124122
{
125-
finalOut = runDefaultVariantMerge(manager, action, handler);
123+
finalOut = runDefaultVariantMerge(ctx, manager, action, handler);
126124
}
127125

128126
Map<String, File> scatterOutputs = getPipelineJob().getScatterJobOutputs();
@@ -154,7 +152,7 @@ private VariantProcessingJob getPipelineJob()
154152
return new RecordedActionSet(action);
155153
}
156154

157-
private File runDefaultVariantMerge(TaskFileManagerImpl manager, RecordedAction action, SequenceOutputHandler<SequenceOutputHandler.SequenceOutputProcessor> handler) throws PipelineJobException
155+
private File runDefaultVariantMerge(JobContextImpl ctx, TaskFileManagerImpl manager, RecordedAction action, SequenceOutputHandler<SequenceOutputHandler.SequenceOutputProcessor> handler) throws PipelineJobException
158156
{
159157
Map<String, List<Interval>> jobToIntervalMap = getPipelineJob().getJobToIntervalMap();
160158
getJob().setStatus(PipelineJob.TaskStatus.running, "Combining Per-Contig VCFs: " + jobToIntervalMap.size());
@@ -209,6 +207,15 @@ private File runDefaultVariantMerge(TaskFileManagerImpl manager, RecordedAction
209207
manager.addIntermediateFile(new File(vcf.getPath() + ".tbi"));
210208
}
211209

210+
Set<Integer> genomeIds = new HashSet<>();
211+
getPipelineJob().getFiles().forEach(x -> genomeIds.add(x.getLibrary_id()));
212+
if (genomeIds.size() != 1)
213+
{
214+
throw new PipelineJobException("Expected a single genome, found: " + StringUtils.join(genomeIds, ", "));
215+
}
216+
217+
ReferenceGenome genome = getPipelineJob().getSequenceSupport().getCachedGenome(genomeIds.iterator().next());
218+
212219
String basename = SequenceAnalysisService.get().getUnzippedBaseName(toConcat.get(0).getName());
213220
File combined = new File(getPipelineJob().getAnalysisDirectory(), basename + ".vcf.gz");
214221
File combinedIdx = new File(combined.getPath() + ".tbi");
@@ -223,18 +230,15 @@ private File runDefaultVariantMerge(TaskFileManagerImpl manager, RecordedAction
223230
throw new PipelineJobException("Missing one of more VCFs: " + missing.stream().map(File::getPath).collect(Collectors.joining(",")));
224231
}
225232

226-
Set<Integer> genomeIds = new HashSet<>();
227-
getPipelineJob().getFiles().forEach(x -> genomeIds.add(x.getLibrary_id()));
228-
if (genomeIds.size() != 1)
229-
{
230-
throw new PipelineJobException("Expected a single genome, found: " + StringUtils.join(genomeIds, ", "));
231-
}
232-
233-
ReferenceGenome genome = getPipelineJob().getSequenceSupport().getCachedGenome(genomeIds.iterator().next());
234233
combined = SequenceAnalysisService.get().combineVcfs(toConcat, combined, genome, getJob().getLogger(), true, null);
235234
}
236235
manager.addOutput(action, "Merged VCF", combined);
237236

237+
if (handler instanceof VariantProcessingStep.SupportsScatterGather)
238+
{
239+
((VariantProcessingStep.SupportsScatterGather) handler).performAdditionalMergeTasks(ctx, getPipelineJob(), manager, genome, toConcat);
240+
}
241+
238242
return combined;
239243
}
240244
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/variant/SplitVcfBySamplesStep.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public Output processVariants(File inputVCF, File outputDirectory, ReferenceGeno
7373
if (!f.getName().equals(inputVCF.getName()) && f.getName().startsWith(basename) && SequenceUtil.FILETYPE.vcf.getFileType().isType(f))
7474
{
7575
output.addOutput(f, "Subset VCF");
76-
output.addSequenceOutput(f, "Subset VCF: " + f.getName(), "VCF", null, null, genome.getGenomeId(), null);
76+
output.addSequenceOutput(f, "Subset VCF: " + f.getName(), "VCF File", null, null, genome.getGenomeId(), null);
7777
}
7878
}
7979

0 commit comments

Comments
 (0)