Skip to content

Commit 48c6e0e

Browse files
committed
Create optional task prior to scatter/gather jobs that runs on cluster, allowing jobs to perform actions prior to split
1 parent 54ebc1e commit 48c6e0e

File tree

11 files changed

+297
-15
lines changed

11 files changed

+297
-15
lines changed

SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/VariantProcessingStep.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,11 @@ public static interface SupportsScatterGather
5252
{
5353

5454
}
55+
56+
public static interface MayRequirePrepareTask
57+
{
58+
public boolean isRequired(PipelineJob job);
59+
60+
public void doWork(List<SequenceOutputFile> inputFiles, SequenceOutputHandler.JobContext ctx) throws PipelineJobException;
61+
}
5562
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/analysis/GenotypeGVCFHandler.java

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.labkey.sequenceanalysis.SequenceAnalysisModule;
2929
import org.labkey.sequenceanalysis.pipeline.JobContextImpl;
3030
import org.labkey.sequenceanalysis.pipeline.ProcessVariantsHandler;
31+
import org.labkey.sequenceanalysis.pipeline.VariantProcessingJob;
3132
import org.labkey.sequenceanalysis.run.util.CombineGVCFsWrapper;
3233
import org.labkey.sequenceanalysis.run.util.GenomicsDBImportHandler;
3334
import org.labkey.sequenceanalysis.run.util.GenotypeGVCFsWrapper;
@@ -47,7 +48,7 @@
4748
/**
4849
* Created by bimber on 8/26/2014.
4950
*/
50-
public class GenotypeGVCFHandler implements SequenceOutputHandler<SequenceOutputHandler.SequenceOutputProcessor>, SequenceOutputHandler.HasActionNames, SequenceOutputHandler.TracksVCF
51+
public class GenotypeGVCFHandler implements SequenceOutputHandler<SequenceOutputHandler.SequenceOutputProcessor>, SequenceOutputHandler.HasActionNames, SequenceOutputHandler.TracksVCF, VariantProcessingStep.MayRequirePrepareTask
5152
{
5253
private FileType _gvcfFileType = new FileType(Arrays.asList(".g.vcf"), ".g.vcf", false, FileType.gzSupportLevel.SUPPORT_GZ);
5354

@@ -293,14 +294,14 @@ private File runGenotypeGVCFs(PipelineJob job, JobContext ctx, ProcessVariantsHa
293294
action.addInput(f, "Input Variants");
294295
}
295296

296-
boolean doCopyLocal = ctx.getParams().optBoolean("variantCalling.GenotypeGVCFs.doCopyInputs", false);
297+
boolean doCopyLocal = doCopyLocal(ctx.getParams());
297298

298299
Set<File> toDelete = new HashSet<>();
299300
List<File> filesToProcess = new ArrayList<>();
300301
if (doCopyLocal)
301302
{
302303
ctx.getLogger().info("making local copies of gVCF/GenomicsDB files prior to genotyping");
303-
filesToProcess.addAll(GenotypeGVCFsWrapper.copyVcfsLocally(inputFiles, toDelete, null, ctx.getLogger(), outputVcf.exists()));
304+
filesToProcess.addAll(GenotypeGVCFsWrapper.copyVcfsLocally(inputFiles, toDelete, GenotypeGVCFHandler.getLocalCopyDir(ctx, true), ctx.getLogger(), outputVcf.exists()));
304305
}
305306
else
306307
{
@@ -429,4 +430,48 @@ private File combineInputs(JobContext ctx, List<File> inputFiles, int genomeId)
429430
return combined;
430431
}
431432
}
433+
434+
private boolean doCopyLocal(JSONObject params)
435+
{
436+
return params.optBoolean("variantCalling.GenotypeGVCFs.doCopyInputs", false);
437+
}
438+
439+
@Override
440+
public boolean isRequired(PipelineJob job)
441+
{
442+
if (job instanceof VariantProcessingJob)
443+
{
444+
VariantProcessingJob vpj = (VariantProcessingJob)job;
445+
446+
return doCopyLocal(vpj.getParameterJson());
447+
}
448+
449+
return false;
450+
}
451+
452+
@Override
453+
public void doWork(List<SequenceOutputFile> inputFiles, JobContext ctx) throws PipelineJobException
454+
{
455+
doCopyGvcfLocally(inputFiles, ctx);
456+
}
457+
458+
public static void doCopyGvcfLocally(List<SequenceOutputFile> inputFiles, JobContext ctx) throws PipelineJobException
459+
{
460+
VariantProcessingJob vpj = (VariantProcessingJob)ctx.getJob();
461+
List<File> inputVCFs = new ArrayList<>();
462+
inputFiles.forEach(f -> inputVCFs.add(f.getFile()));
463+
464+
ctx.getLogger().info("making local copies of gVCFs");
465+
GenotypeGVCFsWrapper.copyVcfsLocally(inputVCFs, new ArrayList<>(), getLocalCopyDir(ctx, true), ctx.getLogger(), false);
466+
}
467+
468+
public static File getLocalCopyDir(JobContext ctx, boolean createIfDoesntExist)
469+
{
470+
if (ctx.getJob() instanceof VariantProcessingJob)
471+
{
472+
return ((VariantProcessingJob)ctx.getJob()).getLocationForCachedInputs(ctx.getWorkDir(), createIfDoesntExist);
473+
}
474+
475+
return ctx.getOutputDir();
476+
}
432477
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/AbstractResumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public void markComplete(boolean deleteFile)
116116
if (deleteFile)
117117
file.delete();
118118
else
119-
_log.debug("delete of file will be deferred");
119+
_log.debug("delete of file will be deferred: " + file.getPath());
120120
}
121121
}
122122

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/VariantProcessingJob.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@
33
import au.com.bytecode.opencsv.CSVReader;
44
import au.com.bytecode.opencsv.CSVWriter;
55
import com.fasterxml.jackson.annotation.JsonIgnore;
6-
import com.fasterxml.jackson.core.type.TypeReference;
76
import com.fasterxml.jackson.databind.ObjectMapper;
87
import htsjdk.samtools.SAMSequenceDictionary;
98
import htsjdk.samtools.SAMSequenceRecord;
109
import htsjdk.samtools.util.Interval;
1110
import htsjdk.variant.utils.SAMSequenceDictionaryExtractor;
12-
import org.apache.log4j.Logger;
11+
import org.apache.commons.lang3.StringUtils;
1312
import org.jetbrains.annotations.Nullable;
1413
import org.json.JSONObject;
1514
import org.junit.Assert;
@@ -21,6 +20,7 @@
2120
import org.labkey.api.pipeline.PipelineJobService;
2221
import org.labkey.api.pipeline.TaskId;
2322
import org.labkey.api.pipeline.TaskPipeline;
23+
import org.labkey.api.pipeline.WorkDirectory;
2424
import org.labkey.api.reader.Readers;
2525
import org.labkey.api.security.User;
2626
import org.labkey.api.sequenceanalysis.SequenceAnalysisService;
@@ -306,8 +306,6 @@ public void setScatterGatherMethod(ScatterGatherUtils.ScatterGatherMethod scatte
306306

307307
public static class TestCase extends Assert
308308
{
309-
private static final Logger _log = Logger.getLogger(SequenceAlignmentTask.TestCase.class);
310-
311309
@Test
312310
public void serializeTest() throws Exception
313311
{
@@ -354,4 +352,34 @@ public File getDataDirectory()
354352
assertEquals(intervalMap, intervalMap2);
355353
}
356354
}
355+
356+
public File getLocationForCachedInputs(WorkDirectory wd, boolean createIfDoesntExist)
357+
{
358+
File ret;
359+
360+
String localDir = PipelineJobService.get().getConfigProperties().getSoftwarePackagePath("LOCAL_DATA_CACHE_DIR");
361+
if (localDir == null)
362+
{
363+
localDir = StringUtils.trimToNull(System.getenv("LOCAL_DATA_CACHE_DIR"));
364+
}
365+
366+
if (localDir == null)
367+
{
368+
ret = new File(wd.getDir(), "cachedData");
369+
}
370+
else
371+
{
372+
String guid = getParentGUID() == null ? getJobGUID() : getParentGUID();
373+
ret = new File(localDir, guid);
374+
375+
getLogger().debug("Using local directory to cache data: " + ret.getPath());
376+
}
377+
378+
if (createIfDoesntExist && !ret.exists())
379+
{
380+
ret.mkdirs();
381+
}
382+
383+
return ret;
384+
}
357385
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/VariantProcessingRemoteMergeTask.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,12 @@ private VariantProcessingJob getPipelineJob()
134134
throw new PipelineJobException("Handler does not support TracksVCF: " + handler.getName());
135135
}
136136

137+
File cacheDir = getPipelineJob().getLocationForCachedInputs(_wd, false);
138+
if (cacheDir.exists())
139+
{
140+
manager.addIntermediateFile(cacheDir);
141+
}
142+
137143
manager.deleteIntermediateFiles();
138144
manager.cleanup(Collections.singleton(action));
139145

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package org.labkey.sequenceanalysis.pipeline;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
import org.labkey.api.pipeline.AbstractTaskFactory;
5+
import org.labkey.api.pipeline.AbstractTaskFactorySettings;
6+
import org.labkey.api.pipeline.PipelineJob;
7+
import org.labkey.api.pipeline.PipelineJobException;
8+
import org.labkey.api.pipeline.RecordedActionSet;
9+
import org.labkey.api.pipeline.WorkDirectoryTask;
10+
import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
11+
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStep;
12+
import org.labkey.api.util.FileType;
13+
14+
import java.io.IOException;
15+
import java.util.ArrayList;
16+
import java.util.Collections;
17+
import java.util.List;
18+
19+
public class VariantProcessingScatterRemotePrepareTask extends WorkDirectoryTask<VariantProcessingScatterRemotePrepareTask.Factory>
20+
{
21+
private static final String ACTION_NAME = "Prepare Scatter/Gather";
22+
23+
protected VariantProcessingScatterRemotePrepareTask(Factory factory, PipelineJob job)
24+
{
25+
super(factory, job);
26+
}
27+
28+
public static class Factory extends AbstractTaskFactory<AbstractTaskFactorySettings, Factory>
29+
{
30+
public Factory()
31+
{
32+
super(VariantProcessingScatterRemotePrepareTask.class);
33+
}
34+
35+
public List<FileType> getInputTypes()
36+
{
37+
return Collections.emptyList();
38+
}
39+
40+
public String getStatusName()
41+
{
42+
return PipelineJob.TaskStatus.running.toString();
43+
}
44+
45+
public List<String> getProtocolActionNames()
46+
{
47+
List<String> allowableNames = new ArrayList<>();
48+
allowableNames.add(ACTION_NAME);
49+
50+
return allowableNames;
51+
}
52+
53+
@Override
54+
public boolean isJoin()
55+
{
56+
return true;
57+
}
58+
59+
@Override
60+
public boolean isParticipant(PipelineJob job) throws IOException
61+
{
62+
if (job instanceof VariantProcessingJob)
63+
{
64+
VariantProcessingJob vpj = (VariantProcessingJob)job;
65+
if (!vpj.isScatterJob())
66+
{
67+
job.getLogger().info("Skipping: " + ACTION_NAME);
68+
return false;
69+
}
70+
else
71+
{
72+
if (vpj.getHandler() instanceof VariantProcessingStep.MayRequirePrepareTask)
73+
{
74+
return ((VariantProcessingStep.MayRequirePrepareTask)vpj.getHandler()).isRequired(vpj);
75+
}
76+
}
77+
}
78+
79+
return false;
80+
}
81+
82+
public PipelineJob.Task createTask(PipelineJob job)
83+
{
84+
return new VariantProcessingScatterRemotePrepareTask(this, job);
85+
}
86+
87+
public boolean isJobComplete(PipelineJob job)
88+
{
89+
return false;
90+
}
91+
}
92+
93+
private VariantProcessingJob getPipelineJob()
94+
{
95+
return (VariantProcessingJob)getJob();
96+
}
97+
98+
@Override
99+
public @NotNull RecordedActionSet run() throws PipelineJobException
100+
{
101+
SequenceTaskHelper.logModuleVersions(getJob().getLogger());
102+
103+
VariantProcessingJob variantJob = getPipelineJob();
104+
SequenceOutputHandler handler = variantJob.getHandler();
105+
106+
if (!( handler instanceof VariantProcessingStep.MayRequirePrepareTask))
107+
{
108+
throw new PipelineJobException("This handler does not implement MayRequirePrepareTask: " + handler.getName());
109+
}
110+
111+
JobContextImpl ctx = new JobContextImpl(getPipelineJob(), getPipelineJob().getSequenceSupport(), getPipelineJob().getParameterJson(), _wd.getDir(), new TaskFileManagerImpl(getPipelineJob(), _wd.getDir(), _wd), _wd);
112+
113+
((VariantProcessingStep.MayRequirePrepareTask)handler).doWork(getPipelineJob().getFiles(), ctx);
114+
115+
//Note: on job resume the TaskFileManager could be replaced with one from the resumer
116+
//Also, this needs to run after the step above to manage SequenceOutputFiles
117+
ctx.getFileManager().deleteIntermediateFiles();
118+
ctx.getFileManager().cleanup(ctx.getActions());
119+
120+
return new RecordedActionSet(ctx.getActions());
121+
}
122+
123+
124+
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/util/AbstractGenomicsDBImportHandler.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
2424
import org.labkey.api.sequenceanalysis.pipeline.TaskFileManager;
2525
import org.labkey.api.sequenceanalysis.pipeline.ToolParameterDescriptor;
26+
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStep;
2627
import org.labkey.api.util.FileType;
2728
import org.labkey.api.util.FileUtil;
29+
import org.labkey.sequenceanalysis.analysis.GenotypeGVCFHandler;
2830
import org.labkey.sequenceanalysis.pipeline.ProcessVariantsHandler;
2931
import org.labkey.sequenceanalysis.pipeline.VariantProcessingJob;
3032
import org.labkey.sequenceanalysis.util.SequenceUtil;
@@ -42,7 +44,7 @@
4244
import java.util.Map;
4345
import java.util.Set;
4446

45-
abstract public class AbstractGenomicsDBImportHandler extends AbstractParameterizedOutputHandler<SequenceOutputHandler.SequenceOutputProcessor> implements SequenceOutputHandler.TracksVCF, SequenceOutputHandler.HasCustomVariantMerge
47+
abstract public class AbstractGenomicsDBImportHandler extends AbstractParameterizedOutputHandler<SequenceOutputHandler.SequenceOutputProcessor> implements SequenceOutputHandler.TracksVCF, SequenceOutputHandler.HasCustomVariantMerge, VariantProcessingStep.MayRequirePrepareTask
4648
{
4749
protected FileType _gvcfFileType = new FileType(Arrays.asList(".g.vcf"), ".g.vcf", false, FileType.gzSupportLevel.SUPPORT_GZ);
4850
public static final FileType TILE_DB_FILETYPE = new FileType(Arrays.asList(".tdb"), ".tdb", false, FileType.gzSupportLevel.NO_GZ);
@@ -426,7 +428,7 @@ public void processFilesOnWebserver(PipelineJob job, SequenceAnalysisJobSupport
426428
@Override
427429
public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext ctx) throws UnsupportedOperationException, PipelineJobException
428430
{
429-
boolean doCopyGVcfLocal = ctx.getParams().optBoolean("doCopyGVcfLocal", false);
431+
boolean doCopyGVcfLocal = doCopyLocal(ctx.getParams());
430432

431433
RecordedAction action = new RecordedAction(getName());
432434
action.setStartTime(new Date());
@@ -551,7 +553,7 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
551553
if (doCopyGVcfLocal)
552554
{
553555
ctx.getLogger().info("making local copies of gVCFs");
554-
vcfsToProcess.addAll(GenotypeGVCFsWrapper.copyVcfsLocally(inputVcfs, toDelete, null, ctx.getLogger(), genomicsDbCompleted));
556+
vcfsToProcess.addAll(GenotypeGVCFsWrapper.copyVcfsLocally(inputVcfs, toDelete, GenotypeGVCFHandler.getLocalCopyDir(ctx, true), ctx.getLogger(), genomicsDbCompleted));
555557
}
556558
else
557559
{
@@ -635,4 +637,28 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
635637
}
636638
}
637639
}
640+
641+
private boolean doCopyLocal(JSONObject params)
642+
{
643+
return params.optBoolean("doCopyGVcfLocal", false);
644+
}
645+
646+
@Override
647+
public boolean isRequired(PipelineJob job)
648+
{
649+
if (job instanceof VariantProcessingJob)
650+
{
651+
VariantProcessingJob vpj = (VariantProcessingJob)job;
652+
653+
return doCopyLocal(vpj.getParameterJson());
654+
}
655+
656+
return false;
657+
}
658+
659+
@Override
660+
public void doWork(List<SequenceOutputFile> inputFiles, JobContext ctx) throws PipelineJobException
661+
{
662+
GenotypeGVCFHandler.doCopyGvcfLocally(inputFiles, ctx);
663+
}
638664
}

0 commit comments

Comments
 (0)