Skip to content

Commit 5a65bd0

Browse files
committed
Add sorting to lofreq indel table
1 parent b68c3b8 commit 5a65bd0

File tree

2 files changed

+62
-21
lines changed

2 files changed

+62
-21
lines changed

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/analysis/MergeLoFreqVcfHandler.java

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import htsjdk.variant.variantcontext.writer.VariantContextWriterBuilder;
1414
import htsjdk.variant.vcf.VCFFileReader;
1515
import htsjdk.variant.vcf.VCFHeader;
16+
import org.apache.commons.io.FileUtils;
1617
import org.apache.commons.lang3.StringUtils;
1718
import org.apache.commons.lang3.tuple.Pair;
1819
import org.apache.log4j.Logger;
@@ -29,9 +30,10 @@
2930
import org.labkey.api.sequenceanalysis.pipeline.SequenceAnalysisJobSupport;
3031
import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
3132
import org.labkey.api.sequenceanalysis.pipeline.ToolParameterDescriptor;
32-
import org.labkey.api.util.FileUtil;
33+
import org.labkey.api.sequenceanalysis.run.SimpleScriptWrapper;
3334
import org.labkey.api.util.PageFlowUtil;
3435
import org.labkey.api.util.StringUtilsLabKey;
36+
import org.labkey.api.writer.PrintWriters;
3537
import org.labkey.sequenceanalysis.SequenceAnalysisModule;
3638
import org.labkey.sequenceanalysis.run.variant.SNPEffStep;
3739
import org.labkey.sequenceanalysis.run.variant.SnpEffWrapper;
@@ -42,6 +44,7 @@
4244
import java.io.FileOutputStream;
4345
import java.io.IOException;
4446
import java.io.OutputStreamWriter;
47+
import java.io.PrintWriter;
4548
import java.nio.file.Files;
4649
import java.util.ArrayList;
4750
import java.util.Arrays;
@@ -232,7 +235,7 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
232235
File indelOutput = new File(ctx.getOutputDir(), basename + "indels.txt.gz");
233236
try (CSVWriter writer = new CSVWriter(new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(new FileOutputStream(indelOutput)), StringUtilsLabKey.DEFAULT_CHARSET)), '\t', CSVWriter.NO_QUOTE_CHARACTER))
234237
{
235-
writer.writeNext(new String[]{"ReadsetName", "OutputFileId", "ReadsetId", "Source", "Contig", "Start", "End", "Ref", "AltAllele", "GatkDepth", "LoFreqDepth", "AltCount", "AltAF"});
238+
writer.writeNext(new String[]{"ReadsetName", "OutputFileId", "ReadsetId", "Source", "Contig", "Start", "End", "Length", "Ref", "AltAllele", "GatkDepth", "LoFreqDepth", "AltCount", "AltAF"});
236239

237240
for (SequenceOutputFile so : inputFiles)
238241
{
@@ -282,7 +285,8 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
282285
List<Integer> depths = vc.getAttributeAsIntList("DP4", 0);
283286
int alleleDepth = depths.get(2) + depths.get(3);
284287

285-
writer.writeNext(new String[]{ctx.getSequenceSupport().getCachedReadset(so.getReadset()).getName(), String.valueOf(so.getRowid()), String.valueOf(so.getReadset()), "LoFreq", vc.getContig(), String.valueOf(vc.getStart()), String.valueOf(vc.getEnd()), vc.getReference().getBaseString(), vc.getAlternateAlleles().get(0).getBaseString(), vc.getAttributeAsString("GATK_DP", "ND"), vc.getAttributeAsString("DP", "ND"), String.valueOf(alleleDepth), vc.getAttributeAsString("AF", "ND")});
288+
int length = vc.getLengthOnReference() - 1; //NOTE: indels are left-padded including one ref base
289+
writer.writeNext(new String[]{ctx.getSequenceSupport().getCachedReadset(so.getReadset()).getName(), String.valueOf(so.getRowid()), String.valueOf(so.getReadset()), "LoFreq", vc.getContig(), String.valueOf(vc.getStart()), String.valueOf(vc.getEnd()), String.valueOf(length), vc.getReference().getBaseString(), vc.getAlternateAlleles().get(0).getBaseString(), vc.getAttributeAsString("GATK_DP", "ND"), vc.getAttributeAsString("DP", "ND"), String.valueOf(alleleDepth), vc.getAttributeAsString("AF", "ND")});
286290
}
287291

288292
for (int i = 0; i < vc.getLengthOnReference(); i++)
@@ -320,13 +324,13 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
320324
String[] line;
321325
while ((line = reader.readNext()) != null)
322326
{
323-
writer.writeNext(new String[]{"Type", "Contig", "Start", "End", "Depth", "ReadSupport", "Fraction"});
324327
if (line[0].equals("Type"))
325328
{
326329
continue;
327330
}
328331

329-
writer.writeNext(new String[]{ctx.getSequenceSupport().getCachedReadset(so.getReadset()).getName(), String.valueOf(so.getRowid()), String.valueOf(so.getReadset()), "Pindel", line[1], line[2], line[3], "", line[0], line[4], "", line[5], line[6]});
332+
int length = Integer.parseInt(line[3]) - Integer.parseInt(line[2]) - 1; //NOTE: pindel reports one base upstream+downstream as part of the indel
333+
writer.writeNext(new String[]{ctx.getSequenceSupport().getCachedReadset(so.getReadset()).getName(), String.valueOf(so.getRowid()), String.valueOf(so.getReadset()), "Pindel", line[1], line[2], line[3], String.valueOf(length), "", line[0], line[4], "", line[5], line[6]});
330334
}
331335
}
332336
}
@@ -341,6 +345,8 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
341345
throw new PipelineJobException(e);
342346
}
343347

348+
sortTsvFile(ctx, indelOutput);
349+
344350
if (!errors.isEmpty())
345351
{
346352
errors.forEach(ctx.getLogger()::error);
@@ -694,6 +700,41 @@ private int getReadDepth(File vcf, Map<String, Integer> contigToOffset, String c
694700
}
695701
}
696702

703+
private void sortTsvFile(JobContext ctx, File input) throws PipelineJobException
704+
{
705+
File output = new File(input.getPath() + ".tmp");
706+
File script = new File(input.getParentFile(), "script.sh");
707+
try (PrintWriter writer = PrintWriters.getPrintWriter(script))
708+
{
709+
writer.println("#!/bin/bash");
710+
writer.println("set -x");
711+
writer.println("set -e");
712+
writer.println("{");
713+
writer.println("zcat " + input.getPath() +" | head -n 1;");
714+
writer.println("zcat " + input.getPath() +" | tail +2 | sort -k5,5 -k6,6n -k7,7n;");
715+
writer.println("} | gzip -c > " + output.getPath() + "\n");
716+
}
717+
catch (IOException e)
718+
{
719+
throw new PipelineJobException(e);
720+
}
721+
722+
SimpleScriptWrapper wrapper = new SimpleScriptWrapper(ctx.getLogger());
723+
wrapper.execute(Arrays.asList("/bin/bash", "script.sh"));
724+
725+
input.delete();
726+
script.delete();
727+
728+
try
729+
{
730+
FileUtils.moveFile(output, input);
731+
}
732+
catch (IOException e)
733+
{
734+
throw new PipelineJobException(e);
735+
}
736+
}
737+
697738
private Map<File, VCFFileReader> readerMap = new HashMap<>();
698739

699740
private VCFFileReader getReader(File f)

cluster/src/org/labkey/cluster/pipeline/AbstractClusterExecutionEngine.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -474,27 +474,33 @@ protected void updateJobStatus(@Nullable String status, ClusterJob j, @Nullable
474474
j.setLastStatusCheck(new Date());
475475
j.setStatus(status);
476476

477-
//no need to redundantly update PipelineJob
478-
if (!statusChanged)
477+
PipelineStatusFile sf = PipelineService.get().getStatusFile(j.getStatusFileId());
478+
if (sf != null)
479479
{
480-
PipelineStatusFile sf = PipelineService.get().getStatusFile(j.getStatusFileId());
481-
if (sf != null)
480+
File log = new File(sf.getFilePath());
481+
if (log.exists())
482482
{
483-
File log = new File(sf.getFilePath());
484-
if (log.exists())
485-
{
486-
j.setLogModified(new Date(log.lastModified()));
487-
}
483+
j.setLogModified(new Date(log.lastModified()));
488484
}
489485

486+
//NOTE: in rare cases the actual job and status file can get out of sync
487+
if (status.equalsIgnoreCase(PipelineJob.TaskStatus.running.name()) && sf.getStatus().equalsIgnoreCase(PipelineJob.TaskStatus.error.name()))
488+
{
489+
_log.error("Pipeline job and cluster out of sync: " + sf.getStatus() + " / " + status + ", for job: " + sf.getRowId());
490+
statusChanged = true;
491+
}
492+
}
493+
494+
//no need to redundantly update PipelineJob
495+
if (!statusChanged)
496+
{
490497
Table.update(null, ClusterSchema.getInstance().getSchema().getTable(ClusterSchema.CLUSTER_JOBS), j, j.getRowId());
491498
return;
492499
}
493500

494501
//and update the actual PipelineJob
495502
try
496503
{
497-
PipelineStatusFile sf = PipelineService.get().getStatusFile(j.getStatusFileId());
498504
PipelineJob pj = null;
499505
if (sf != null && status != null)
500506
{
@@ -519,12 +525,6 @@ protected void updateJobStatus(@Nullable String status, ClusterJob j, @Nullable
519525
return;
520526
}
521527

522-
File log = new File(sf.getFilePath());
523-
if (log.exists())
524-
{
525-
j.setLogModified(new Date(log.lastModified()));
526-
}
527-
528528
PipelineJob.TaskStatus taskStatus = null;
529529
for (PipelineJob.TaskStatus ts : PipelineJob.TaskStatus.values())
530530
{

0 commit comments

Comments
 (0)