Skip to content

Commit 72d94ba

Browse files
authored
Merge pull request #45 from LabKey/fb_merge_discvr-20.3
Merge discvr 20.3
2 parents d965245 + e60894d commit 72d94ba

19 files changed

+803
-54
lines changed

SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/AlignerIndexUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,10 @@ private static boolean verifyOrCreateCachedIndex(PipelineContext ctx, @Nullable
9898

9999
destination = wd.inputFile(webserverIndexDir, destination, true);
100100
if (output != null && !destination.equals(webserverIndexDir))
101+
{
102+
ctx.getLogger().debug("adding deferred delete file: " + destination.getPath());
101103
output.addDeferredDeleteIntermediateFile(destination);
102-
104+
}
103105
ctx.getLogger().info("finished copying files");
104106
}
105107
else

SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/VariantProcessingStep.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,11 @@ public static interface SupportsScatterGather
5252
{
5353

5454
}
55+
56+
public static interface MayRequirePrepareTask
57+
{
58+
public boolean isRequired(PipelineJob job);
59+
60+
public void doWork(List<SequenceOutputFile> inputFiles, SequenceOutputHandler.JobContext ctx) throws PipelineJobException;
61+
}
5562
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import org.labkey.sequenceanalysis.run.analysis.HaplotypeCallerAnalysis;
7979
import org.labkey.sequenceanalysis.run.analysis.ImmunoGenotypingAnalysis;
8080
import org.labkey.sequenceanalysis.run.analysis.LofreqAnalysis;
81+
import org.labkey.sequenceanalysis.run.analysis.MergeLoFreqVcfHandler;
8182
import org.labkey.sequenceanalysis.run.analysis.PARalyzerAnalysis;
8283
import org.labkey.sequenceanalysis.run.analysis.SequenceBasedTypingAnalysis;
8384
import org.labkey.sequenceanalysis.run.analysis.SnpCountAnalysis;
@@ -328,6 +329,7 @@ public static void registerPipelineSteps()
328329
SequenceAnalysisService.get().registerFileHandler(new MultiQCBamHandler());
329330
SequenceAnalysisService.get().registerFileHandler(new GenomicsDBImportHandler());
330331
SequenceAnalysisService.get().registerFileHandler(new GenomicsDBAppendHandler());
332+
SequenceAnalysisService.get().registerFileHandler(new MergeLoFreqVcfHandler());
331333

332334
SequenceAnalysisService.get().registerReadsetHandler(new MultiQCHandler());
333335
SequenceAnalysisService.get().registerReadsetHandler(new CellHashingHandler());

SequenceAnalysis/src/org/labkey/sequenceanalysis/analysis/GenotypeGVCFHandler.java

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.labkey.sequenceanalysis.SequenceAnalysisModule;
2929
import org.labkey.sequenceanalysis.pipeline.JobContextImpl;
3030
import org.labkey.sequenceanalysis.pipeline.ProcessVariantsHandler;
31+
import org.labkey.sequenceanalysis.pipeline.VariantProcessingJob;
3132
import org.labkey.sequenceanalysis.run.util.CombineGVCFsWrapper;
3233
import org.labkey.sequenceanalysis.run.util.GenomicsDBImportHandler;
3334
import org.labkey.sequenceanalysis.run.util.GenotypeGVCFsWrapper;
@@ -47,7 +48,7 @@
4748
/**
4849
* Created by bimber on 8/26/2014.
4950
*/
50-
public class GenotypeGVCFHandler implements SequenceOutputHandler<SequenceOutputHandler.SequenceOutputProcessor>, SequenceOutputHandler.HasActionNames, SequenceOutputHandler.TracksVCF
51+
public class GenotypeGVCFHandler implements SequenceOutputHandler<SequenceOutputHandler.SequenceOutputProcessor>, SequenceOutputHandler.HasActionNames, SequenceOutputHandler.TracksVCF, VariantProcessingStep.MayRequirePrepareTask
5152
{
5253
private FileType _gvcfFileType = new FileType(Arrays.asList(".g.vcf"), ".g.vcf", false, FileType.gzSupportLevel.SUPPORT_GZ);
5354

@@ -293,14 +294,14 @@ private File runGenotypeGVCFs(PipelineJob job, JobContext ctx, ProcessVariantsHa
293294
action.addInput(f, "Input Variants");
294295
}
295296

296-
boolean doCopyLocal = ctx.getParams().optBoolean("variantCalling.GenotypeGVCFs.doCopyInputs", false);
297+
boolean doCopyLocal = doCopyLocal(ctx.getParams());
297298

298299
Set<File> toDelete = new HashSet<>();
299300
List<File> filesToProcess = new ArrayList<>();
300301
if (doCopyLocal)
301302
{
302303
ctx.getLogger().info("making local copies of gVCF/GenomicsDB files prior to genotyping");
303-
filesToProcess.addAll(GenotypeGVCFsWrapper.copyVcfsLocally(inputFiles, toDelete, null, ctx.getLogger(), outputVcf.exists()));
304+
filesToProcess.addAll(GenotypeGVCFsWrapper.copyVcfsLocally(inputFiles, toDelete, GenotypeGVCFHandler.getLocalCopyDir(ctx, true), ctx.getLogger(), outputVcf.exists()));
304305
}
305306
else
306307
{
@@ -312,6 +313,8 @@ private File runGenotypeGVCFs(PipelineJob job, JobContext ctx, ProcessVariantsHa
312313
if (filesToProcess.size() > 1)
313314
{
314315
inputVcf = combineInputs(ctx, filesToProcess, genomeId);
316+
ctx.getFileManager().addIntermediateFile(inputVcf);
317+
ctx.getFileManager().addIntermediateFile(new File(inputVcf.getPath() + ".tbi"));
315318
}
316319
else
317320
{
@@ -387,8 +390,13 @@ private File runGenotypeGVCFs(PipelineJob job, JobContext ctx, ProcessVariantsHa
387390

388391
private File combineInputs(JobContext ctx, List<File> inputFiles, int genomeId) throws PipelineJobException
389392
{
390-
// TODO: this should ultimately be expanded to include smarter merge with GenomicsDB
391-
// Also consider allowing the input to be a folder with per-contig gVCFs
393+
for (File f : inputFiles)
394+
{
395+
if (!GenotypeGVCFsWrapper.GVCF.isType(f))
396+
{
397+
throw new PipelineJobException("If multiple inputs are used, all must be gVCFs: " + f.getName());
398+
}
399+
}
392400

393401
String basename = getBasename(ctx);
394402
File combined = new File(ctx.getOutputDir(), basename + ".combined.gvcf.gz");
@@ -422,4 +430,48 @@ private File combineInputs(JobContext ctx, List<File> inputFiles, int genomeId)
422430
return combined;
423431
}
424432
}
433+
434+
private boolean doCopyLocal(JSONObject params)
435+
{
436+
return params.optBoolean("variantCalling.GenotypeGVCFs.doCopyInputs", false);
437+
}
438+
439+
@Override
440+
public boolean isRequired(PipelineJob job)
441+
{
442+
if (job instanceof VariantProcessingJob)
443+
{
444+
VariantProcessingJob vpj = (VariantProcessingJob)job;
445+
446+
return doCopyLocal(vpj.getParameterJson());
447+
}
448+
449+
return false;
450+
}
451+
452+
@Override
453+
public void doWork(List<SequenceOutputFile> inputFiles, JobContext ctx) throws PipelineJobException
454+
{
455+
doCopyGvcfLocally(inputFiles, ctx);
456+
}
457+
458+
public static void doCopyGvcfLocally(List<SequenceOutputFile> inputFiles, JobContext ctx) throws PipelineJobException
459+
{
460+
VariantProcessingJob vpj = (VariantProcessingJob)ctx.getJob();
461+
List<File> inputVCFs = new ArrayList<>();
462+
inputFiles.forEach(f -> inputVCFs.add(f.getFile()));
463+
464+
ctx.getLogger().info("making local copies of gVCFs");
465+
GenotypeGVCFsWrapper.copyVcfsLocally(inputVCFs, new ArrayList<>(), getLocalCopyDir(ctx, true), ctx.getLogger(), false);
466+
}
467+
468+
public static File getLocalCopyDir(JobContext ctx, boolean createIfDoesntExist)
469+
{
470+
if (ctx.getJob() instanceof VariantProcessingJob)
471+
{
472+
return ((VariantProcessingJob)ctx.getJob()).getLocationForCachedInputs(ctx.getWorkDir(), createIfDoesntExist);
473+
}
474+
475+
return ctx.getOutputDir();
476+
}
425477
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/AbstractResumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public void markComplete(boolean deleteFile)
116116
if (deleteFile)
117117
file.delete();
118118
else
119-
_log.debug("delete of file will be deferred");
119+
_log.debug("delete of file will be deferred: " + file.getPath());
120120
}
121121
}
122122

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/TaskFileManagerImpl.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,6 @@ public void addStepOutputs(RecordedAction action, PipelineStepOutput output)
144144
addDeferredIntermediateFile(file);
145145
}
146146

147-
for (File file : output.getDeferredDeleteIntermediateFiles())
148-
{
149-
addDeferredIntermediateFile(file);
150-
}
151-
152147
for (PipelineStepOutput.SequenceOutput o : output.getSequenceOutputs())
153148
{
154149
addSequenceOutput(o.getFile(), o.getLabel(), o.getCategory(), o.getReadsetId(), o.getAnalysisId(), o.getGenomeId(), o.getDescription());
@@ -199,7 +194,7 @@ public void addDeferredIntermediateFile(File file)
199194
{
200195
String path = FilenameUtils.normalize(file.getPath());
201196
String relPath = FileUtil.relativePath(_workLocation.getPath(), path);
202-
_job.getLogger().debug("Adding deferred intermediate file: " + relPath + " || " + path);
197+
_job.getLogger().debug("Adding deferred intermediate file. relative path: " + relPath + ", path: " + path);
203198
if (relPath == null)
204199
{
205200
relPath = file.getPath();

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/VariantProcessingJob.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@
33
import au.com.bytecode.opencsv.CSVReader;
44
import au.com.bytecode.opencsv.CSVWriter;
55
import com.fasterxml.jackson.annotation.JsonIgnore;
6-
import com.fasterxml.jackson.core.type.TypeReference;
76
import com.fasterxml.jackson.databind.ObjectMapper;
87
import htsjdk.samtools.SAMSequenceDictionary;
98
import htsjdk.samtools.SAMSequenceRecord;
109
import htsjdk.samtools.util.Interval;
1110
import htsjdk.variant.utils.SAMSequenceDictionaryExtractor;
12-
import org.apache.log4j.Logger;
11+
import org.apache.commons.lang3.StringUtils;
1312
import org.jetbrains.annotations.Nullable;
1413
import org.json.JSONObject;
1514
import org.junit.Assert;
@@ -21,6 +20,7 @@
2120
import org.labkey.api.pipeline.PipelineJobService;
2221
import org.labkey.api.pipeline.TaskId;
2322
import org.labkey.api.pipeline.TaskPipeline;
23+
import org.labkey.api.pipeline.WorkDirectory;
2424
import org.labkey.api.reader.Readers;
2525
import org.labkey.api.security.User;
2626
import org.labkey.api.sequenceanalysis.SequenceAnalysisService;
@@ -306,8 +306,6 @@ public void setScatterGatherMethod(ScatterGatherUtils.ScatterGatherMethod scatte
306306

307307
public static class TestCase extends Assert
308308
{
309-
private static final Logger _log = Logger.getLogger(SequenceAlignmentTask.TestCase.class);
310-
311309
@Test
312310
public void serializeTest() throws Exception
313311
{
@@ -354,4 +352,34 @@ public File getDataDirectory()
354352
assertEquals(intervalMap, intervalMap2);
355353
}
356354
}
355+
356+
public File getLocationForCachedInputs(WorkDirectory wd, boolean createIfDoesntExist)
357+
{
358+
File ret;
359+
360+
String localDir = PipelineJobService.get().getConfigProperties().getSoftwarePackagePath("LOCAL_DATA_CACHE_DIR");
361+
if (localDir == null)
362+
{
363+
localDir = StringUtils.trimToNull(System.getenv("LOCAL_DATA_CACHE_DIR"));
364+
}
365+
366+
if (localDir == null)
367+
{
368+
ret = new File(wd.getDir(), "cachedData");
369+
}
370+
else
371+
{
372+
String guid = getParentGUID() == null ? getJobGUID() : getParentGUID();
373+
ret = new File(localDir, guid);
374+
375+
getLogger().debug("Using local directory to cache data: " + ret.getPath());
376+
}
377+
378+
if (createIfDoesntExist && !ret.exists())
379+
{
380+
ret.mkdirs();
381+
}
382+
383+
return ret;
384+
}
357385
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/VariantProcessingRemoteMergeTask.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,12 @@ private VariantProcessingJob getPipelineJob()
134134
throw new PipelineJobException("Handler does not support TracksVCF: " + handler.getName());
135135
}
136136

137+
File cacheDir = getPipelineJob().getLocationForCachedInputs(_wd, false);
138+
if (cacheDir.exists())
139+
{
140+
manager.addIntermediateFile(cacheDir);
141+
}
142+
137143
manager.deleteIntermediateFiles();
138144
manager.cleanup(Collections.singleton(action));
139145

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package org.labkey.sequenceanalysis.pipeline;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
import org.labkey.api.pipeline.AbstractTaskFactory;
5+
import org.labkey.api.pipeline.AbstractTaskFactorySettings;
6+
import org.labkey.api.pipeline.PipelineJob;
7+
import org.labkey.api.pipeline.PipelineJobException;
8+
import org.labkey.api.pipeline.RecordedActionSet;
9+
import org.labkey.api.pipeline.WorkDirectoryTask;
10+
import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
11+
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStep;
12+
import org.labkey.api.util.FileType;
13+
14+
import java.io.IOException;
15+
import java.util.ArrayList;
16+
import java.util.Collections;
17+
import java.util.List;
18+
19+
public class VariantProcessingScatterRemotePrepareTask extends WorkDirectoryTask<VariantProcessingScatterRemotePrepareTask.Factory>
20+
{
21+
private static final String ACTION_NAME = "Prepare Scatter/Gather";
22+
23+
protected VariantProcessingScatterRemotePrepareTask(Factory factory, PipelineJob job)
24+
{
25+
super(factory, job);
26+
}
27+
28+
public static class Factory extends AbstractTaskFactory<AbstractTaskFactorySettings, Factory>
29+
{
30+
public Factory()
31+
{
32+
super(VariantProcessingScatterRemotePrepareTask.class);
33+
}
34+
35+
public List<FileType> getInputTypes()
36+
{
37+
return Collections.emptyList();
38+
}
39+
40+
public String getStatusName()
41+
{
42+
return PipelineJob.TaskStatus.running.toString();
43+
}
44+
45+
public List<String> getProtocolActionNames()
46+
{
47+
List<String> allowableNames = new ArrayList<>();
48+
allowableNames.add(ACTION_NAME);
49+
50+
return allowableNames;
51+
}
52+
53+
@Override
54+
public boolean isJoin()
55+
{
56+
return true;
57+
}
58+
59+
@Override
60+
public boolean isParticipant(PipelineJob job) throws IOException
61+
{
62+
if (job instanceof VariantProcessingJob)
63+
{
64+
VariantProcessingJob vpj = (VariantProcessingJob)job;
65+
if (!vpj.isScatterJob())
66+
{
67+
job.getLogger().info("Skipping: " + ACTION_NAME);
68+
return false;
69+
}
70+
else
71+
{
72+
if (vpj.getHandler() instanceof VariantProcessingStep.MayRequirePrepareTask)
73+
{
74+
return ((VariantProcessingStep.MayRequirePrepareTask)vpj.getHandler()).isRequired(vpj);
75+
}
76+
}
77+
}
78+
79+
return false;
80+
}
81+
82+
public PipelineJob.Task createTask(PipelineJob job)
83+
{
84+
return new VariantProcessingScatterRemotePrepareTask(this, job);
85+
}
86+
87+
public boolean isJobComplete(PipelineJob job)
88+
{
89+
return false;
90+
}
91+
}
92+
93+
private VariantProcessingJob getPipelineJob()
94+
{
95+
return (VariantProcessingJob)getJob();
96+
}
97+
98+
@Override
99+
public @NotNull RecordedActionSet run() throws PipelineJobException
100+
{
101+
SequenceTaskHelper.logModuleVersions(getJob().getLogger());
102+
103+
VariantProcessingJob variantJob = getPipelineJob();
104+
SequenceOutputHandler handler = variantJob.getHandler();
105+
106+
if (!( handler instanceof VariantProcessingStep.MayRequirePrepareTask))
107+
{
108+
throw new PipelineJobException("This handler does not implement MayRequirePrepareTask: " + handler.getName());
109+
}
110+
111+
JobContextImpl ctx = new JobContextImpl(getPipelineJob(), getPipelineJob().getSequenceSupport(), getPipelineJob().getParameterJson(), _wd.getDir(), new TaskFileManagerImpl(getPipelineJob(), _wd.getDir(), _wd), _wd);
112+
113+
((VariantProcessingStep.MayRequirePrepareTask)handler).doWork(getPipelineJob().getFiles(), ctx);
114+
115+
//Note: on job resume the TaskFileManager could be replaced with one from the resumer
116+
//Also, this needs to run after the step above to manage SequenceOutputFiles
117+
ctx.getFileManager().deleteIntermediateFiles();
118+
ctx.getFileManager().cleanup(ctx.getActions());
119+
120+
return new RecordedActionSet(ctx.getActions());
121+
}
122+
123+
124+
}

0 commit comments

Comments
 (0)