diff --git a/src/plumpy/process_comms.py b/src/plumpy/process_comms.py index 5049dad1..0a73729f 100644 --- a/src/plumpy/process_comms.py +++ b/src/plumpy/process_comms.py @@ -345,7 +345,9 @@ async def execute_process( class RemoteProcessThreadController: """ - A class that can be used to control and launch remote processes + A class that can be used to control and launch remote processes. + + Supports routing tasks to specific named queues via the queue_name parameter. """ def __init__(self, communicator: kiwipy.Communicator): @@ -425,10 +427,25 @@ def kill_all(self, msg_text: Optional[str]) -> None: self._communicator.broadcast_send(msg, subject=Intent.KILL) def continue_process( - self, pid: 'PID_TYPE', tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False + self, + pid: 'PID_TYPE', + tag: Optional[str] = None, + nowait: bool = False, + no_reply: bool = False, + *, + queue_name: str, ) -> Union[None, PID_TYPE, ProcessResult]: + """Continue a process, routing to the specified queue. + + :param pid: the pid of the process to continue + :param tag: the checkpoint tag to continue from + :param nowait: if True don't wait for the process to complete + :param no_reply: if True, this call will be fire-and-forget + :param queue_name: queue name to route to (required) + :return: the result of continuing the process + """ message = create_continue_body(pid=pid, tag=tag, nowait=nowait) - return self.task_send(message, no_reply=no_reply) + return self.task_send(message, no_reply=no_reply, queue_name=queue_name) def launch_process( self, @@ -439,6 +456,8 @@ def launch_process( loader: Optional[loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False, + *, + queue_name: str, ) -> Union[None, PID_TYPE, ProcessResult]: """ Launch the process @@ -450,10 +469,11 @@ def launch_process( :param loader: the class loader to use :param nowait: if True only return when the process finishes :param no_reply: don't send a reply to the sender + :param queue_name: queue name to route to (required) :return: the pid of the created process or the outputs (if nowait=False) """ message = create_launch_body(process_class, init_args, init_kwargs, persist, loader, nowait) - return self.task_send(message, no_reply=no_reply) + return self.task_send(message, no_reply=no_reply, queue_name=queue_name) def execute_process( self, @@ -463,6 +483,8 @@ def execute_process( loader: Optional[loaders.ObjectLoader] = None, nowait: bool = False, no_reply: bool = False, + *, + queue_name: str, ) -> Union[None, PID_TYPE, ProcessResult]: """ Execute a process. This call will first send a create task and then a continue task over @@ -475,6 +497,7 @@ def execute_process( :param loader: the class loader to use :param nowait: if True, don't wait for the process to send a response :param no_reply: if True, this call will be fire-and-forget, i.e. no return value + :param queue_name: queue name to route to (required) :return: the result of executing the process """ @@ -486,21 +509,23 @@ def execute_process( def on_created(_: Any) -> None: with kiwipy.capture_exceptions(execute_future): pid: 'PID_TYPE' = create_future.result() - continue_future = self.continue_process(pid, nowait=nowait, no_reply=no_reply) + continue_future = self.continue_process(pid, nowait=nowait, no_reply=no_reply, queue_name=queue_name) kiwipy.chain(continue_future, execute_future) create_future.add_done_callback(on_created) return execute_future - def task_send(self, message: Any, no_reply: bool = False) -> Optional[Any]: + def task_send(self, message: Any, no_reply: bool = False, *, queue_name: str) -> Optional[Any]: """ - Send a task to be performed using the communicator + Send a task to be performed using the communicator. :param message: the task message :param no_reply: if True, this call will be fire-and-forget, i.e. no return value + :param queue_name: queue name to route to (required) :return: the response from the remote side (if no_reply=False) """ - return self._communicator.task_send(message, no_reply=no_reply) + # Send directly to the queue - AiiDA's broker handles queue setup + return self._communicator.task_queue(queue_name).task_send(message, no_reply=no_reply) class ProcessLauncher: