diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py index 6cc493014..646bea8cf 100644 --- a/src/workflow/CommandExecutor.py +++ b/src/workflow/CommandExecutor.py @@ -9,6 +9,7 @@ import sys import importlib.util import json +import queue class CommandExecutor: """ @@ -43,12 +44,21 @@ def run_multiple_commands( self.logger.log(f"Running {len(commands)} commands in parallel...", 1) start_time = time.time() + # Queue to capture exceptions from threads + exceptions_queue = queue.Queue() + + def run_command_thread_target(cmd): + try: + self.run_command(cmd) + except Exception as e: + exceptions_queue.put(e) + # Initialize a list to keep track of threads threads = [] # Start a new thread for each command for cmd in commands: - thread = threading.Thread(target=self.run_command, args=(cmd,)) + thread = threading.Thread(target=run_command_thread_target, args=(cmd,)) thread.start() threads.append(thread) @@ -60,6 +70,11 @@ def run_multiple_commands( end_time = time.time() self.logger.log(f"Total time to run {len(commands)} commands: {end_time - start_time:.2f} seconds", 1) + # Exceptions are logged once all threads have completed execution + # The first exception amongst all is logged and raised + if not exceptions_queue.empty(): + raise RuntimeError(exceptions_queue.get()) + def run_command(self, command: list[str]) -> None: """ Executes a specified shell command and logs its execution details. @@ -105,6 +120,7 @@ def run_command(self, command: list[str]) -> None: if stderr or process.returncode != 0: error_message = stderr.decode().strip() self.logger.log(f"ERRORS OCCURRED:\n{error_message}", 2) + raise RuntimeError(error_message) def run_topp(self, tool: str, input_output: dict, custom_params: dict = {}) -> None: """