22
33import org .apache .logging .log4j .Logger ;
44import org .jetbrains .annotations .NotNull ;
5+ import org .labkey .api .data .ContainerManager ;
6+ import org .labkey .api .data .DbSequence ;
7+ import org .labkey .api .data .DbSequenceManager ;
58import org .labkey .api .exp .XarFormatException ;
69import org .labkey .api .pipeline .AbstractTaskFactory ;
710import org .labkey .api .pipeline .AbstractTaskFactorySettings ;
@@ -37,6 +40,8 @@ public class NextFlowRunTask extends WorkDirectoryTask<NextFlowRunTask.Factory>
3740
3841 public static final String ACTION_NAME = "NextFlow" ;
3942
43+ private static final DbSequence INVOCATION_SEQUENCE = DbSequenceManager .get (ContainerManager .getRoot (), NextFlowRunTask .class .getName ());
44+
4045 public NextFlowRunTask (Factory factory , PipelineJob job )
4146 {
4247 super (factory , job );
@@ -46,7 +51,12 @@ public NextFlowRunTask(Factory factory, PipelineJob job)
4651 public @ NotNull RecordedActionSet run () throws PipelineJobException
4752 {
4853 Logger log = getJob ().getLogger ();
49- NextFlowPipelineJob .LOG .info ("Starting to execute NextFlow: {}" , getJob ().getJsonJobInfo ());
54+
55+ // NextFlow requires a unique job name for every execution. Increment a counter to append as a suffix to
56+ // ensure uniqueness
57+ long invocationCount = INVOCATION_SEQUENCE .next ();
58+ INVOCATION_SEQUENCE .sync ();
59+ NextFlowPipelineJob .LOG .info ("Starting to execute NextFlow: {}" , getJob ().getJsonJobInfo (invocationCount ));
5060
5161 SecurityManager .TransformSession session = null ;
5262 boolean success = false ;
@@ -73,10 +83,10 @@ public NextFlowRunTask(Factory factory, PipelineJob job)
7383 File dir = getJob ().getLogFile ().getParentFile ();
7484 getJob ().runSubProcess (secretsPB , dir );
7585
76- ProcessBuilder executionPB = new ProcessBuilder (getArgs ());
86+ ProcessBuilder executionPB = new ProcessBuilder (getArgs (invocationCount ));
7787 getJob ().runSubProcess (executionPB , dir );
7888 log .info ("Job Finished" );
79- NextFlowPipelineJob .LOG .info ("Finished executing NextFlow: {}" , getJob ().getJsonJobInfo ());
89+ NextFlowPipelineJob .LOG .info ("Finished executing NextFlow: {}" , getJob ().getJsonJobInfo (invocationCount ));
8090
8191 RecordedAction action = new RecordedAction (ACTION_NAME );
8292 for (Path inputFile : getJob ().getInputFilePaths ())
@@ -100,14 +110,16 @@ public NextFlowRunTask(Factory factory, PipelineJob job)
100110 }
101111 if (!success )
102112 {
103- NextFlowPipelineJob .LOG .info ("Failed executing NextFlow: {}" , getJob ().getJsonJobInfo ());
113+ NextFlowPipelineJob .LOG .info ("Failed executing NextFlow: {}" , getJob ().getJsonJobInfo (invocationCount ));
104114 }
105115 }
106116 }
107117
108118 private void addOutputs (RecordedAction action , Path path , Logger log ) throws IOException
109119 {
110- if (Files .isRegularFile (path ))
120+ // Skip results.sky.zip files - it's the template document. We want the file output doc that includes
121+ // the replicate analysis
122+ if (Files .isRegularFile (path ) && !path .endsWith ("results.sky.zip" ))
111123 {
112124 action .addOutput (path .toFile (), "Output" , false );
113125 if (path .toString ().toLowerCase ().endsWith (".sky.zip" ))
@@ -164,7 +176,7 @@ private boolean hasAwsSection(Path configFile) throws PipelineJobException
164176 }
165177
166178
167- private @ NotNull List <String > getArgs () throws PipelineJobException
179+ private @ NotNull List <String > getArgs (long invocationCount ) throws PipelineJobException
168180 {
169181 NextFlowConfiguration config = NextFlowManager .get ().getConfiguration ();
170182 Path configFile = getJob ().getConfig ();
@@ -189,7 +201,7 @@ private boolean hasAwsSection(Path configFile) throws PipelineJobException
189201 args .add ("-c" );
190202 args .add (configFile .toAbsolutePath ().toString ());
191203 args .add ("-name" );
192- args .add (getJob ().getNextFlowRunName ());
204+ args .add (getJob ().getNextFlowRunName (invocationCount ));
193205 return args ;
194206 }
195207
0 commit comments