Skip to content

Commit 121a5f8

Browse files
committed
Simplify merge VCF code
1 parent 9224d02 commit 121a5f8

File tree

5 files changed

+18
-40
lines changed

5 files changed

+18
-40
lines changed

SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/VariantProcessingStep.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ default void validateScatter(ScatterGatherMethod method, PipelineJob job) throws
9797

9898
}
9999

100-
default void performAdditionalMergeTasks(SequenceOutputHandler.JobContext ctx, PipelineJob job, TaskFileManager manager, ReferenceGenome genome, List<File> orderedScatterOutputs, List<String> orderedJobDirs) throws PipelineJobException
100+
default void performAdditionalMergeTasks(SequenceOutputHandler.JobContext ctx, PipelineJob job, ReferenceGenome genome, List<File> orderedScatterOutputs, List<String> orderedJobDirs) throws PipelineJobException
101101
{
102102
ctx.getLogger().debug("No additional merge tasks are implemented for: " + getClass().getName());
103103
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/OrphanFilePipelineJob.java

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -200,27 +200,8 @@ public boolean isJobComplete(PipelineJob job)
200200

201201
if (!orphanJobs.isEmpty())
202202
{
203-
getJob().getLogger().info("## The following sequence jobs are not referenced by readsets, analyses or output files.");
203+
getJob().getLogger().info("## There are {} sequence jobs that are not referenced by readsets, analyses or output files.", orphanJobs.size());
204204
getJob().getLogger().info("## The best action would be to view the pipeline job list, 'Sequence Jobs' view, and filter for jobs without sequence outputs. Deleting any unwanted jobs through the UI should also delete files.");
205-
for (PipelineStatusFile sf : orphanJobs)
206-
{
207-
File f = new File(sf.getFilePath()).getParentFile();
208-
if (f.exists())
209-
{
210-
long size = FileUtils.sizeOfDirectory(f);
211-
//ignore if less than 1mb
212-
if (size > 1e6)
213-
{
214-
getJob().getLogger().info("\n## size: " + FileUtils.byteCountToDisplaySize(size));
215-
getJob().getLogger().info("\n" + f.getPath());
216-
}
217-
}
218-
else
219-
{
220-
messages.add("## Pipeline job folder does not exist: " + sf.getRowId());
221-
messages.add(f.getPath());
222-
}
223-
}
224205
}
225206

226207
if (!messages.isEmpty())
@@ -388,8 +369,6 @@ public void getOrphanFilesForContainer(Container c, User u, Set<File> orphanFile
388369
{
389370
if (!knownSequenceJobPaths.contains(subdir))
390371
{
391-
messages.add("#pipeline path listed as orphan, and not present in known job paths: ");
392-
messages.add(subdir.getPath());
393372
probableDeletes.add(subdir);
394373
unexpectedPipelineDirs.add(subdir);
395374
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -898,15 +898,15 @@ else if (AbstractGenomicsDBImportHandler.TILE_DB_FILETYPE.isType(input))
898898
}
899899

900900
@Override
901-
public void performAdditionalMergeTasks(JobContext ctx, PipelineJob job, TaskFileManager manager, ReferenceGenome genome, List<File> orderedScatterOutputs, List<String> orderedJobDirs) throws PipelineJobException
901+
public void performAdditionalMergeTasks(JobContext ctx, PipelineJob job, ReferenceGenome genome, List<File> orderedScatterOutputs, List<String> orderedJobDirs) throws PipelineJobException
902902
{
903903
List<PipelineStepCtx<VariantProcessingStep>> providers = SequencePipelineService.get().getSteps(job, VariantProcessingStep.class);
904904
for (PipelineStepCtx<VariantProcessingStep> stepCtx : providers)
905905
{
906906
VariantProcessingStep vps = stepCtx.getProvider().create(ctx);
907907
if (vps instanceof VariantProcessingStep.SupportsScatterGather ssg)
908908
{
909-
ssg.performAdditionalMergeTasks(ctx, job, manager, genome, orderedScatterOutputs, orderedJobDirs);
909+
ssg.performAdditionalMergeTasks(ctx, job, genome, orderedScatterOutputs, orderedJobDirs);
910910
}
911911
}
912912
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/VariantProcessingRemoteMergeTask.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -110,18 +110,17 @@ private VariantProcessingJob getPipelineJob()
110110
{
111111
SequenceTaskHelper.logModuleVersions(getJob().getLogger());
112112
RecordedAction action = new RecordedAction(ACTION_NAME);
113-
TaskFileManagerImpl manager = new TaskFileManagerImpl(getPipelineJob(), _wd.getDir(), _wd);
114113
JobContextImpl ctx = new JobContextImpl(getPipelineJob(), getPipelineJob().getSequenceSupport(), getPipelineJob().getParameterJson(), _wd.getDir(), new TaskFileManagerImpl(getPipelineJob(), _wd.getDir(), _wd), _wd);
115114

116115
File finalOut;
117116
SequenceOutputHandler<SequenceOutputHandler.SequenceOutputProcessor> handler = getPipelineJob().getHandler();
118117
if (handler instanceof SequenceOutputHandler.HasCustomVariantMerge)
119118
{
120-
finalOut = ((SequenceOutputHandler.HasCustomVariantMerge)handler).performVariantMerge(manager, action, handler, getJob());
119+
finalOut = ((SequenceOutputHandler.HasCustomVariantMerge)handler).performVariantMerge(ctx.getFileManager(), action, handler, getJob());
121120
}
122121
else
123122
{
124-
finalOut = runDefaultVariantMerge(ctx, manager, action, handler);
123+
finalOut = runDefaultVariantMerge(ctx, action, handler);
125124
}
126125

127126
Map<String, File> scatterOutputs = getPipelineJob().getScatterJobOutputs();
@@ -136,7 +135,7 @@ private VariantProcessingJob getPipelineJob()
136135
if (finalOut != null)
137136
{
138137
SequenceOutputFile finalOutput = ((SequenceOutputHandler.TracksVCF) getPipelineJob().getHandler()).createFinalSequenceOutput(getJob(), finalOut, getPipelineJob().getFiles());
139-
manager.addSequenceOutput(finalOutput);
138+
ctx.getFileManager().addSequenceOutput(finalOutput);
140139
}
141140
}
142141
else
@@ -147,16 +146,16 @@ private VariantProcessingJob getPipelineJob()
147146
File cacheDir = getPipelineJob().getLocationForCachedInputs(_wd, false);
148147
if (cacheDir.exists())
149148
{
150-
manager.addIntermediateFile(cacheDir);
149+
ctx.getFileManager().addIntermediateFile(cacheDir);
151150
}
152151

153-
manager.deleteIntermediateFiles();
154-
manager.cleanup(Collections.singleton(action));
152+
ctx.getFileManager().deleteIntermediateFiles();
153+
ctx.getFileManager().cleanup(Collections.singleton(action));
155154

156155
return new RecordedActionSet(action);
157156
}
158157

159-
private @Nullable File runDefaultVariantMerge(JobContextImpl ctx, TaskFileManagerImpl manager, RecordedAction action, SequenceOutputHandler<SequenceOutputHandler.SequenceOutputProcessor> handler) throws PipelineJobException
158+
private @Nullable File runDefaultVariantMerge(JobContextImpl ctx, RecordedAction action, SequenceOutputHandler<SequenceOutputHandler.SequenceOutputProcessor> handler) throws PipelineJobException
160159
{
161160
Map<String, List<Interval>> jobToIntervalMap = getPipelineJob().getJobToIntervalMap();
162161
getJob().setStatus(PipelineJob.TaskStatus.running, "Combining Per-Contig VCFs: " + jobToIntervalMap.size());
@@ -186,9 +185,9 @@ else if (!vcf.exists())
186185

187186
toConcat.add(vcf);
188187

189-
manager.addInput(action, "Input VCF", vcf);
190-
manager.addIntermediateFile(vcf);
191-
manager.addIntermediateFile(new File(vcf.getPath() + ".tbi"));
188+
ctx.getFileManager().addInput(action, "Input VCF", vcf);
189+
ctx.getFileManager().addIntermediateFile(vcf);
190+
ctx.getFileManager().addIntermediateFile(new File(vcf.getPath() + ".tbi"));
192191
}
193192

194193
if (totalNull > 0 && !toConcat.isEmpty())
@@ -225,13 +224,13 @@ else if (!vcf.exists())
225224
boolean sortAfterMerge = getPipelineJob().scatterMethodRequiresSort() || handler instanceof VariantProcessingStep.SupportsScatterGather && ((VariantProcessingStep.SupportsScatterGather) handler).doSortAfterMerge();
226225
combined = SequenceAnalysisService.get().combineVcfs(toConcat, combined, genome, getJob().getLogger(), true, null, sortAfterMerge);
227226
}
228-
manager.addOutput(action, "Merged VCF", combined);
227+
ctx.getFileManager().addOutput(action, "Merged VCF", combined);
229228
}
230229

231230
if (handler instanceof VariantProcessingStep.SupportsScatterGather)
232231
{
233232
ctx.getLogger().debug("Running additional merge tasks");
234-
((VariantProcessingStep.SupportsScatterGather) handler).performAdditionalMergeTasks(ctx, getPipelineJob(), manager, genome, toConcat, new ArrayList<>(jobToIntervalMap.keySet()));
233+
((VariantProcessingStep.SupportsScatterGather) handler).performAdditionalMergeTasks(ctx, getPipelineJob(), genome, toConcat, new ArrayList<>(jobToIntervalMap.keySet()));
235234
}
236235

237236
return combined;

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/variant/SplitVcfBySamplesStep.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ private List<File> findProducedVcfs(File inputVCF, File outputDirectory)
9090
}
9191

9292
@Override
93-
public void performAdditionalMergeTasks(SequenceOutputHandler.JobContext ctx, PipelineJob job, TaskFileManager manager, ReferenceGenome genome, List<File> orderedScatterOutputs, List<String> orderedJobDirs) throws PipelineJobException
93+
public void performAdditionalMergeTasks(SequenceOutputHandler.JobContext ctx, PipelineJob job, ReferenceGenome genome, List<File> orderedScatterOutputs, List<String> orderedJobDirs) throws PipelineJobException
9494
{
9595
job.getLogger().info("Merging additional track VCFs");
9696
File inputVCF = ((SequenceJob)getPipelineCtx().getJob()).getInputFiles().get(0);
@@ -133,7 +133,7 @@ public void performAdditionalMergeTasks(SequenceOutputHandler.JobContext ctx, Pi
133133
so.setFile(combined);
134134
so.setCategory("VCF File");
135135
so.setLibrary_id(genome.getGenomeId());
136-
manager.addSequenceOutput(so);
136+
ctx.getFileManager().addSequenceOutput(so);
137137
}
138138
}
139139

0 commit comments

Comments
 (0)