1717
1818import au .com .bytecode .opencsv .CSVReader ;
1919import org .jetbrains .annotations .NotNull ;
20+ import org .labkey .api .collections .CaseInsensitiveHashMap ;
21+ import org .labkey .api .data .CompareType ;
2022import org .labkey .api .data .DbSchema ;
2123import org .labkey .api .data .DbScope ;
24+ import org .labkey .api .data .SimpleFilter ;
2225import org .labkey .api .data .Table ;
2326import org .labkey .api .data .TableInfo ;
27+ import org .labkey .api .data .TableSelector ;
2428import org .labkey .api .exp .api .ExpData ;
2529import org .labkey .api .exp .api .ExpProtocol ;
2630import org .labkey .api .exp .api .ExpRun ;
3135import org .labkey .api .pipeline .PipelineJobException ;
3236import org .labkey .api .pipeline .RecordedAction ;
3337import org .labkey .api .pipeline .RecordedActionSet ;
38+ import org .labkey .api .query .FieldKey ;
3439import org .labkey .api .reader .Readers ;
40+ import org .labkey .api .sequenceanalysis .SequenceAnalysisService ;
3541import org .labkey .api .sequenceanalysis .model .Readset ;
3642import org .labkey .api .util .FileType ;
43+ import org .labkey .api .util .PageFlowUtil ;
3744import org .labkey .sequenceanalysis .ReadDataImpl ;
3845import org .labkey .sequenceanalysis .SequenceAnalysisManager ;
3946import org .labkey .sequenceanalysis .SequenceAnalysisSchema ;
4451
4552import java .io .File ;
4653import java .io .IOException ;
54+ import java .sql .SQLException ;
4755import java .util .ArrayList ;
4856import java .util .Arrays ;
57+ import java .util .Collections ;
4958import java .util .Date ;
5059import java .util .HashMap ;
5160import java .util .List ;
5261import java .util .Map ;
62+ import java .util .Set ;
5363
5464/**
5565 * User: bbimber
@@ -153,12 +163,21 @@ private void importReadsets() throws PipelineJobException
153163 for (Readset rs : getPipelineJob ().getSequenceSupport ().getCachedReadsets ())
154164 {
155165 SequenceReadsetImpl r = (SequenceReadsetImpl )rs ;
156- boolean updateExisting = r .getReadsetId () != null && r .getReadsetId () > 0 ;
157-
158166 getJob ().getLogger ().info ("Starting readset " + r .getName ());
159167
168+ boolean readsetExists = r .getReadsetId () != null && r .getReadsetId () > 0 ;
169+ List <ReadDataImpl > preexistingReadData ;
170+ if (readsetExists )
171+ {
172+ preexistingReadData = ((SequenceReadsetImpl )SequenceAnalysisService .get ().getReadset (r .getReadsetId (), getJob ().getUser ())).getReadDataImpl ();
173+ }
174+ else
175+ {
176+ preexistingReadData = Collections .emptyList ();
177+ }
178+
160179 SequenceReadsetImpl row ;
161- if (!updateExisting )
180+ if (!readsetExists )
162181 {
163182 row = new SequenceReadsetImpl ();
164183
@@ -195,18 +214,25 @@ private void importReadsets() throws PipelineJobException
195214 throw new PipelineJobException ("Readset lacks a rowid: " + r .getReadsetId ());
196215 }
197216
198- if (row .getReadData () != null && ! row . getReadData (). isEmpty () )
217+ if (row .getInstrumentRunId () == null )
199218 {
200- throw new PipelineJobException ( "Readset already has data imported: " + row .getReadsetId ());
219+ row .setInstrumentRunId ( r . getInstrumentRunId ());
201220 }
202221
203- if (row . getInstrumentRunId () == null )
222+ if (! preexistingReadData . isEmpty () )
204223 {
205- row .setInstrumentRunId (r .getInstrumentRunId ());
224+ getJob ().getLogger ().debug ("Existing readset found with " + preexistingReadData .size () + " read pairs, will clone and merge data" );
225+
226+ row .unsetRowId ();
227+ row .setCreatedBy (getJob ().getUser ().getUserId ());
228+ row .setCreated (new Date ());
229+ row .setModifiedBy (getJob ().getUser ().getUserId ());
230+ row .setModified (new Date ());
231+ readsetExists = false ;
206232 }
207233 }
208234
209- //now add readData
235+ //now add readData created in this run:
210236 List <ReadDataImpl > readDatas = new ArrayList <>();
211237 for (ReadDataImpl rd : r .getReadDataImpl ())
212238 {
@@ -269,10 +295,47 @@ private void importReadsets() throws PipelineJobException
269295 }
270296
271297 rd .setRunId (runId );
272-
273298 readDatas .add (rd );
274299 }
275300
301+ List <Map <String , Object >> qualMetricsToAdd = new ArrayList <>();
302+ if (!preexistingReadData .isEmpty ())
303+ {
304+ preexistingReadData .forEach (rd -> {
305+ rd .setRowid (null );
306+ rd .setReadset (null );
307+
308+ SimpleFilter filter = new SimpleFilter (FieldKey .fromString ("category" ), "Readset" , CompareType .EQUAL );
309+ filter .addCondition (FieldKey .fromString ("readset" ), r .getRowId ());
310+ if (rd .getFile2 () == null )
311+ {
312+ filter .addCondition (FieldKey .fromString ("dataid" ), rd .getFileId1 (), CompareType .EQUAL );
313+ }
314+ else
315+ {
316+ filter .addCondition (FieldKey .fromString ("dataid" ), Arrays .asList (rd .getFileId1 (), rd .getFileId2 ()), CompareType .IN );
317+ }
318+
319+ final Set <String > fields = PageFlowUtil .set ("dataid" , "category" , "metricname" , "metricvalue" , "qualvalue" , "comment" , "container" , "created" , "createdby" , "modified" , "modifiedby" );
320+ new TableSelector (SequenceAnalysisSchema .getTable (SequenceAnalysisSchema .TABLE_QUALITY_METRICS ), fields , filter , null ).forEachResults (results -> {
321+ Map <String , Object > map = new CaseInsensitiveHashMap <>();
322+
323+ fields .stream ().forEach (f -> {
324+ try
325+ {
326+ map .put (f , results .getObject (FieldKey .fromString (f )));
327+ }
328+ catch (SQLException e )
329+ {
330+ throw new RuntimeException (e );
331+ }
332+ });
333+
334+ qualMetricsToAdd .add (map );
335+ });
336+ });
337+ }
338+
276339 row .setRunId (runId );
277340 row .setModified (new Date ());
278341 row .setModifiedBy (getJob ().getUser ().getUserId ());
@@ -286,7 +349,7 @@ private void importReadsets() throws PipelineJobException
286349 row .setReadData (readDatas );
287350
288351 SequenceReadsetImpl newRow ;
289- if (!updateExisting )
352+ if (!readsetExists )
290353 {
291354 newRow = Table .insert (getJob ().getUser (), readsetTable , row );
292355 getJob ().getLogger ().info ("Created readset: " + newRow .getReadsetId ());
@@ -327,6 +390,17 @@ private void importReadsets() throws PipelineJobException
327390 rd .setModified (new Date ());
328391
329392 Table .insert (getJob ().getUser (), readDataTable , rd );
393+
394+ TableInfo metricsTable = SequenceAnalysisManager .get ().getTable (SequenceAnalysisSchema .TABLE_QUALITY_METRICS );
395+ if (!qualMetricsToAdd .isEmpty ())
396+ {
397+ getJob ().getLogger ().info ("Copying " + qualMetricsToAdd .size () + " quality metrics from pre-existing readdata" );
398+ for (Map <String , Object > qm : qualMetricsToAdd )
399+ {
400+ qm .put ("readset" , newRow .getReadsetId ());
401+ Table .insert (getJob ().getUser (), metricsTable , qm );
402+ }
403+ }
330404 }
331405 }
332406
@@ -417,6 +491,7 @@ public static long addQualityMetricsForReadset(Readset rs, int fileId, PipelineJ
417491 metricsMap = FastqUtils .getQualityMetrics (d .getFile (), job .getLogger ());
418492 }
419493
494+ TableInfo metricsTable = SequenceAnalysisManager .get ().getTable (SequenceAnalysisSchema .TABLE_QUALITY_METRICS );
420495 for (String metricName : metricsMap .keySet ())
421496 {
422497 Map <String , Object > r = new HashMap <>();
@@ -428,7 +503,7 @@ public static long addQualityMetricsForReadset(Readset rs, int fileId, PipelineJ
428503 r .put ("container" , rs .getContainer () == null ? job .getContainer () : rs .getContainer ());
429504 r .put ("createdby" , job .getUser ().getUserId ());
430505
431- Table .insert (job .getUser (), SequenceAnalysisManager . get (). getTable ( SequenceAnalysisSchema . TABLE_QUALITY_METRICS ) , r );
506+ Table .insert (job .getUser (), metricsTable , r );
432507 }
433508
434509 if (cachedMetrics .exists ())
0 commit comments