Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions src/plumpy/process_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,9 @@ async def execute_process(

class RemoteProcessThreadController:
"""
A class that can be used to control and launch remote processes
A class that can be used to control and launch remote processes.

Supports routing tasks to specific named queues via the queue_name parameter.
"""

def __init__(self, communicator: kiwipy.Communicator):
Expand Down Expand Up @@ -425,10 +427,25 @@ def kill_all(self, msg_text: Optional[str]) -> None:
self._communicator.broadcast_send(msg, subject=Intent.KILL)

def continue_process(
self, pid: 'PID_TYPE', tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False
self,
pid: 'PID_TYPE',
tag: Optional[str] = None,
nowait: bool = False,
no_reply: bool = False,
*,
queue_name: str,
) -> Union[None, PID_TYPE, ProcessResult]:
"""Continue a process, routing to the specified queue.

:param pid: the pid of the process to continue
:param tag: the checkpoint tag to continue from
:param nowait: if True don't wait for the process to complete
:param no_reply: if True, this call will be fire-and-forget
:param queue_name: queue name to route to (required)
:return: the result of continuing the process
"""
message = create_continue_body(pid=pid, tag=tag, nowait=nowait)
return self.task_send(message, no_reply=no_reply)
return self.task_send(message, no_reply=no_reply, queue_name=queue_name)

def launch_process(
self,
Expand All @@ -439,6 +456,8 @@ def launch_process(
loader: Optional[loaders.ObjectLoader] = None,
nowait: bool = False,
no_reply: bool = False,
*,
queue_name: str,
) -> Union[None, PID_TYPE, ProcessResult]:
"""
Launch the process
Expand All @@ -450,10 +469,11 @@ def launch_process(
:param loader: the class loader to use
:param nowait: if True only return when the process finishes
:param no_reply: don't send a reply to the sender
:param queue_name: queue name to route to (required)
:return: the pid of the created process or the outputs (if nowait=False)
"""
message = create_launch_body(process_class, init_args, init_kwargs, persist, loader, nowait)
return self.task_send(message, no_reply=no_reply)
return self.task_send(message, no_reply=no_reply, queue_name=queue_name)

def execute_process(
self,
Expand All @@ -463,6 +483,8 @@ def execute_process(
loader: Optional[loaders.ObjectLoader] = None,
nowait: bool = False,
no_reply: bool = False,
*,
queue_name: str,
) -> Union[None, PID_TYPE, ProcessResult]:
"""
Execute a process. This call will first send a create task and then a continue task over
Expand All @@ -475,6 +497,7 @@ def execute_process(
:param loader: the class loader to use
:param nowait: if True, don't wait for the process to send a response
:param no_reply: if True, this call will be fire-and-forget, i.e. no return value
:param queue_name: queue name to route to (required)
:return: the result of executing the process
"""

Expand All @@ -486,21 +509,23 @@ def execute_process(
def on_created(_: Any) -> None:
with kiwipy.capture_exceptions(execute_future):
pid: 'PID_TYPE' = create_future.result()
continue_future = self.continue_process(pid, nowait=nowait, no_reply=no_reply)
continue_future = self.continue_process(pid, nowait=nowait, no_reply=no_reply, queue_name=queue_name)
kiwipy.chain(continue_future, execute_future)

create_future.add_done_callback(on_created)
return execute_future

def task_send(self, message: Any, no_reply: bool = False) -> Optional[Any]:
def task_send(self, message: Any, no_reply: bool = False, *, queue_name: str) -> Optional[Any]:
"""
Send a task to be performed using the communicator
Send a task to be performed using the communicator.

:param message: the task message
:param no_reply: if True, this call will be fire-and-forget, i.e. no return value
:param queue_name: queue name to route to (required)
:return: the response from the remote side (if no_reply=False)
"""
return self._communicator.task_send(message, no_reply=no_reply)
# Send directly to the queue - AiiDA's broker handles queue setup
return self._communicator.task_queue(queue_name).task_send(message, no_reply=no_reply)


class ProcessLauncher:
Expand Down
Loading