33import htsjdk .samtools .util .Interval ;
44import org .apache .commons .lang3 .StringUtils ;
55import org .jetbrains .annotations .NotNull ;
6+ import org .jetbrains .annotations .Nullable ;
67import org .labkey .api .pipeline .AbstractTaskFactory ;
78import org .labkey .api .pipeline .AbstractTaskFactorySettings ;
89import org .labkey .api .pipeline .PipelineJob ;
@@ -127,13 +128,16 @@ private VariantProcessingJob getPipelineJob()
127128 if (handler instanceof SequenceOutputHandler .TracksVCF )
128129 {
129130 Set <SequenceOutputFile > outputs = new HashSet <>();
130- scatterOutputs .values ().forEach (f -> outputs .addAll (getPipelineJob ().getOutputsToCreate ().stream ().filter (x -> f .equals (x .getFile ())).collect (Collectors .toSet ())));
131+ scatterOutputs .values ().forEach (f -> outputs .addAll (getPipelineJob ().getOutputsToCreate ().stream ().filter (x -> x != null && f .equals (x .getFile ())).collect (Collectors .toSet ())));
131132 getJob ().getLogger ().debug ("Total component outputs created: " + outputs .size ());
132133 getPipelineJob ().getOutputsToCreate ().removeAll (outputs );
133134 getJob ().getLogger ().debug ("Total SequenceOutputFiles on job after remove: " + getPipelineJob ().getOutputsToCreate ().size ());
134135
135- SequenceOutputFile finalOutput = ((SequenceOutputHandler .TracksVCF )getPipelineJob ().getHandler ()).createFinalSequenceOutput (getJob (), finalOut , getPipelineJob ().getFiles ());
136- manager .addSequenceOutput (finalOutput );
136+ if (finalOut != null )
137+ {
138+ SequenceOutputFile finalOutput = ((SequenceOutputHandler .TracksVCF ) getPipelineJob ().getHandler ()).createFinalSequenceOutput (getJob (), finalOut , getPipelineJob ().getFiles ());
139+ manager .addSequenceOutput (finalOutput );
140+ }
137141 }
138142 else
139143 {
@@ -152,14 +156,15 @@ private VariantProcessingJob getPipelineJob()
152156 return new RecordedActionSet (action );
153157 }
154158
155- private File runDefaultVariantMerge (JobContextImpl ctx , TaskFileManagerImpl manager , RecordedAction action , SequenceOutputHandler <SequenceOutputHandler .SequenceOutputProcessor > handler ) throws PipelineJobException
159+ private @ Nullable File runDefaultVariantMerge (JobContextImpl ctx , TaskFileManagerImpl manager , RecordedAction action , SequenceOutputHandler <SequenceOutputHandler .SequenceOutputProcessor > handler ) throws PipelineJobException
156160 {
157161 Map <String , List <Interval >> jobToIntervalMap = getPipelineJob ().getJobToIntervalMap ();
158162 getJob ().setStatus (PipelineJob .TaskStatus .running , "Combining Per-Contig VCFs: " + jobToIntervalMap .size ());
159163
160164 Map <String , File > scatterOutputs = getPipelineJob ().getScatterJobOutputs ();
161165 List <File > toConcat = new ArrayList <>();
162166 Set <File > missing = new HashSet <>();
167+ int totalNull = 0 ;
163168 for (String name : jobToIntervalMap .keySet ())
164169 {
165170 if (!scatterOutputs .containsKey (name ))
@@ -168,45 +173,29 @@ private File runDefaultVariantMerge(JobContextImpl ctx, TaskFileManagerImpl mana
168173 }
169174
170175 File vcf = scatterOutputs .get (name );
171- if (! vcf . exists () )
176+ if (scatterOutputs . get ( name ) == null )
172177 {
173- missing .add (vcf );
174- }
175-
176- // NOTE: this was added to fix a one-time issue where -L was dropped from some upstream GenotypeGVCFs.
177- // Under normal conditions this would never be necessary.
178- boolean ensureOutputsWithinIntervals = getPipelineJob ().getParameterJson ().optBoolean ("variantCalling.GenotypeGVCFs.ensureOutputsWithinIntervalsOnMerge" , false );
179- if (ensureOutputsWithinIntervals )
180- {
181- getJob ().getLogger ().debug ("Ensuring ensure scatter outputs respect intervals" );
182-
183- File subsetVcf = new File (vcf .getParentFile (), SequenceAnalysisService .get ().getUnzippedBaseName (vcf .getName ()) + ".subset.vcf.gz" );
184- File subsetVcfIdx = new File (subsetVcf .getPath () + ".tbi" );
185- manager .addIntermediateFile (subsetVcf );
186- manager .addIntermediateFile (subsetVcfIdx );
187-
188- if (subsetVcfIdx .exists ())
189- {
190- getJob ().getLogger ().debug ("Index exists, will not re-subset the VCF: " + subsetVcf .getName ());
191- }
192- else
193- {
194- OutputVariantsStartingInIntervalsStep .Wrapper wrapper = new OutputVariantsStartingInIntervalsStep .Wrapper (getJob ().getLogger ());
195- wrapper .execute (vcf , subsetVcf , getPipelineJob ().getIntervalsForTask ());
196- }
197-
198- toConcat .add (subsetVcf );
178+ totalNull ++;
179+ continue ;
199180 }
200- else
181+ else if (! vcf . exists ())
201182 {
202- toConcat .add (vcf );
183+ missing .add (vcf );
184+ continue ;
203185 }
204186
187+ toConcat .add (vcf );
188+
205189 manager .addInput (action , "Input VCF" , vcf );
206190 manager .addIntermediateFile (vcf );
207191 manager .addIntermediateFile (new File (vcf .getPath () + ".tbi" ));
208192 }
209193
194+ if (totalNull > 0 && !toConcat .isEmpty ())
195+ {
196+ throw new PipelineJobException ("The scatter jobs returned a mixture of null and non-null outputs" );
197+ }
198+
210199 Set <Integer > genomeIds = new HashSet <>();
211200 getPipelineJob ().getFiles ().forEach (x -> genomeIds .add (x .getLibrary_id ()));
212201 if (genomeIds .size () != 1 )
@@ -216,29 +205,33 @@ private File runDefaultVariantMerge(JobContextImpl ctx, TaskFileManagerImpl mana
216205
217206 ReferenceGenome genome = getPipelineJob ().getSequenceSupport ().getCachedGenome (genomeIds .iterator ().next ());
218207
219- String basename = SequenceAnalysisService .get ().getUnzippedBaseName (toConcat .get (0 ).getName ());
220- File combined = new File (getPipelineJob ().getAnalysisDirectory (), basename + ".vcf.gz" );
221- File combinedIdx = new File (combined .getPath () + ".tbi" );
222- if (combinedIdx .exists ())
208+ File combined = null ;
209+ if (!toConcat .isEmpty ())
223210 {
224- getJob ().getLogger ().info ("VCF exists, will not recreate: " + combined .getPath ());
225- }
226- else
227- {
228- if (!missing .isEmpty ())
211+ String basename = SequenceAnalysisService .get ().getUnzippedBaseName (toConcat .get (0 ).getName ());
212+ combined = new File (getPipelineJob ().getAnalysisDirectory (), basename + ".vcf.gz" );
213+ File combinedIdx = new File (combined .getPath () + ".tbi" );
214+ if (combinedIdx .exists ())
229215 {
230- throw new PipelineJobException ( "Missing one of more VCFs : " + missing . stream (). map ( File :: getPath ). collect ( Collectors . joining ( "," ) ));
216+ getJob (). getLogger (). info ( "VCF exists, will not recreate : " + combined . getPath ( ));
231217 }
218+ else
219+ {
220+ if (!missing .isEmpty ())
221+ {
222+ throw new PipelineJobException ("Missing one of more VCFs: " + missing .stream ().map (File ::getPath ).collect (Collectors .joining ("," )));
223+ }
232224
233- boolean sortAfterMerge = handler instanceof VariantProcessingStep .SupportsScatterGather && ((VariantProcessingStep .SupportsScatterGather )handler ).doSortAfterMerge ();
234- combined = SequenceAnalysisService .get ().combineVcfs (toConcat , combined , genome , getJob ().getLogger (), true , null , sortAfterMerge );
225+ boolean sortAfterMerge = handler instanceof VariantProcessingStep .SupportsScatterGather && ((VariantProcessingStep .SupportsScatterGather ) handler ).doSortAfterMerge ();
226+ combined = SequenceAnalysisService .get ().combineVcfs (toConcat , combined , genome , getJob ().getLogger (), true , null , sortAfterMerge );
227+ }
228+ manager .addOutput (action , "Merged VCF" , combined );
235229 }
236- manager .addOutput (action , "Merged VCF" , combined );
237230
238231 if (handler instanceof VariantProcessingStep .SupportsScatterGather )
239232 {
240233 ctx .getLogger ().debug ("Running additional merge tasks" );
241- ((VariantProcessingStep .SupportsScatterGather ) handler ).performAdditionalMergeTasks (ctx , getPipelineJob (), manager , genome , toConcat );
234+ ((VariantProcessingStep .SupportsScatterGather ) handler ).performAdditionalMergeTasks (ctx , getPipelineJob (), manager , genome , toConcat , new ArrayList <>( jobToIntervalMap . keySet ()) );
242235 }
243236
244237 return combined ;
0 commit comments