9595import java .util .LinkedHashSet ;
9696import java .util .List ;
9797import java .util .Map ;
98+ import java .util .Optional ;
9899import java .util .Set ;
100+ import java .util .stream .Collectors ;
99101
100102/**
101103 * This task is designed to act on either a FASTQ or zipped FASTQ file. Each file should already have been imported as a readset using
@@ -1073,7 +1075,8 @@ private File doAlignment(ReferenceGenome referenceGenome, Readset rs, Map<ReadDa
10731075
10741076 private File doMergeThenAlign (ReferenceGenome referenceGenome , Readset rs , Map <ReadData , Pair <File , File >> files , List <RecordedAction > alignActions ) throws PipelineJobException , IOException
10751077 {
1076- if (files .size () > 1 )
1078+ AlignmentStep alignmentStep = getHelper ().getSingleStep (AlignmentStep .class ).create (getHelper ());
1079+ if (!alignmentStep .canAlignMultiplePairsAtOnce () && files .size () > 1 )
10771080 {
10781081 getJob ().setStatus (PipelineJob .TaskStatus .running , "Merging FASTQs" );
10791082 FastqMerger merger = new FastqMerger (getJob ().getLogger ());
@@ -1152,13 +1155,14 @@ private File doMergeThenAlign(ReferenceGenome referenceGenome, Readset rs, Map<R
11521155 FileUtils .touch (doneFile );
11531156 getHelper ().getFileManager ().addIntermediateFile (doneFile );
11541157
1155- return doAlignmentForPair ( Pair .of (mergedForward , mergedReverse ), referenceGenome , rs , -1 , "" , null );
1158+ return doAlignmentForSet ( Arrays . asList ( Pair .of (mergedForward , mergedReverse ) ), referenceGenome , rs , -1 , "" , null );
11561159 }
11571160 else
11581161 {
11591162 getJob ().getLogger ().info ("No FASTQ merge necessary" );
1160- ReadData rd = files .keySet ().iterator ().next ();
1161- return doAlignmentForPair (files .get (rd ), referenceGenome , rs , rd .getRowid (), "" , rd .getPlatformUnit ());
1163+ List <Pair <File , File >> inputs = new ArrayList <>(files .values ());
1164+ Optional <Integer > minReadData = files .keySet ().stream ().map (ReadData ::getRowid ).min (Integer ::compareTo );
1165+ return doAlignmentForSet (inputs , referenceGenome , rs , minReadData .get (), "" , inputs .size () == 1 ? files .keySet ().stream ().iterator ().next ().getPlatformUnit () : null );
11621166 }
11631167 }
11641168
@@ -1175,7 +1179,7 @@ private File doAlignThenMerge(ReferenceGenome referenceGenome, Readset rs, Map<R
11751179
11761180 getJob ().getLogger ().info ("Aligning inputs: " + pair .first .getName () + (pair .second == null ? "" : " and " + pair .second .getName ()));
11771181
1178- alignOutputs .add (doAlignmentForPair ( pair , referenceGenome , rs , rd .getRowid (), msgSuffix , rd .getPlatformUnit ()));
1182+ alignOutputs .add (doAlignmentForSet ( Arrays . asList ( pair ) , referenceGenome , rs , rd .getRowid (), msgSuffix , rd .getPlatformUnit ()));
11791183 }
11801184
11811185 //merge outputs
@@ -1199,7 +1203,7 @@ private File doAlignThenMerge(ReferenceGenome referenceGenome, Readset rs, Map<R
11991203 bam = new File (alignOutputs .get (0 ).getParent (), FileUtil .getBaseName (alignOutputs .get (0 ).getName ()) + ".merged.bam" );
12001204 getHelper ().getFileManager ().addOutput (mergeAction , "Merged BAM" , bam );
12011205 //NOTE: merged BAMs will be deleted as intermediate files, and if we delete too early this breaks job resume
1202- mergeSamFilesWrapper .execute (bams , bam . getPath () , false );
1206+ mergeSamFilesWrapper .execute (bams , bam , false );
12031207 getHelper ().getFileManager ().addCommandsToAction (mergeSamFilesWrapper .getCommandsExecuted (), mergeAction );
12041208
12051209 Date end = new Date ();
@@ -1215,35 +1219,47 @@ private File doAlignThenMerge(ReferenceGenome referenceGenome, Readset rs, Map<R
12151219 return bam ;
12161220 }
12171221
1218- public File doAlignmentForPair ( Pair <File , File > inputFiles , ReferenceGenome referenceGenome , Readset rs , int readDataId , @ NotNull String msgSuffix , @ Nullable String platformUnit ) throws PipelineJobException , IOException
1222+ public File doAlignmentForSet ( List < Pair <File , File >> inputFiles , ReferenceGenome referenceGenome , Readset rs , int lowestReadDataId , @ NotNull String msgSuffix , @ Nullable String platformUnit ) throws PipelineJobException , IOException
12191223 {
1220- getJob (). getLogger (). info ( "Beginning alignment for: " + inputFiles . first . getName () + ( inputFiles . second == null ? "" : " and " + inputFiles . second . getName ()) + msgSuffix );
1224+ AlignmentStep alignmentStep = getHelper (). getSingleStep ( AlignmentStep . class ). create ( getHelper () );
12211225
1222- if (_resumer .isReadDataAlignmentDone (readDataId ))
1226+ getJob ().getLogger ().info ("Beginning alignment for: " );
1227+ inputFiles .forEach (x -> getPipelineJob ().getLogger ().info (x .first .getName () + (x .second == null ? "" : " and " + x .second .getName ()) + msgSuffix ));
1228+
1229+ if (_resumer .isReadDataAlignmentDone (lowestReadDataId ))
12231230 {
1224- getJob ().getLogger ().debug ("resuming alignment of readData from saved state: " + readDataId );
1225- return _resumer .getBamForReadData (readDataId );
1231+ getJob ().getLogger ().debug ("resuming alignment of readData from saved state: " + lowestReadDataId );
1232+ return _resumer .getBamForReadData (lowestReadDataId );
12261233 }
12271234 else
12281235 {
12291236 List <RecordedAction > actions = new ArrayList <>();
12301237
12311238 //log input sequence count
1232- FastqUtils .logSequenceCounts (inputFiles .first , inputFiles .second , getJob ().getLogger (), null , null );
1239+ for (Pair <File , File > x : inputFiles )
1240+ {
1241+ FastqUtils .logSequenceCounts (x .first , x .second , getJob ().getLogger (), null , null );
1242+ }
12331243
1234- AlignmentStep alignmentStep = getHelper ().getSingleStep (AlignmentStep .class ).create (getHelper ());
12351244 FileType gz = new FileType (".gz" );
1236- if (!alignmentStep .supportsGzipFastqs () && gz .isType (inputFiles .first ))
1245+ boolean performedDecompress = false ;
1246+ for (Pair <File , File > pair : inputFiles )
12371247 {
1238- getJob ().setStatus (PipelineJob .TaskStatus .running , "DECOMPRESS INPUT FILES" );
1239- getHelper ().getFileManager ().decompressInputFiles (inputFiles , actions );
1240- getHelper ().getFileManager ().addIntermediateFile (inputFiles .first );
1241- if (inputFiles .second != null )
1248+ if (!alignmentStep .supportsGzipFastqs () && gz .isType (pair .first ))
12421249 {
1243- getHelper ().getFileManager ().addIntermediateFile (inputFiles .second );
1250+ getJob ().setStatus (PipelineJob .TaskStatus .running , "DECOMPRESS INPUT FILES" );
1251+ getHelper ().getFileManager ().decompressInputFiles (pair , actions );
1252+ getHelper ().getFileManager ().addIntermediateFile (pair .first );
1253+ if (pair .second != null )
1254+ {
1255+ getHelper ().getFileManager ().addIntermediateFile (pair .second );
1256+ }
1257+
1258+ performedDecompress = true ;
12441259 }
12451260 }
1246- else
1261+
1262+ if (!performedDecompress )
12471263 {
12481264 getJob ().getLogger ().debug ("no need to decompress input files, skipping" );
12491265 }
@@ -1252,16 +1268,19 @@ public File doAlignmentForPair(Pair<File, File> inputFiles, ReferenceGenome refe
12521268
12531269 Date start = new Date ();
12541270 alignmentAction .setStartTime (start );
1255- getHelper ().getFileManager ().addInput (alignmentAction , SequenceTaskHelper .FASTQ_DATA_INPUT_NAME , inputFiles .first );
1256-
1257- if (inputFiles .second != null )
1271+ for (Pair <File , File > pair : inputFiles )
12581272 {
1259- getHelper ().getFileManager ().addInput (alignmentAction , SequenceTaskHelper .FASTQ_DATA_INPUT_NAME , inputFiles .second );
1273+ getHelper ().getFileManager ().addInput (alignmentAction , SequenceTaskHelper .FASTQ_DATA_INPUT_NAME , pair .first );
1274+
1275+ if (pair .second != null )
1276+ {
1277+ getHelper ().getFileManager ().addInput (alignmentAction , SequenceTaskHelper .FASTQ_DATA_INPUT_NAME , pair .second );
1278+ }
12601279 }
12611280
12621281 getHelper ().getFileManager ().addInput (alignmentAction , IndexOutputImpl .REFERENCE_DB_FASTA , referenceGenome .getSourceFastaFile ());
12631282
1264- File outputDirectory = new File (getHelper ().getWorkingDirectory (), SequenceTaskHelper .getMinimalBaseName (inputFiles .first .getName ()));
1283+ File outputDirectory = new File (getHelper ().getWorkingDirectory (), SequenceTaskHelper .getMinimalBaseName (inputFiles .get ( 0 ). first .getName ()));
12651284 outputDirectory = new File (outputDirectory , ALIGNMENT_SUBFOLDER_NAME );
12661285 if (!outputDirectory .exists ())
12671286 {
@@ -1270,7 +1289,10 @@ public File doAlignmentForPair(Pair<File, File> inputFiles, ReferenceGenome refe
12701289 }
12711290
12721291 getJob ().setStatus (PipelineJob .TaskStatus .running , "RUNNING: " + alignmentStep .getProvider ().getLabel ().toUpperCase () + msgSuffix );
1273- AlignmentStep .AlignmentOutput alignmentOutput = alignmentStep .performAlignment (rs , inputFiles .first , inputFiles .second , outputDirectory , referenceGenome , SequenceTaskHelper .getUnzippedBaseName (inputFiles .first .getName ()) + "." + alignmentStep .getProvider ().getName ().toLowerCase (), String .valueOf (readDataId ), platformUnit );
1292+ List <File > forwardFastqs = inputFiles .stream ().map (Pair ::getKey ).collect (Collectors .toList ());
1293+ List <File > reverseFastqs = inputFiles .get (0 ).getValue () == null ? null : inputFiles .stream ().map (Pair ::getKey ).collect (Collectors .toList ());
1294+
1295+ AlignmentStep .AlignmentOutput alignmentOutput = alignmentStep .performAlignment (rs , forwardFastqs , reverseFastqs , outputDirectory , referenceGenome , SequenceTaskHelper .getUnzippedBaseName (inputFiles .get (0 ).first .getName ()) + "." + alignmentStep .getProvider ().getName ().toLowerCase (), String .valueOf (lowestReadDataId ), platformUnit );
12741296 getHelper ().getFileManager ().addStepOutputs (alignmentAction , alignmentOutput );
12751297
12761298 if (alignmentOutput .getBAM () == null || !alignmentOutput .getBAM ().exists ())
@@ -1285,7 +1307,7 @@ public File doAlignmentForPair(Pair<File, File> inputFiles, ReferenceGenome refe
12851307 SequenceUtil .logFastqBamDifferences (getJob ().getLogger (), alignmentOutput .getBAM ());
12861308
12871309 ToolParameterDescriptor mergeParam = alignmentStep .getProvider ().getParameterByName (AbstractAlignmentStepProvider .SUPPORT_MERGED_UNALIGNED );
1288- boolean doMergeUnaligned = mergeParam == null ? false : mergeParam .extractValue (getJob (), alignmentStep .getProvider (), alignmentStep .getStepIdx (), Boolean .class , false );
1310+ boolean doMergeUnaligned = mergeParam != null && mergeParam .extractValue (getJob (), alignmentStep .getProvider (), alignmentStep .getStepIdx (), Boolean .class , false );
12891311 if (doMergeUnaligned )
12901312 {
12911313 getJob ().setStatus (PipelineJob .TaskStatus .running , "MERGING UNALIGNED READS INTO BAM" + msgSuffix );
@@ -1299,7 +1321,7 @@ public File doAlignmentForPair(Pair<File, File> inputFiles, ReferenceGenome refe
12991321
13001322 //merge unaligned reads and clean file
13011323 MergeBamAlignmentWrapper wrapper = new MergeBamAlignmentWrapper (getJob ().getLogger ());
1302- wrapper .executeCommand (referenceGenome .getWorkingFastaFile (), alignmentOutput .getBAM (), inputFiles . first , inputFiles . second , null );
1324+ wrapper .executeCommand (referenceGenome .getWorkingFastaFile (), alignmentOutput .getBAM (), inputFiles , null );
13031325 getHelper ().getFileManager ().addCommandsToAction (wrapper .getCommandsExecuted (), alignmentAction );
13041326 }
13051327 else
@@ -1325,7 +1347,7 @@ public File doAlignmentForPair(Pair<File, File> inputFiles, ReferenceGenome refe
13251347 runner .execute (alignmentOutput .getBAM ());
13261348 getHelper ().getFileManager ().addCommandsToAction (runner .getCommandsExecuted (), alignmentAction );
13271349
1328- _resumer .setReadDataAlignmentDone (readDataId , actions , alignmentOutput .getBAM ());
1350+ _resumer .setReadDataAlignmentDone (lowestReadDataId , actions , alignmentOutput .getBAM ());
13291351
13301352 return alignmentOutput .getBAM ();
13311353 }
0 commit comments