11package org .labkey .sequenceanalysis .analysis ;
22
33import org .json .JSONObject ;
4+ import org .labkey .api .data .SimpleFilter ;
45import org .labkey .api .data .Table ;
56import org .labkey .api .data .TableInfo ;
7+ import org .labkey .api .data .TableSelector ;
68import org .labkey .api .module .ModuleLoader ;
79import org .labkey .api .pipeline .PipelineJob ;
810import org .labkey .api .pipeline .PipelineJobException ;
11+ import org .labkey .api .pipeline .PipelineJobService ;
12+ import org .labkey .api .pipeline .PipelineService ;
13+ import org .labkey .api .pipeline .PipelineStatusFile ;
914import org .labkey .api .pipeline .RecordedAction ;
15+ import org .labkey .api .query .FieldKey ;
16+ import org .labkey .api .query .QueryService ;
1017import org .labkey .api .sequenceanalysis .SequenceAnalysisService ;
1118import org .labkey .api .sequenceanalysis .SequenceOutputFile ;
1219import org .labkey .api .sequenceanalysis .model .AnalysisModel ;
1623import org .labkey .api .sequenceanalysis .pipeline .ToolParameterDescriptor ;
1724import org .labkey .api .util .FileType ;
1825import org .labkey .api .util .FileUtil ;
26+ import org .labkey .api .util .PageFlowUtil ;
27+ import org .labkey .api .util .UnexpectedException ;
1928import org .labkey .sequenceanalysis .SequenceAnalysisManager ;
2029import org .labkey .sequenceanalysis .SequenceAnalysisModule ;
2130import org .labkey .sequenceanalysis .SequenceAnalysisSchema ;
2231import org .labkey .sequenceanalysis .model .AnalysisModelImpl ;
2332import org .labkey .sequenceanalysis .pipeline .PicardMetricsUtil ;
33+ import org .labkey .sequenceanalysis .pipeline .SequenceOutputHandlerJob ;
2434import org .labkey .sequenceanalysis .run .util .AlignmentSummaryMetricsWrapper ;
2535import org .labkey .sequenceanalysis .run .util .CollectInsertSizeMetricsWrapper ;
2636import org .labkey .sequenceanalysis .run .util .CollectWgsMetricsWithNonZeroCoverageWrapper ;
@@ -81,7 +91,7 @@ public boolean doRunRemote()
8191 @ Override
8292 public boolean doRunLocal ()
8393 {
84- return true ;
94+ return false ;
8595 }
8696
8797 @ Override
@@ -113,12 +123,40 @@ public void init(JobContext ctx, List<SequenceOutputFile> inputFiles, List<Recor
113123 @ Override
114124 public void processFilesOnWebserver (PipelineJob job , SequenceAnalysisJobSupport support , List <SequenceOutputFile > inputFiles , JSONObject params , File outputDir , List <RecordedAction > actions , List <SequenceOutputFile > outputsToCreate ) throws UnsupportedOperationException , PipelineJobException
115125 {
116- for (SequenceOutputFile o : inputFiles )
126+
127+ }
128+
129+ @ Override
130+ public void complete (PipelineJob job , List <SequenceOutputFile > inputs , List <SequenceOutputFile > outputsCreated , SequenceAnalysisJobSupport support ) throws PipelineJobException
131+ {
132+ if (!(job instanceof SequenceOutputHandlerJob shj ))
117133 {
118- if (o .getAnalysis_id () == null )
134+ throw new IllegalStateException ("Expected job to be a SequenceOutputHandlerJob" );
135+ }
136+
137+ boolean collectSummary = shj .getParameterJson ().optBoolean ("collectSummary" , false );
138+ boolean collectInsertSize = shj .getParameterJson ().optBoolean ("collectInsertSize" , false );
139+ boolean collectWgs = shj .getParameterJson ().optBoolean ("collectWgs" , false );
140+ boolean collectWgsNonZero = shj .getParameterJson ().optBoolean ("collectWgsNonZero" , false );
141+ boolean runMarkDuplicates = shj .getParameterJson ().optBoolean ("markDuplicates" , false );
142+
143+ for (SequenceOutputFile o : inputs )
144+ {
145+ Integer analysisId = o .getAnalysis_id ();
146+ if (analysisId == null )
119147 {
120- job .getLogger ().warn ("no analysis Id for file: " + o .getName ());
121- continue ;
148+ job .getLogger ().warn ("no analysis Id for file, attempting to find this job: " + o .getName ());
149+ PipelineStatusFile sf = PipelineService .get ().getStatusFile (job .getJobGUID ());
150+
151+ TableSelector ts = new TableSelector (QueryService .get ().getUserSchema (job .getUser (), job .getContainer (), SequenceAnalysisSchema .SCHEMA_NAME ).getTable (SequenceAnalysisSchema .TABLE_ANALYSES ), PageFlowUtil .set ("rowid" ), new SimpleFilter (FieldKey .fromString ("runid/JobId" ), sf .getRowId ()), null );
152+ if (ts .exists ())
153+ {
154+ analysisId = ts .getObject (Integer .class );
155+ }
156+ else
157+ {
158+ throw new IllegalStateException ("Unable to find analysis for the input for this job" );
159+ }
122160 }
123161
124162 if (o .getLibrary_id () == null )
@@ -127,49 +165,62 @@ public void processFilesOnWebserver(PipelineJob job, SequenceAnalysisJobSupport
127165 continue ;
128166 }
129167
130- AnalysisModel m = AnalysisModelImpl .getFromDb (o . getAnalysis_id () , job .getUser ());
168+ AnalysisModel m = AnalysisModelImpl .getFromDb (analysisId , job .getUser ());
131169 if (m != null )
132170 {
133171 job .getLogger ().warn ("processing analysis: " + m .getRowId ());
172+ File outputDir = ((SequenceOutputHandlerJob )job ).getAnalysisDirectory ();
134173 List <File > metricsFiles = new ArrayList <>();
135174
136- RecordedAction action = new RecordedAction (getName ());
137- action .addInput (o .getFile (), "Input BAM" );
138-
139175 File mf = new File (outputDir , FileUtil .getBaseName (o .getFile ()) + ".summary.metrics" );
140176 if (mf .exists ())
141177 {
142- action .addOutput (mf , "Alignment Summary Metrics" , false );
143178 metricsFiles .add (mf );
144179 }
180+ else if (collectSummary )
181+ {
182+ throw new PipelineJobException ("Missing file: " + mf .getPath ());
183+ }
145184
146185 File mf2 = new File (outputDir , FileUtil .getBaseName (o .getFile ()) + ".insertsize.metrics" );
147186 if (mf2 .exists ())
148187 {
149- action .addOutput (mf2 , "InsertSize Metrics" , false );
150188 metricsFiles .add (mf2 );
151189 }
190+ else if (collectInsertSize )
191+ {
192+ throw new PipelineJobException ("Missing file: " + mf2 .getPath ());
193+ }
152194
153195 File mf3 = new File (outputDir , FileUtil .getBaseName (o .getFile ()) + ".wgs.metrics" );
154196 if (mf3 .exists ())
155197 {
156- action .addOutput (mf3 , "WGS Metrics" , false );
157198 metricsFiles .add (mf3 );
158199 }
200+ else if (collectWgs )
201+ {
202+ throw new PipelineJobException ("Missing file: " + mf3 .getPath ());
203+ }
159204
160205 File mf4 = new File (outputDir , FileUtil .getBaseName (o .getFile ()) + ".wgsNonZero.metrics" );
161206 if (mf4 .exists ())
162207 {
163- action .addOutput (mf4 , "WGS Metrics Over Non-Zero Coverage" , false );
164208 metricsFiles .add (mf4 );
165209 }
210+ else if (collectWgsNonZero )
211+ {
212+ throw new PipelineJobException ("Missing file: " + mf4 .getPath ());
213+ }
166214
167- File mf5 = new MarkDuplicatesWrapper (job .getLogger ()).getMetricsFile (m . getAlignmentFileObject ());
215+ File mf5 = new MarkDuplicatesWrapper (job .getLogger ()).getMetricsFile (o . getFile ());
168216 if (mf5 .exists ())
169217 {
170- action .addOutput (mf5 , "Duplication Metrics" , false );
171218 metricsFiles .add (mf5 );
172219 }
220+ else if (runMarkDuplicates )
221+ {
222+ throw new PipelineJobException ("Missing file: " + mf5 .getPath ());
223+ }
173224
174225 TableInfo ti = SequenceAnalysisManager .get ().getTable (SequenceAnalysisSchema .TABLE_QUALITY_METRICS );
175226 for (File f : metricsFiles )
@@ -187,8 +238,6 @@ public void processFilesOnWebserver(PipelineJob job, SequenceAnalysisJobSupport
187238 Table .insert (job .getUser (), ti , row );
188239 }
189240 }
190-
191- actions .add (action );
192241 }
193242 else
194243 {
0 commit comments