Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion src/workflow/CommandExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import sys
import importlib.util
import json
import queue

class CommandExecutor:
"""
Expand Down Expand Up @@ -43,12 +44,21 @@ def run_multiple_commands(
self.logger.log(f"Running {len(commands)} commands in parallel...", 1)
start_time = time.time()

# Queue to capture exceptions from threads
exceptions_queue = queue.Queue()

def run_command_thread_target(cmd):
try:
self.run_command(cmd)
except Exception as e:
exceptions_queue.put(e)

# Initialize a list to keep track of threads
threads = []

# Start a new thread for each command
for cmd in commands:
thread = threading.Thread(target=self.run_command, args=(cmd,))
thread = threading.Thread(target=run_command_thread_target, args=(cmd,))
thread.start()
threads.append(thread)

Expand All @@ -60,6 +70,11 @@ def run_multiple_commands(
end_time = time.time()
self.logger.log(f"Total time to run {len(commands)} commands: {end_time - start_time:.2f} seconds", 1)

# Exceptions are logged once all threads have completed execution
# The first exception amongst all is logged and raised
if not exceptions_queue.empty():
raise RuntimeError(exceptions_queue.get())

Comment on lines +73 to +77
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@t0mdavid-m what I've done here is that as the threads execute .... I catch the threads and then keep array of exceptions and as soon as all threads complete I throw/raise the first occurred exception to terminate the workflow. I've tested it out and it works. Only thing is that the termination happens after all threads complete.

Wouldn't it be safe to not kill other threads and simply let them run but in the minimal logs just show the error that was hit first?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can kill all threads as soon as an exception was raised. The workflow will not be able to complete in any case after that so there is no reason to keep threads running.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So should I use Thread.Event to listen to a STOP event across all threads in a while loop within each thread run and terminate/kill the thread as soon as the event is set to true

def run_command(self, command: list[str]) -> None:
"""
Executes a specified shell command and logs its execution details.
Expand Down Expand Up @@ -105,6 +120,7 @@ def run_command(self, command: list[str]) -> None:
if stderr or process.returncode != 0:
error_message = stderr.decode().strip()
self.logger.log(f"ERRORS OCCURRED:\n{error_message}", 2)
raise RuntimeError(error_message)

def run_topp(self, tool: str, input_output: dict, custom_params: dict = {}) -> None:
"""
Expand Down
Loading