Skip to content

Commit f95a013

Browse files
committed
Improve scatter/gather jobs when there is no primary output generated
1 parent c35ea75 commit f95a013

File tree

3 files changed

+47
-48
lines changed

3 files changed

+47
-48
lines changed

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,7 @@ public static File processVCF(File input, Integer libraryId, JobContext ctx, Res
411411
resumer.getFileManager().addIntermediateFile(outputFileIdx);
412412
}
413413

414+
File effectiveInput = currentVCF; //this will be tested at the end to determine if a new file was actually created
414415
for (PipelineStepCtx<VariantProcessingStep> stepCtx : providers)
415416
{
416417
ctx.getLogger().info("Starting to run: " + stepCtx.getProvider().getLabel());
@@ -484,7 +485,7 @@ public static File processVCF(File input, Integer libraryId, JobContext ctx, Res
484485
resumer.setStepComplete(stepIdx, input.getPath(), action, currentVCF);
485486
}
486487

487-
if (currentVCF.exists())
488+
if (currentVCF != null && currentVCF.exists() && !currentVCF.equals(effectiveInput))
488489
{
489490
resumer.getFileManager().removeIntermediateFile(currentVCF);
490491
resumer.getFileManager().removeIntermediateFile(new File(currentVCF.getPath() + ".tbi"));

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/VariantProcessingRemoteMergeTask.java

Lines changed: 38 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import htsjdk.samtools.util.Interval;
44
import org.apache.commons.lang3.StringUtils;
55
import org.jetbrains.annotations.NotNull;
6+
import org.jetbrains.annotations.Nullable;
67
import org.labkey.api.pipeline.AbstractTaskFactory;
78
import org.labkey.api.pipeline.AbstractTaskFactorySettings;
89
import org.labkey.api.pipeline.PipelineJob;
@@ -127,13 +128,16 @@ private VariantProcessingJob getPipelineJob()
127128
if (handler instanceof SequenceOutputHandler.TracksVCF)
128129
{
129130
Set<SequenceOutputFile> outputs = new HashSet<>();
130-
scatterOutputs.values().forEach(f -> outputs.addAll(getPipelineJob().getOutputsToCreate().stream().filter(x -> f.equals(x.getFile())).collect(Collectors.toSet())));
131+
scatterOutputs.values().forEach(f -> outputs.addAll(getPipelineJob().getOutputsToCreate().stream().filter(x -> x != null && f.equals(x.getFile())).collect(Collectors.toSet())));
131132
getJob().getLogger().debug("Total component outputs created: " + outputs.size());
132133
getPipelineJob().getOutputsToCreate().removeAll(outputs);
133134
getJob().getLogger().debug("Total SequenceOutputFiles on job after remove: " + getPipelineJob().getOutputsToCreate().size());
134135

135-
SequenceOutputFile finalOutput = ((SequenceOutputHandler.TracksVCF)getPipelineJob().getHandler()).createFinalSequenceOutput(getJob(), finalOut, getPipelineJob().getFiles());
136-
manager.addSequenceOutput(finalOutput);
136+
if (finalOut != null)
137+
{
138+
SequenceOutputFile finalOutput = ((SequenceOutputHandler.TracksVCF) getPipelineJob().getHandler()).createFinalSequenceOutput(getJob(), finalOut, getPipelineJob().getFiles());
139+
manager.addSequenceOutput(finalOutput);
140+
}
137141
}
138142
else
139143
{
@@ -152,14 +156,15 @@ private VariantProcessingJob getPipelineJob()
152156
return new RecordedActionSet(action);
153157
}
154158

155-
private File runDefaultVariantMerge(JobContextImpl ctx, TaskFileManagerImpl manager, RecordedAction action, SequenceOutputHandler<SequenceOutputHandler.SequenceOutputProcessor> handler) throws PipelineJobException
159+
private @Nullable File runDefaultVariantMerge(JobContextImpl ctx, TaskFileManagerImpl manager, RecordedAction action, SequenceOutputHandler<SequenceOutputHandler.SequenceOutputProcessor> handler) throws PipelineJobException
156160
{
157161
Map<String, List<Interval>> jobToIntervalMap = getPipelineJob().getJobToIntervalMap();
158162
getJob().setStatus(PipelineJob.TaskStatus.running, "Combining Per-Contig VCFs: " + jobToIntervalMap.size());
159163

160164
Map<String, File> scatterOutputs = getPipelineJob().getScatterJobOutputs();
161165
List<File> toConcat = new ArrayList<>();
162166
Set<File> missing = new HashSet<>();
167+
int totalNull = 0;
163168
for (String name : jobToIntervalMap.keySet())
164169
{
165170
if (!scatterOutputs.containsKey(name))
@@ -168,45 +173,29 @@ private File runDefaultVariantMerge(JobContextImpl ctx, TaskFileManagerImpl mana
168173
}
169174

170175
File vcf = scatterOutputs.get(name);
171-
if (!vcf.exists())
176+
if (scatterOutputs.get(name) == null)
172177
{
173-
missing.add(vcf);
174-
}
175-
176-
// NOTE: this was added to fix a one-time issue where -L was dropped from some upstream GenotypeGVCFs.
177-
// Under normal conditions this would never be necessary.
178-
boolean ensureOutputsWithinIntervals = getPipelineJob().getParameterJson().optBoolean("variantCalling.GenotypeGVCFs.ensureOutputsWithinIntervalsOnMerge", false);
179-
if (ensureOutputsWithinIntervals)
180-
{
181-
getJob().getLogger().debug("Ensuring ensure scatter outputs respect intervals");
182-
183-
File subsetVcf = new File(vcf.getParentFile(), SequenceAnalysisService.get().getUnzippedBaseName(vcf.getName()) + ".subset.vcf.gz");
184-
File subsetVcfIdx = new File(subsetVcf.getPath() + ".tbi");
185-
manager.addIntermediateFile(subsetVcf);
186-
manager.addIntermediateFile(subsetVcfIdx);
187-
188-
if (subsetVcfIdx.exists())
189-
{
190-
getJob().getLogger().debug("Index exists, will not re-subset the VCF: " + subsetVcf.getName());
191-
}
192-
else
193-
{
194-
OutputVariantsStartingInIntervalsStep.Wrapper wrapper = new OutputVariantsStartingInIntervalsStep.Wrapper(getJob().getLogger());
195-
wrapper.execute(vcf, subsetVcf, getPipelineJob().getIntervalsForTask());
196-
}
197-
198-
toConcat.add(subsetVcf);
178+
totalNull++;
179+
continue;
199180
}
200-
else
181+
else if (!vcf.exists())
201182
{
202-
toConcat.add(vcf);
183+
missing.add(vcf);
184+
continue;
203185
}
204186

187+
toConcat.add(vcf);
188+
205189
manager.addInput(action, "Input VCF", vcf);
206190
manager.addIntermediateFile(vcf);
207191
manager.addIntermediateFile(new File(vcf.getPath() + ".tbi"));
208192
}
209193

194+
if (totalNull > 0 && !toConcat.isEmpty())
195+
{
196+
throw new PipelineJobException("The scatter jobs returned a mixture of null and non-null outputs");
197+
}
198+
210199
Set<Integer> genomeIds = new HashSet<>();
211200
getPipelineJob().getFiles().forEach(x -> genomeIds.add(x.getLibrary_id()));
212201
if (genomeIds.size() != 1)
@@ -217,23 +206,27 @@ private File runDefaultVariantMerge(JobContextImpl ctx, TaskFileManagerImpl mana
217206
ReferenceGenome genome = getPipelineJob().getSequenceSupport().getCachedGenome(genomeIds.iterator().next());
218207

219208
String basename = SequenceAnalysisService.get().getUnzippedBaseName(toConcat.get(0).getName());
220-
File combined = new File(getPipelineJob().getAnalysisDirectory(), basename + ".vcf.gz");
221-
File combinedIdx = new File(combined.getPath() + ".tbi");
222-
if (combinedIdx.exists())
209+
File combined = null;
210+
if (!toConcat.isEmpty())
223211
{
224-
getJob().getLogger().info("VCF exists, will not recreate: " + combined.getPath());
225-
}
226-
else
227-
{
228-
if (!missing.isEmpty())
212+
combined = new File(getPipelineJob().getAnalysisDirectory(), basename + ".vcf.gz");
213+
File combinedIdx = new File(combined.getPath() + ".tbi");
214+
if (combinedIdx.exists())
229215
{
230-
throw new PipelineJobException("Missing one of more VCFs: " + missing.stream().map(File::getPath).collect(Collectors.joining(",")));
216+
getJob().getLogger().info("VCF exists, will not recreate: " + combined.getPath());
231217
}
218+
else
219+
{
220+
if (!missing.isEmpty())
221+
{
222+
throw new PipelineJobException("Missing one or more VCFs: " + missing.stream().map(File::getPath).collect(Collectors.joining(",")));
223+
}
232224

233-
boolean sortAfterMerge = handler instanceof VariantProcessingStep.SupportsScatterGather && ((VariantProcessingStep.SupportsScatterGather)handler).doSortAfterMerge();
234-
combined = SequenceAnalysisService.get().combineVcfs(toConcat, combined, genome, getJob().getLogger(), true, null, sortAfterMerge);
225+
boolean sortAfterMerge = handler instanceof VariantProcessingStep.SupportsScatterGather && ((VariantProcessingStep.SupportsScatterGather) handler).doSortAfterMerge();
226+
combined = SequenceAnalysisService.get().combineVcfs(toConcat, combined, genome, getJob().getLogger(), true, null, sortAfterMerge);
227+
}
228+
manager.addOutput(action, "Merged VCF", combined);
235229
}
236-
manager.addOutput(action, "Merged VCF", combined);
237230

238231
if (handler instanceof VariantProcessingStep.SupportsScatterGather)
239232
{

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/VariantProcessingRemoteSplitTask.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,13 @@ private VariantProcessingJob getPipelineJob()
117117
{
118118
output = ((SequenceOutputHandler.TracksVCF)handler).finalizeScatterJobOutput(ctx, output);
119119

120-
// If the output is still under the work dir, translate path. Otherwise it was already copied to the the source dir
121-
if (output.getPath().startsWith(_wd.getDir().getPath()))
120+
// If the output is still under the work dir, translate path. Otherwise it was already copied to the source dir
121+
if (output == null)
122+
{
123+
ctx.getLogger().debug("No output produced, adding null to scatter outputs");
124+
getPipelineJob().getScatterJobOutputs().put(getPipelineJob().getIntervalSetName(), null);
125+
}
126+
else if (output.getPath().startsWith(_wd.getDir().getPath()))
122127
{
123128
//NOTE: the VCF will be copied back to the source dir, so translate paths
124129
String path = _wd.getRelativePath(output);

0 commit comments

Comments
 (0)