Skip to content

Commit de92181

Browse files
committed
UpdateSeuratPrototype now updates DB record
1 parent c5a4578 commit de92181

File tree

10 files changed: +93 additions, −45 deletions

SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/SequenceOutputHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ default void init(JobContext ctx, List<SequenceOutputFile> inputFiles, List<Reco
198198

199199
void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext ctx) throws UnsupportedOperationException, PipelineJobException;
200200

201-
default void complete(PipelineJob job, List<SequenceOutputFile> inputs, List<SequenceOutputFile> outputsCreated, SequenceAnalysisJobSupport support) throws PipelineJobException
201+
default void complete(JobContext ctx, List<SequenceOutputFile> inputs, List<SequenceOutputFile> outputsCreated) throws PipelineJobException
202202
{
203203

204204
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/analysis/PicardAlignmentMetricsHandler.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,9 @@ public void processFilesOnWebserver(PipelineJob job, SequenceAnalysisJobSupport
126126
}
127127

128128
@Override
129-
public void complete(PipelineJob job, List<SequenceOutputFile> inputs, List<SequenceOutputFile> outputsCreated, SequenceAnalysisJobSupport support) throws PipelineJobException
129+
public void complete(JobContext ctx, List<SequenceOutputFile> inputs, List<SequenceOutputFile> outputsCreated) throws PipelineJobException
130130
{
131-
if (!(job instanceof SequenceOutputHandlerJob shj))
131+
if (!(ctx.getJob() instanceof SequenceOutputHandlerJob shj))
132132
{
133133
throw new IllegalStateException("Expected job to be a SequenceOutputHandlerJob");
134134
}
@@ -144,10 +144,10 @@ public void complete(PipelineJob job, List<SequenceOutputFile> inputs, List<Sequ
144144
Integer analysisId = o.getAnalysis_id();
145145
if (analysisId == null)
146146
{
147-
job.getLogger().warn("no analysis Id for file, attempting to find this job: " + o.getName());
148-
PipelineStatusFile sf = PipelineService.get().getStatusFile(job.getJobGUID());
147+
ctx.getJob().getLogger().warn("no analysis Id for file, attempting to find this job: " + o.getName());
148+
PipelineStatusFile sf = PipelineService.get().getStatusFile(ctx.getJob().getJobGUID());
149149

150-
TableSelector ts = new TableSelector(QueryService.get().getUserSchema(job.getUser(), job.getContainer(), SequenceAnalysisSchema.SCHEMA_NAME).getTable(SequenceAnalysisSchema.TABLE_ANALYSES), PageFlowUtil.set("rowid"), new SimpleFilter(FieldKey.fromString("runid/JobId"), sf.getRowId()), null);
150+
TableSelector ts = new TableSelector(QueryService.get().getUserSchema(ctx.getJob().getUser(), ctx.getJob().getContainer(), SequenceAnalysisSchema.SCHEMA_NAME).getTable(SequenceAnalysisSchema.TABLE_ANALYSES), PageFlowUtil.set("rowid"), new SimpleFilter(FieldKey.fromString("runid/JobId"), sf.getRowId()), null);
151151
if (ts.exists())
152152
{
153153
analysisId = ts.getObject(Integer.class);
@@ -160,15 +160,15 @@ public void complete(PipelineJob job, List<SequenceOutputFile> inputs, List<Sequ
160160

161161
if (o.getLibrary_id() == null)
162162
{
163-
job.getLogger().warn("no genome associated with file: " + o.getName());
163+
ctx.getJob().getLogger().warn("no genome associated with file: " + o.getName());
164164
continue;
165165
}
166166

167-
AnalysisModel m = AnalysisModelImpl.getFromDb(analysisId, job.getUser());
167+
AnalysisModel m = AnalysisModelImpl.getFromDb(analysisId, ctx.getJob().getUser());
168168
if (m != null)
169169
{
170-
job.getLogger().warn("processing analysis: " + m.getRowId());
171-
File outputDir = ((SequenceOutputHandlerJob)job).getWebserverDir(false);
170+
ctx.getJob().getLogger().warn("processing analysis: " + m.getRowId());
171+
File outputDir = ((SequenceOutputHandlerJob)ctx.getJob()).getWebserverDir(false);
172172
List<File> metricsFiles = new ArrayList<>();
173173

174174
File mf = new File(outputDir, FileUtil.getBaseName(o.getFile()) + ".summary.metrics");
@@ -191,7 +191,7 @@ else if (collectInsertSize)
191191
// This output is only created for paired data:
192192
if (o.getReadset() != null)
193193
{
194-
Readset rs = SequenceAnalysisService.get().getReadset(o.getReadset(), job.getUser());
194+
Readset rs = SequenceAnalysisService.get().getReadset(o.getReadset(), ctx.getJob().getUser());
195195
if (rs.getReadData().stream().filter(rd -> rd.getFileId2() != null).count() > 0)
196196
{
197197
throw new PipelineJobException("Missing file: " + mf2.getPath());
@@ -219,7 +219,7 @@ else if (collectWgsNonZero)
219219
throw new PipelineJobException("Missing file: " + mf4.getPath());
220220
}
221221

222-
File mf5 = new MarkDuplicatesWrapper(job.getLogger()).getMetricsFile(o.getFile());
222+
File mf5 = new MarkDuplicatesWrapper(ctx.getJob().getLogger()).getMetricsFile(o.getFile());
223223
if (mf5.exists())
224224
{
225225
metricsFiles.add(mf5);
@@ -232,23 +232,23 @@ else if (runMarkDuplicates)
232232
TableInfo ti = SequenceAnalysisManager.get().getTable(SequenceAnalysisSchema.TABLE_QUALITY_METRICS);
233233
for (File f : metricsFiles)
234234
{
235-
List<Map<String, Object>> lines = PicardMetricsUtil.processFile(f, job.getLogger());
235+
List<Map<String, Object>> lines = PicardMetricsUtil.processFile(f, ctx.getJob().getLogger());
236236
for (Map<String, Object> row : lines)
237237
{
238238
row.put("container", o.getContainer());
239-
row.put("createdby", job.getUser().getUserId());
239+
row.put("createdby", ctx.getJob().getUser().getUserId());
240240
row.put("created", new Date());
241241
row.put("readset", m.getReadset());
242242
row.put("analysis_id", m.getRowId());
243243
row.put("dataid", m.getAlignmentFile());
244244

245-
Table.insert(job.getUser(), ti, row);
245+
Table.insert(ctx.getJob().getUser(), ti, row);
246246
}
247247
}
248248
}
249249
else
250250
{
251-
job.getLogger().warn("Analysis Id " + o.getAnalysis_id() + " not found for file: " + o.getName());
251+
ctx.getJob().getLogger().warn("Analysis Id " + o.getAnalysis_id() + " not found for file: " + o.getName());
252252
}
253253
}
254254
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ConvertToCramHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ private void checkCramAndIndex(SequenceOutputFile so) throws PipelineJobExceptio
161161
}
162162

163163
@Override
164-
public void complete(PipelineJob job, List<SequenceOutputFile> inputs, List<SequenceOutputFile> outputsCreated, SequenceAnalysisJobSupport support) throws PipelineJobException
164+
public void complete(JobContext ctx, List<SequenceOutputFile> inputs, List<SequenceOutputFile> outputsCreated) throws PipelineJobException
165165
{
166166
List<Map<String, Object>> toUpdate = new ArrayList<>();
167167
List<Map<String, Object>> oldKeys = inputs.stream().map(x -> {
@@ -175,11 +175,11 @@ public void complete(PipelineJob job, List<SequenceOutputFile> inputs, List<Sequ
175175
File cram = new File(so.getFile().getParentFile(), FileUtil.getBaseName(so.getFile()) + ".cram");
176176
checkCramAndIndex(so);
177177

178-
job.getLogger().info("Updating ExpData record with new filepath: " + cram.getPath());
178+
ctx.getJob().getLogger().info("Updating ExpData record with new filepath: " + cram.getPath());
179179
ExpData d = so.getExpData();
180180
d.setDataFileURI(cram.toURI());
181181
d.setName(cram.getName());
182-
d.save(job.getUser());
182+
d.save(ctx.getJob().getUser());
183183

184184
if (so.getName().contains(".bam"))
185185
{
@@ -194,8 +194,8 @@ public void complete(PipelineJob job, List<SequenceOutputFile> inputs, List<Sequ
194194

195195
try
196196
{
197-
Container target = job.getContainer().isWorkbook() ? job.getContainer().getParent() : job.getContainer();
198-
QueryService.get().getUserSchema(job.getUser(), target, SequenceAnalysisSchema.SCHEMA_NAME).getTable(SequenceAnalysisSchema.TABLE_OUTPUTFILES).getUpdateService().updateRows(job.getUser(), target, toUpdate, oldKeys, null, null);
197+
Container target = ctx.getJob().getContainer().isWorkbook() ? ctx.getJob().getContainer().getParent() : ctx.getJob().getContainer();
198+
QueryService.get().getUserSchema(ctx.getJob().getUser(), target, SequenceAnalysisSchema.SCHEMA_NAME).getTable(SequenceAnalysisSchema.TABLE_OUTPUTFILES).getUpdateService().updateRows(ctx.getJob().getUser(), target, toUpdate, oldKeys, null, null);
199199
}
200200
catch (QueryUpdateServiceException | InvalidKeyException | BatchValidationException | SQLException e)
201201
{

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -722,14 +722,14 @@ public void processFilesOnWebserver(PipelineJob job, SequenceAnalysisJobSupport
722722
}
723723

724724
@Override
725-
public void complete(PipelineJob job, List<SequenceOutputFile> inputs, List<SequenceOutputFile> outputsCreated, SequenceAnalysisJobSupport support) throws PipelineJobException
725+
public void complete(JobContext ctx, List<SequenceOutputFile> inputs, List<SequenceOutputFile> outputsCreated) throws PipelineJobException
726726
{
727-
SequenceTaskHelper taskHelper = new SequenceTaskHelper(getPipelineJob(job), getPipelineJob(job).getDataDirectory());
728-
List<PipelineStepCtx<VariantProcessingStep>> providers = SequencePipelineService.get().getSteps(job, VariantProcessingStep.class);
727+
SequenceTaskHelper taskHelper = new SequenceTaskHelper(getPipelineJob(ctx.getJob()), getPipelineJob(ctx.getJob()).getDataDirectory());
728+
List<PipelineStepCtx<VariantProcessingStep>> providers = SequencePipelineService.get().getSteps(ctx.getJob(), VariantProcessingStep.class);
729729
for (PipelineStepCtx<VariantProcessingStep> stepCtx : providers)
730730
{
731731
VariantProcessingStep step = stepCtx.getProvider().create(taskHelper);
732-
step.complete(job, inputs, outputsCreated, support);
732+
step.complete(ctx.getJob(), inputs, outputsCreated, ctx.getSequenceSupport());
733733
}
734734
}
735735
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ReblockGvcfHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ private File getReblockedName(File gvcf)
129129
}
130130

131131
@Override
132-
public void complete(PipelineJob job, List<SequenceOutputFile> inputs, List<SequenceOutputFile> outputsCreated, SequenceAnalysisJobSupport support) throws PipelineJobException
132+
public void complete(JobContext ctx, List<SequenceOutputFile> inputs, List<SequenceOutputFile> outputsCreated) throws PipelineJobException
133133
{
134134
List<Map<String, Object>> toUpdate = new ArrayList<>();
135135
List<Map<String, Object>> oldKeys = inputs.stream().map(x -> {
@@ -146,11 +146,11 @@ public void complete(PipelineJob job, List<SequenceOutputFile> inputs, List<Sequ
146146
throw new PipelineJobException("Unable to find file: " + reblocked.getPath());
147147
}
148148

149-
job.getLogger().info("Updating ExpData record with new filepath: " + reblocked.getPath());
149+
ctx.getJob().getLogger().info("Updating ExpData record with new filepath: " + reblocked.getPath());
150150
ExpData d = so.getExpData();
151151
d.setDataFileURI(reblocked.toURI());
152152
d.setName(reblocked.getName());
153-
d.save(job.getUser());
153+
d.save(ctx.getJob().getUser());
154154

155155
if (so.getName().contains(".g.vcf.gz"))
156156
{
@@ -165,8 +165,8 @@ public void complete(PipelineJob job, List<SequenceOutputFile> inputs, List<Sequ
165165

166166
try
167167
{
168-
Container target = job.getContainer().isWorkbook() ? job.getContainer().getParent() : job.getContainer();
169-
QueryService.get().getUserSchema(job.getUser(), target, SequenceAnalysisSchema.SCHEMA_NAME).getTable(SequenceAnalysisSchema.TABLE_OUTPUTFILES).getUpdateService().updateRows(job.getUser(), target, toUpdate, oldKeys, null, null);
168+
Container target = ctx.getJob().getContainer().isWorkbook() ? ctx.getJob().getContainer().getParent() : ctx.getJob().getContainer();
169+
QueryService.get().getUserSchema(ctx.getJob().getUser(), target, SequenceAnalysisSchema.SCHEMA_NAME).getTable(SequenceAnalysisSchema.TABLE_OUTPUTFILES).getUpdateService().updateRows(ctx.getJob().getUser(), target, toUpdate, oldKeys, null, null);
170170
}
171171
catch (QueryUpdateServiceException | InvalidKeyException | BatchValidationException | SQLException e)
172172
{

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/SequenceOutputHandlerFinalTask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,9 @@ public RecordedActionSet run() throws PipelineJobException
150150
}
151151

152152
//run final handler
153-
getPipelineJob().getHandler().getProcessor().complete(getPipelineJob(), getPipelineJob().getFiles(), outputsCreated, getPipelineJob().getSequenceSupport());
153+
TaskFileManagerImpl manager = new TaskFileManagerImpl(getPipelineJob(), getPipelineJob().getAnalysisDirectory(), null);
154+
JobContextImpl ctx = new JobContextImpl(getPipelineJob(), getPipelineJob().getSequenceSupport(), getPipelineJob().getParameterJson(), getPipelineJob().getAnalysisDirectory(), manager, null);
155+
getPipelineJob().getHandler().getProcessor().complete(ctx, getPipelineJob().getFiles(), outputsCreated);
154156

155157
File xml = getPipelineJob().getSerializedOutputFilesFile();
156158
if (xml.exists())

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/analysis/NextCladeHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,11 @@ public void processFilesOnWebserver(PipelineJob job, SequenceAnalysisJobSupport
9595
}
9696

9797
@Override
98-
public void complete(PipelineJob job, List<SequenceOutputFile> inputFiles, List<SequenceOutputFile> outputsCreated, SequenceAnalysisJobSupport support) throws PipelineJobException
98+
public void complete(JobContext ctx, List<SequenceOutputFile> inputFiles, List<SequenceOutputFile> outputsCreated) throws PipelineJobException
9999
{
100100
Map<Integer, SequenceOutputFile> readsetToInput = inputFiles.stream().collect(Collectors.toMap(SequenceOutputFile::getReadset, x -> x));
101101

102-
job.getLogger().info("Parsing NextClade JSON:");
102+
ctx.getJob().getLogger().info("Parsing NextClade JSON:");
103103
for (SequenceOutputFile so : outputsCreated)
104104
{
105105
if (!NEXTCLADE_JSON.equals(so.getCategory()))
@@ -118,7 +118,7 @@ public void complete(PipelineJob job, List<SequenceOutputFile> inputFiles, List<
118118
throw new PipelineJobException("Unable to find parent for output: " + so.getRowid());
119119
}
120120

121-
processAndImportNextCladeAa(job, so.getFile(), parent.getAnalysis_id(), so.getLibrary_id(), so.getDataId(), so.getReadset(), parent.getFile(), true);
121+
processAndImportNextCladeAa(ctx.getJob(), so.getFile(), parent.getAnalysis_id(), so.getLibrary_id(), so.getDataId(), so.getReadset(), parent.getFile(), true);
122122
}
123123
}
124124

singlecell/api-src/org/labkey/api/singlecell/pipeline/AbstractSingleCellStep.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ default void init(SequenceOutputHandler.JobContext ctx, List<SequenceOutputFile>
2424

2525
}
2626

27+
default void complete(SequenceOutputHandler.JobContext ctx, List<SequenceOutputFile> inputFiles, List<SequenceOutputFile> outputsCreated) throws PipelineJobException
28+
{
29+
30+
}
31+
2732
default boolean createsSeuratObjects()
2833
{
2934
return true;

singlecell/src/org/labkey/singlecell/analysis/AbstractSingleCellHandler.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -271,30 +271,37 @@ public void processFilesOnWebserver(PipelineJob job, SequenceAnalysisJobSupport
271271
}
272272

273273
@Override
274-
public void complete(PipelineJob job, List<SequenceOutputFile> inputs, List<SequenceOutputFile> outputsCreated, SequenceAnalysisJobSupport support) throws PipelineJobException
274+
public void complete(JobContext ctx, List<SequenceOutputFile> inputs, List<SequenceOutputFile> outputsCreated) throws PipelineJobException
275275
{
276276
for (SequenceOutputFile so : outputsCreated)
277277
{
278278
if ("Seurat Cell Hashing Calls".equals(so.getCategory()))
279279
{
280-
job.getLogger().info("Adding metrics for output: " + so.getName());
281-
CellHashingService.get().processMetrics(so, job, true);
280+
ctx.getJob().getLogger().info("Adding metrics for output: " + so.getName());
281+
CellHashingService.get().processMetrics(so, ctx.getJob(), true);
282282
}
283283

284284
if (SEURAT_PROTOTYPE.equals(so.getCategory()))
285285
{
286286
//NOTE: upstream we enforce one dataset per job, so we can safely assume this is the only dataset here:
287-
File metricFile = new File(job.getLogFile().getParentFile(), "seurat.metrics.txt");
287+
File metricFile = new File(ctx.getJob().getLogFile().getParentFile(), "seurat.metrics.txt");
288288
if (metricFile.exists())
289289
{
290-
processMetrics(so, job, metricFile);
290+
processMetrics(so, ctx.getJob(), metricFile);
291291
}
292292
else
293293
{
294-
job.getLogger().info("Metrics file not found, skipping");
294+
ctx.getJob().getLogger().info("Metrics file not found, skipping");
295295
}
296296
}
297297
}
298+
299+
List<PipelineStepCtx<SingleCellStep>> steps = SequencePipelineService.get().getSteps(ctx.getJob(), SingleCellStep.class);
300+
for (PipelineStepCtx<SingleCellStep> stepCtx : steps)
301+
{
302+
SingleCellStep step = stepCtx.getProvider().create(ctx);
303+
step.complete(ctx, inputs, outputsCreated);
304+
}
298305
}
299306

300307
private void processMetrics(SequenceOutputFile so, PipelineJob job, File metricsFile) throws PipelineJobException

Comments on commit de92181: 0