Skip to content

Commit 565e95a

Browse files
committed
Protect against duplicate quality metric import, such as failed/restarted job
1 parent 12a349e commit 565e95a

File tree

10 files changed

+165
-5
lines changed

10 files changed

+165
-5
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
-- Upgrade script for sequenceanalysis.quality_metrics (12.322 -> 12.323).
-- NOTE(review): reconstructed from a diff scrape; the original had diff
-- line-number artifacts interleaved with the statements.

-- Normalize the metric name so all rows use 'Mean Read Length'
-- (older imports wrote 'Avg Read Length' for the same metric).
UPDATE sequenceanalysis.quality_metrics SET metricname = 'Mean Read Length' WHERE metricname = 'Avg Read Length';

-- Backfill the category column for readset-level metrics created before
-- the importers began stamping category = 'Readset'. Only NULL rows are
-- touched, so re-running the script is safe (idempotent).
UPDATE sequenceanalysis.quality_metrics SET category = 'Readset' WHERE category IS NULL and metricname IN (
  'Total Reads',
  'Min Read Length',
  'Max Read Length',
  'Mean Read Length',
  'Total Bases',
  'Total MBases',
  'Total GBases',
  'Total Q10 Bases',
  'Total Q20 Bases',
  'Total Q30 Bases',
  'Total Q40 Bases',
  'Pct Q10',
  'Pct Q20',
  'Pct Q30',
  'Pct Q40'
);
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
-- Upgrade script for sequenceanalysis.quality_metrics (12.322 -> 12.323),
-- second dialect copy (scripts are maintained per database dialect).
-- NOTE(review): reconstructed from a diff scrape; the original had diff
-- line-number artifacts interleaved with the statements.

-- Normalize the metric name so all rows use 'Mean Read Length'
-- (older imports wrote 'Avg Read Length' for the same metric).
UPDATE sequenceanalysis.quality_metrics SET metricname = 'Mean Read Length' WHERE metricname = 'Avg Read Length';

-- Backfill the category column for readset-level metrics created before
-- the importers began stamping category = 'Readset'. Only NULL rows are
-- touched, so re-running the script is safe (idempotent).
UPDATE sequenceanalysis.quality_metrics SET category = 'Readset' WHERE category IS NULL and metricname IN (
  'Total Reads',
  'Min Read Length',
  'Max Read Length',
  'Mean Read Length',
  'Total Bases',
  'Total MBases',
  'Total GBases',
  'Total Q10 Bases',
  'Total Q20 Bases',
  'Total Q30 Bases',
  'Total Q40 Bases',
  'Pct Q10',
  'Pct Q20',
  'Pct Q30',
  'Pct Q40'
);

SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public String getName()
160160
@Override
161161
public Double getSchemaVersion()
162162
{
163-
return 12.322;
163+
return 12.323;
164164
}
165165

166166
@Override

SequenceAnalysis/src/org/labkey/sequenceanalysis/analysis/CellHashingHandler.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.log4j.Logger;
1212
import org.jetbrains.annotations.Nullable;
1313
import org.json.JSONObject;
14+
import org.labkey.api.data.CompareType;
1415
import org.labkey.api.data.Container;
1516
import org.labkey.api.data.SimpleFilter;
1617
import org.labkey.api.data.Sort;
@@ -182,6 +183,21 @@ public void complete(PipelineJob job, List<Readset> readsets, List<SequenceOutpu
182183
for (SequenceOutputFile so : outputsCreated)
183184
{
184185
job.getLogger().info("Saving quality metrics for: " + so.getName());
186+
187+
//NOTE: if this job errored and restarted, we may have duplicate records:
188+
SimpleFilter filter = new SimpleFilter(FieldKey.fromString("readset"), so.getReadset());
189+
filter.addCondition(FieldKey.fromString("category"), "Cell Hashing", CompareType.EQUAL);
190+
filter.addCondition(FieldKey.fromString("dataid"), so.getDataId(), CompareType.EQUAL);
191+
filter.addCondition(FieldKey.fromString("container"), job.getContainer().getId(), CompareType.EQUAL);
192+
TableSelector ts = new TableSelector(ti, PageFlowUtil.set("rowid"), filter, null);
193+
if (ts.exists())
194+
{
195+
job.getLogger().info("Deleting existing QC metrics (probably from prior restarted job)");
196+
ts.getArrayList(Integer.class).forEach(rowid -> {
197+
Table.delete(ti, rowid);
198+
});
199+
}
200+
185201
if (so.getFile().getName().endsWith(CALL_EXTENSION))
186202
{
187203
Map<String, Object> counts = parseOutputTable(job.getLogger(), so.getFile(), getCiteSeqCountUnknownOutput(so.getFile().getParentFile(), _type, null), so.getFile().getParentFile(), null, false, _type);

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/IlluminaImportTask.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ private void addQualityMetrics(DbSchema schema, int readsetId, Pair<Integer, Int
297297
Integer count = readCounts.get(key);
298298

299299
Map<String, Object> r = new HashMap<>();
300+
r.put("category", "Readset");
300301
r.put("metricname", "Total Reads");
301302
r.put("metricvalue", count);
302303
r.put("dataid", d.getRowId());

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ReadsetCreationTask.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,7 @@ public static long addQualityMetricsForReadset(Readset rs, int fileId, PipelineJ
420420
for (String metricName : metricsMap.keySet())
421421
{
422422
Map<String, Object> r = new HashMap<>();
423+
r.put("category", "Readset");
423424
r.put("metricname", metricName);
424425
r.put("metricvalue", metricsMap.get(metricName));
425426
r.put("dataid", d.getRowId());

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/TaskFileManagerImpl.java

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.labkey.sequenceanalysis.pipeline;
22

33
import au.com.bytecode.opencsv.CSVReader;
4+
import au.com.bytecode.opencsv.CSVWriter;
45
import org.apache.commons.io.FileUtils;
56
import org.apache.commons.io.FilenameUtils;
67
import org.apache.commons.lang3.StringUtils;
@@ -51,6 +52,7 @@
5152
import java.util.List;
5253
import java.util.Map;
5354
import java.util.Set;
55+
import java.util.stream.Collectors;
5456

5557
/**
5658
* User: bimber
@@ -420,12 +422,56 @@ public void addIntermediateFiles(Collection<File> files)
420422
@Override
421423
public void addPicardMetricsFiles(List<PipelineStepOutput.PicardMetricsOutput> files)
422424
{
425+
//Note: in case there is a restarted job, inspect file for redundancy, and remove previous:
426+
Set<String> filePaths = files.stream().map(PipelineStepOutput.PicardMetricsOutput::getMetricFile).map(File::getPath).collect(Collectors.toSet());
427+
428+
File metricLog = getMetricsLog(false);
429+
if (metricLog.exists())
430+
{
431+
_job.getLogger().info("Inspecting existing metrics file for redundancy");
432+
433+
List<String[]> lines = new ArrayList<>();
434+
boolean needToReplace = false;
435+
try (CSVReader reader = new CSVReader(Readers.getReader(metricLog), '\t'))
436+
{
437+
String[] line;
438+
while ((line = reader.readNext()) != null)
439+
{
440+
if (filePaths.contains(line[6]))
441+
{
442+
needToReplace = true;
443+
}
444+
else
445+
{
446+
lines.add(line);
447+
}
448+
}
449+
}
450+
catch (IOException e)
451+
{
452+
throw new RuntimeException(e);
453+
}
454+
455+
if (needToReplace)
456+
{
457+
_job.getLogger().info("Replacing previously written metrics for these files, which probably indicates this job was restarted:");
458+
try (CSVWriter writer = new CSVWriter(PrintWriters.getPrintWriter(metricLog), '\t', CSVWriter.NO_QUOTE_CHARACTER))
459+
{
460+
lines.forEach(writer::writeNext);
461+
}
462+
catch (IOException e)
463+
{
464+
throw new RuntimeException(e);
465+
}
466+
}
467+
}
468+
423469
for (PipelineStepOutput.PicardMetricsOutput mf : files)
424470
{
425471
_job.getLogger().debug("adding picard metrics file: " + mf.getMetricFile().getPath());
426472

427473
//write to log
428-
File metricLog = getMetricsLog(true);
474+
metricLog = getMetricsLog(true);
429475
try (BufferedWriter writer = new BufferedWriter(new FileWriter(metricLog, true)))
430476
{
431477
String bamRelPath = mf.getInputFile() == null ? "" : FilenameUtils.normalize(_wd.getRelativePath(mf.getInputFile()));
@@ -434,7 +480,7 @@ public void addPicardMetricsFiles(List<PipelineStepOutput.PicardMetricsOutput> f
434480
List<Map<String, Object>> metricLines = PicardMetricsUtil.processFile(mf.getMetricFile(), _job.getLogger());
435481
for (Map<String, Object> line : metricLines)
436482
{
437-
writer.write(StringUtils.join(Arrays.asList(mf.getReadsetId(), bamRelPath, type, line.get("category"), line.get("metricname"), line.get("metricvalue")), "\t"));
483+
writer.write(StringUtils.join(Arrays.asList(mf.getReadsetId(), bamRelPath, type, line.get("category"), line.get("metricname"), line.get("metricvalue"), mf.getMetricFile().getPath()), "\t"));
438484
writer.write('\n');
439485
}
440486
}
@@ -471,9 +517,9 @@ public void writeMetricsToDb(Map<Integer, Integer> readsetMap, Map<Integer, Map<
471517
continue; //header
472518
}
473519

474-
if (line.length != 6)
520+
if (line.length != 7)
475521
{
476-
throw new RuntimeException("Line length is not 6: " + i + "[" + StringUtils.join(line, ";") + "]");
522+
throw new RuntimeException("Line length is not 7: " + i + "[" + StringUtils.join(line, ";") + "]");
477523
}
478524

479525
Map<String, Object> toInsert = new HashMap<>();

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/alignment/CellRangerWrapper.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,19 @@
88
import org.jetbrains.annotations.Nullable;
99
import org.json.JSONObject;
1010
import org.labkey.api.collections.CaseInsensitiveHashMap;
11+
import org.labkey.api.data.CompareType;
1112
import org.labkey.api.data.ConvertHelper;
1213
import org.labkey.api.data.DbSchema;
1314
import org.labkey.api.data.DbSchemaType;
15+
import org.labkey.api.data.SimpleFilter;
1416
import org.labkey.api.data.Table;
1517
import org.labkey.api.data.TableInfo;
18+
import org.labkey.api.data.TableSelector;
1619
import org.labkey.api.exp.api.ExpData;
1720
import org.labkey.api.exp.api.ExperimentService;
1821
import org.labkey.api.pipeline.PipelineJob;
1922
import org.labkey.api.pipeline.PipelineJobException;
23+
import org.labkey.api.query.FieldKey;
2024
import org.labkey.api.reader.Readers;
2125
import org.labkey.api.sequenceanalysis.model.AnalysisModel;
2226
import org.labkey.api.sequenceanalysis.model.ReadData;
@@ -636,6 +640,22 @@ public void complete(SequenceAnalysisJobSupport support, AnalysisModel model) th
636640
}
637641

638642
TableInfo ti = DbSchema.get("sequenceanalysis", DbSchemaType.Module).getTable("quality_metrics");
643+
644+
//NOTE: if this job errored and restarted, we may have duplicate records:
645+
SimpleFilter filter = new SimpleFilter(FieldKey.fromString("readset"), model.getReadset());
646+
filter.addCondition(FieldKey.fromString("analysis_id"), model.getRowId(), CompareType.EQUAL);
647+
filter.addCondition(FieldKey.fromString("dataid"), model.getAlignmentFile(), CompareType.EQUAL);
648+
filter.addCondition(FieldKey.fromString("category"), "Cell Ranger", CompareType.EQUAL);
649+
filter.addCondition(FieldKey.fromString("container"), getPipelineCtx().getJob().getContainer().getId(), CompareType.EQUAL);
650+
TableSelector ts = new TableSelector(ti, PageFlowUtil.set("rowid"), filter, null);
651+
if (ts.exists())
652+
{
653+
getPipelineCtx().getLogger().info("Deleting existing QC metrics (probably from prior restarted job)");
654+
ts.getArrayList(Integer.class).forEach(rowid -> {
655+
Table.delete(ti, rowid);
656+
});
657+
}
658+
639659
for (int j = 0; j < header.length; j++)
640660
{
641661
Map<String, Object> toInsert = new CaseInsensitiveHashMap<>();

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/analysis/LofreqAnalysis.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,15 @@
3030
import org.biojava3.core.sequence.io.GenericFastaHeaderParser;
3131
import org.jetbrains.annotations.Nullable;
3232
import org.json.JSONObject;
33+
import org.labkey.api.data.CompareType;
3334
import org.labkey.api.data.DbSchema;
3435
import org.labkey.api.data.DbSchemaType;
36+
import org.labkey.api.data.SimpleFilter;
3537
import org.labkey.api.data.Table;
3638
import org.labkey.api.data.TableInfo;
39+
import org.labkey.api.data.TableSelector;
3740
import org.labkey.api.pipeline.PipelineJobException;
41+
import org.labkey.api.query.FieldKey;
3842
import org.labkey.api.reader.Readers;
3943
import org.labkey.api.sequenceanalysis.SequenceAnalysisService;
4044
import org.labkey.api.sequenceanalysis.model.AnalysisModel;
@@ -51,6 +55,7 @@
5155
import org.labkey.api.sequenceanalysis.run.AbstractCommandWrapper;
5256
import org.labkey.api.sequenceanalysis.run.SimpleScriptWrapper;
5357
import org.labkey.api.util.FileUtil;
58+
import org.labkey.api.util.PageFlowUtil;
5459
import org.labkey.api.writer.PrintWriters;
5560
import org.labkey.sequenceanalysis.SequenceAnalysisModule;
5661
import org.labkey.sequenceanalysis.run.util.DepthOfCoverageWrapper;
@@ -659,6 +664,21 @@ public Output performAnalysisPerSampleLocal(AnalysisModel model, File inputBam,
659664
getPipelineCtx().getLogger().info("Loading metrics");
660665
int total = 0;
661666
TableInfo ti = DbSchema.get("sequenceanalysis", DbSchemaType.Module).getTable("quality_metrics");
667+
668+
//NOTE: if this job errored and restarted, we may have duplicate records:
669+
SimpleFilter filter = new SimpleFilter(FieldKey.fromString("readset"), model.getReadset());
670+
filter.addCondition(FieldKey.fromString("dataid"), model.getAlignmentFile(), CompareType.EQUAL);
671+
filter.addCondition(FieldKey.fromString("analysis_id"), model.getRowId(), CompareType.EQUAL);
672+
filter.addCondition(FieldKey.fromString("container"), getPipelineCtx().getJob().getContainer().getId(), CompareType.EQUAL);
673+
TableSelector ts = new TableSelector(ti, PageFlowUtil.set("rowid"), filter, null);
674+
if (ts.exists())
675+
{
676+
getPipelineCtx().getLogger().info("Deleting existing QC metrics (probably from prior restarted job)");
677+
ts.getArrayList(Integer.class).forEach(rowid -> {
678+
Table.delete(ti, rowid);
679+
});
680+
}
681+
662682
try (CSVReader reader = new CSVReader(Readers.getReader(metrics), '\t'))
663683
{
664684
String[] line;

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/preprocessing/TagPcrSummaryStep.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,22 @@ public Output performAnalysisPerSampleLocal(AnalysisModel model, File inputBam,
250250
getPipelineCtx().getJob().getLogger().info("Loading metrics");
251251
AtomicInteger total = new AtomicInteger(0);
252252
TableInfo ti = DbSchema.get("sequenceanalysis", DbSchemaType.Module).getTable("quality_metrics");
253+
254+
//NOTE: if this job errored and restarted, we may have duplicate records:
255+
SimpleFilter filter = new SimpleFilter(FieldKey.fromString("readset"), model.getReadset());
256+
filter.addCondition(FieldKey.fromString("dataid"), model.getAlignmentFile(), CompareType.EQUAL);
257+
filter.addCondition(FieldKey.fromString("analysis_id"), model.getRowId(), CompareType.EQUAL);
258+
filter.addCondition(FieldKey.fromString("category"), "Tag-PCR", CompareType.EQUAL);
259+
filter.addCondition(FieldKey.fromString("container"), getPipelineCtx().getJob().getContainer().getId(), CompareType.EQUAL);
260+
TableSelector ts = new TableSelector(ti, PageFlowUtil.set("rowid"), filter, null);
261+
if (ts.exists())
262+
{
263+
getPipelineCtx().getLogger().info("Deleting existing QC metrics (probably from prior restarted job)");
264+
ts.getArrayList(Integer.class).forEach(rowid -> {
265+
Table.delete(ti, rowid);
266+
});
267+
}
268+
253269
Map<String, String> metricsMap = parseMetricFile(metrics);
254270
metricsMap.forEach((metricname, value) -> {
255271
Map<String, Object> r = new HashMap<>();

0 commit comments

Comments
 (0)