Skip to content

Commit c35ea75

Browse files
committed
Add more support for scatter/gather
1 parent f2e2735 commit c35ea75

File tree

4 files changed

+59
-9
lines changed

4 files changed

+59
-9
lines changed

SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/VariantProcessingStep.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ default void validateScatter(ScatterGatherMethod method, PipelineJob job) throws
6868

6969
}
7070

71-
default void performAdditionalMergeTasks(SequenceOutputHandler.JobContext ctx, PipelineJob job, TaskFileManager manager, ReferenceGenome genome, List<File> orderedScatterOutputs) throws PipelineJobException
71+
default void performAdditionalMergeTasks(SequenceOutputHandler.JobContext ctx, PipelineJob job, TaskFileManager manager, ReferenceGenome genome, List<File> orderedScatterOutputs, List<String> orderedJobDirs) throws PipelineJobException
7272
{
7373

7474
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -872,14 +872,14 @@ else if (AbstractGenomicsDBImportHandler.TILE_DB_FILETYPE.isType(input))
872872
}
873873

874874
@Override
875-
public void performAdditionalMergeTasks(JobContext ctx, PipelineJob job, TaskFileManager manager, ReferenceGenome genome, List<File> orderedScatterOutputs) throws PipelineJobException
875+
public void performAdditionalMergeTasks(JobContext ctx, PipelineJob job, TaskFileManager manager, ReferenceGenome genome, List<File> orderedScatterOutputs, List<String> orderedJobDirs) throws PipelineJobException
876876
{
877877
List<PipelineStepCtx<VariantProcessingStep>> providers = SequencePipelineService.get().getSteps(job, VariantProcessingStep.class);
878878
for (PipelineStepCtx<VariantProcessingStep> stepCtx : providers)
879879
{
880880
if (stepCtx.getProvider() instanceof VariantProcessingStep.SupportsScatterGather ssg)
881881
{
882-
ssg.performAdditionalMergeTasks(ctx, job, manager, genome, orderedScatterOutputs);
882+
ssg.performAdditionalMergeTasks(ctx, job, manager, genome, orderedScatterOutputs, orderedJobDirs);
883883
}
884884
}
885885
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/VariantProcessingRemoteMergeTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ private File runDefaultVariantMerge(JobContextImpl ctx, TaskFileManagerImpl mana
238238
if (handler instanceof VariantProcessingStep.SupportsScatterGather)
239239
{
240240
ctx.getLogger().debug("Running additional merge tasks");
241-
((VariantProcessingStep.SupportsScatterGather) handler).performAdditionalMergeTasks(ctx, getPipelineJob(), manager, genome, toConcat);
241+
((VariantProcessingStep.SupportsScatterGather) handler).performAdditionalMergeTasks(ctx, getPipelineJob(), manager, genome, toConcat, new ArrayList<>(jobToIntervalMap.keySet()));
242242
}
243243

244244
return combined;

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/variant/SplitVcfBySamplesStep.java

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,39 @@
33
import htsjdk.samtools.util.Interval;
44
import org.apache.logging.log4j.Logger;
55
import org.jetbrains.annotations.Nullable;
6+
import org.labkey.api.pipeline.PipelineJob;
67
import org.labkey.api.pipeline.PipelineJobException;
78
import org.labkey.api.sequenceanalysis.SequenceAnalysisService;
9+
import org.labkey.api.sequenceanalysis.SequenceOutputFile;
810
import org.labkey.api.sequenceanalysis.pipeline.AbstractVariantProcessingStepProvider;
911
import org.labkey.api.sequenceanalysis.pipeline.CommandLineParam;
1012
import org.labkey.api.sequenceanalysis.pipeline.PipelineContext;
1113
import org.labkey.api.sequenceanalysis.pipeline.PipelineStep;
1214
import org.labkey.api.sequenceanalysis.pipeline.PipelineStepProvider;
1315
import org.labkey.api.sequenceanalysis.pipeline.ReferenceGenome;
16+
import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
17+
import org.labkey.api.sequenceanalysis.pipeline.TaskFileManager;
1418
import org.labkey.api.sequenceanalysis.pipeline.ToolParameterDescriptor;
1519
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStep;
1620
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStepOutputImpl;
1721
import org.labkey.api.sequenceanalysis.run.AbstractCommandPipelineStep;
1822
import org.labkey.api.sequenceanalysis.run.AbstractDiscvrSeqWrapper;
23+
import org.labkey.sequenceanalysis.pipeline.SequenceJob;
1924
import org.labkey.sequenceanalysis.util.SequenceUtil;
2025

2126
import java.io.File;
2227
import java.util.ArrayList;
2328
import java.util.Arrays;
2429
import java.util.List;
2530

26-
public class SplitVcfBySamplesStep extends AbstractCommandPipelineStep<SplitVcfBySamplesStep.Wrapper> implements VariantProcessingStep
31+
public class SplitVcfBySamplesStep extends AbstractCommandPipelineStep<SplitVcfBySamplesStep.Wrapper> implements VariantProcessingStep, VariantProcessingStep.SupportsScatterGather
2732
{
2833
public SplitVcfBySamplesStep(PipelineStepProvider<?> provider, PipelineContext ctx)
2934
{
3035
super(provider, ctx, new Wrapper(ctx.getLogger()));
3136
}
3237

33-
public static class Provider extends AbstractVariantProcessingStepProvider<SelectSamplesStep>
38+
public static class Provider extends AbstractVariantProcessingStepProvider<SelectSamplesStep> implements SupportsScatterGather
3439
{
3540
public Provider()
3641
{
@@ -67,20 +72,65 @@ public Output processVariants(File inputVCF, File outputDirectory, ReferenceGeno
6772

6873
output.addInput(inputVCF, "Input VCF");
6974

75+
return output;
76+
}
77+
78+
private List<File> findProducedVcfs(File inputVCF, File outputDirectory)
79+
{
80+
List<File> ret = new ArrayList<>();
7081
String basename = SequenceAnalysisService.get().getUnzippedBaseName(inputVCF.getName());
7182
for (File f : outputDirectory.listFiles())
7283
{
7384
if (!f.getName().equals(inputVCF.getName()) && f.getName().startsWith(basename) && SequenceUtil.FILETYPE.vcf.getFileType().isType(f))
7485
{
75-
output.addOutput(f, "Subset VCF");
76-
output.addSequenceOutput(f, "Subset VCF: " + f.getName(), "VCF File", null, null, genome.getGenomeId(), null);
86+
ret.add(f);
7787
}
7888
}
7989

80-
return output;
90+
return ret;
8191
}
8292

93+
@Override
94+
public void performAdditionalMergeTasks(SequenceOutputHandler.JobContext ctx, PipelineJob job, TaskFileManager manager, ReferenceGenome genome, List<File> orderedScatterOutputs, List<String> orderedJobDirs) throws PipelineJobException
95+
{
96+
job.getLogger().info("Merging additional track VCFs");
97+
File inputVCF = ((SequenceJob)getPipelineCtx().getJob()).getInputFiles().get(0);
98+
List<File> firstJobOutputs = findProducedVcfs(inputVCF, new File(ctx.getWorkingDirectory(), orderedJobDirs.get(0)));
99+
for (File fn : firstJobOutputs)
100+
{
101+
List<File> toConcat = orderedJobDirs.stream().map(jobDir -> {
102+
File f = new File(new File(getPipelineCtx().getWorkingDirectory(), jobDir), fn.getName());
103+
if (!f.exists())
104+
{
105+
throw new IllegalStateException("Missing file: " + f.getPath());
106+
}
107+
108+
ctx.getFileManager().addIntermediateFile(f);
109+
ctx.getFileManager().addIntermediateFile(new File(f.getPath() + ".tbi"));
83110

111+
return f;
112+
}).toList();
113+
114+
String basename = SequenceAnalysisService.get().getUnzippedBaseName(toConcat.get(0).getName());
115+
File combined = new File(ctx.getSourceDirectory(), basename + ".vcf.gz");
116+
File combinedIdx = new File(combined.getPath() + ".tbi");
117+
if (combinedIdx.exists())
118+
{
119+
job.getLogger().info("VCF exists, will not recreate: " + combined.getPath());
120+
}
121+
else
122+
{
123+
combined = SequenceAnalysisService.get().combineVcfs(toConcat, combined, genome, job.getLogger(), true, null);
124+
}
125+
126+
SequenceOutputFile so = new SequenceOutputFile();
127+
so.setName("Subset VCF: " + fn);
128+
so.setFile(combined);
129+
so.setCategory("VCF File");
130+
so.setLibrary_id(genome.getGenomeId());
131+
manager.addSequenceOutput(so);
132+
}
133+
}
84134

85135
public static class Wrapper extends AbstractDiscvrSeqWrapper
86136
{

0 commit comments

Comments
 (0)