From a4652720e55e209fad29d19c73e433bdaecb610f Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Fri, 27 Feb 2026 16:16:55 +0100 Subject: [PATCH 1/2] Use kiwipy TaskResult for process launch/continue - ProcessLauncher._launch and _continue now return TaskResult with task_id (PID) and result Future - kiwipy handles early reply (sends PID immediately when nowait=True) and waits for result Future before acking - Simplify ProcessLauncher.__call__ to 2-arg signature (comm, task) since incoming_task is no longer needed - Update tests to handle TaskResult and use 2-arg subscriber signature --- pyproject.toml | 2 +- src/plumpy/communications.py | 4 +-- src/plumpy/process_comms.py | 63 +++++++++++++++++++----------------- tests/test_process_comms.py | 10 ++++-- 4 files changed, 45 insertions(+), 34 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 21d506f1..5088bc4a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ requires-python = '>=3.9' dependencies = [ "greenback~=1.0", "greenlet>=3.2.4", # We need to expicitly set our lowest bond, unfortunatly greenback does not pin the exact version - "kiwipy[rmq]~=0.9.0", + "kiwipy", "pyyaml~=6.0", ] [project.urls] diff --git a/src/plumpy/communications.py b/src/plumpy/communications.py index 25788212..1954848d 100644 --- a/src/plumpy/communications.py +++ b/src/plumpy/communications.py @@ -164,8 +164,8 @@ def add_broadcast_subscriber( def remove_broadcast_subscriber(self, identifier: 'ID_TYPE') -> None: return self._communicator.remove_broadcast_subscriber(identifier) - def task_send(self, task: Any, no_reply: bool = False) -> kiwipy.Future: - return self._communicator.task_send(task, no_reply) + def task_send(self, task: Any, no_reply: bool = False, nowait: bool = False) -> kiwipy.Future: + return self._communicator.task_send(task, no_reply, nowait) def rpc_send(self, recipient_id: 'ID_TYPE', msg: Any) -> kiwipy.Future: return self._communicator.rpc_send(recipient_id, msg) diff --git a/src/plumpy/process_comms.py b/src/plumpy/process_comms.py index 5049dad1..c90c5c79 100644 --- a/src/plumpy/process_comms.py +++ b/src/plumpy/process_comms.py @@ -8,6 +8,7 @@ from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union, cast import kiwipy +from kiwipy.rmq import TaskResult from . import communications, futures, loaders, persistence from .utils import PID_TYPE @@ -17,6 +18,7 @@ 'ProcessLauncher', 'RemoteProcessController', 'RemoteProcessThreadController', + 'TaskResult', 'create_continue_body', 'create_launch_body', ] @@ -260,7 +262,7 @@ async def continue_process( """ message = create_continue_body(pid=pid, tag=tag, nowait=nowait) # Wait for the communication to go through - continue_future = self._communicator.task_send(message, no_reply=no_reply) + continue_future = self._communicator.task_send(message, no_reply=no_reply, nowait=nowait) future = await asyncio.wrap_future(continue_future) if no_reply: @@ -294,7 +296,7 @@ async def launch_process( """ message = create_launch_body(process_class, init_args, init_kwargs, persist, loader, nowait) - launch_future = self._communicator.task_send(message, no_reply=no_reply) + launch_future = self._communicator.task_send(message, no_reply=no_reply, nowait=nowait) future = await asyncio.wrap_future(launch_future) if no_reply: @@ -333,7 +335,7 @@ async def execute_process( pid: 'PID_TYPE' = await asyncio.wrap_future(future) message = create_continue_body(pid, nowait=nowait) - continue_future = self._communicator.task_send(message, no_reply=no_reply) + continue_future = self._communicator.task_send(message, no_reply=no_reply, nowait=nowait) future = await asyncio.wrap_future(continue_future) if no_reply: @@ -428,7 +430,7 @@ def continue_process( self, pid: 'PID_TYPE', tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False ) -> Union[None, PID_TYPE, ProcessResult]: message = create_continue_body(pid=pid, tag=tag, nowait=nowait) - return self.task_send(message, no_reply=no_reply) + return self.task_send(message, no_reply=no_reply, nowait=nowait) def launch_process( self, @@ -453,7 +455,7 @@ def launch_process( :return: the pid of the created process or the outputs (if nowait=False) """ message = create_launch_body(process_class, init_args, init_kwargs, persist, loader, nowait) - return self.task_send(message, no_reply=no_reply) + return self.task_send(message, no_reply=no_reply, nowait=nowait) def execute_process( self, @@ -492,15 +494,16 @@ def on_created(_: Any) -> None: create_future.add_done_callback(on_created) return execute_future - def task_send(self, message: Any, no_reply: bool = False) -> Optional[Any]: + def task_send(self, message: Any, no_reply: bool = False, nowait: bool = False) -> Optional[Any]: """ Send a task to be performed using the communicator :param message: the task message :param no_reply: if True, this call will be fire-and-forget, i.e. no return value + :param nowait: if True, return immediately with task_id instead of waiting for result :return: the response from the remote side (if no_reply=False) """ - return self._communicator.task_send(message, no_reply=no_reply) + return self._communicator.task_send(message, no_reply=no_reply, nowait=nowait) class ProcessLauncher: @@ -545,10 +548,15 @@ def __init__( else: self._loader = loaders.get_object_loader() - async def __call__(self, communicator: kiwipy.Communicator, task: Dict[str, Any]) -> Union[PID_TYPE, ProcessResult]: + async def __call__( + self, communicator: kiwipy.Communicator, task: Dict[str, Any] + ) -> Union[TaskResult, 'PID_TYPE']: """ Receive a task. - :param task: The task message + + :param communicator: the communicator + :param task: the task message + :return: TaskResult for launch/continue tasks, PID for create tasks """ task_type = task[TASK_KEY] if task_type == LAUNCH_TASK: @@ -568,7 +576,7 @@ async def _launch( nowait: bool, init_args: Optional[Sequence[Any]] = None, init_kwargs: Optional[Dict[str, Any]] = None, - ) -> Union[PID_TYPE, ProcessResult]: + ) -> TaskResult: """ Launch the process @@ -578,7 +586,7 @@ async def _launch( :param nowait: if True only return when the process finishes :param init_args: positional arguments to the process constructor :param init_kwargs: keyword arguments to the process constructor - :return: the pid of the created process or the outputs (if nowait=False) + :return: TaskResult with task_id (PID) and result Future """ if persist and not self._persister: raise communications.TaskRejected('Cannot persist process, no persister') @@ -593,18 +601,18 @@ async def _launch( if persist and self._persister is not None: self._persister.save_checkpoint(proc) - if nowait: - # XXX: can return a reference and gracefully use task to cancel itself when the upper call stack fails - asyncio.ensure_future(proc.step_until_terminated()) # noqa: RUF006 - return proc.pid - - await proc.step_until_terminated() - - return proc.future().result() + # Start process in background, return TaskResult with Future + # kiwipy will handle early reply (if nowait) and wait for Future before acking + asyncio.ensure_future(proc.step_until_terminated()) # noqa: RUF006 + return TaskResult(task_id=proc.pid, result=communications.plum_to_kiwi_future(proc.future())) async def _continue( - self, _communicator: kiwipy.Communicator, pid: 'PID_TYPE', nowait: bool, tag: Optional[str] = None - ) -> Union[PID_TYPE, ProcessResult]: + self, + _communicator: kiwipy.Communicator, + pid: 'PID_TYPE', + nowait: bool, + tag: Optional[str] = None, + ) -> TaskResult: """ Continue the process @@ -612,6 +620,7 @@ async def _continue( :param pid: the pid of the process to continue :param nowait: if True don't wait for the process to complete :param tag: the checkpoint tag to continue from + :return: TaskResult with task_id (PID) and result Future """ if not self._persister: LOGGER.warning('rejecting task: cannot continue process<%d> because no persister is available', pid) @@ -621,14 +630,10 @@ async def _continue( saved_state = self._persister.load_checkpoint(pid, tag) proc = cast('Process', saved_state.unbundle(self._load_context)) - if nowait: - # XXX: can return a reference and gracefully use task to cancel itself when the upper call stack fails - asyncio.ensure_future(proc.step_until_terminated()) # noqa: RUF006 - return proc.pid - - await proc.step_until_terminated() - - return proc.future().result() + # Start process in background, return TaskResult with Future + # kiwipy will handle early reply (if nowait) and wait for Future before acking + asyncio.ensure_future(proc.step_until_terminated()) # noqa: RUF006 + return TaskResult(task_id=proc.pid, result=communications.plum_to_kiwi_future(proc.future())) async def _create( self, diff --git a/tests/test_process_comms.py b/tests/test_process_comms.py index 44947230..c999999e 100644 --- a/tests/test_process_comms.py +++ b/tests/test_process_comms.py @@ -1,4 +1,6 @@ # -*- coding: utf-8 -*- +import asyncio + import pytest import plumpy @@ -37,7 +39,9 @@ async def test_continue(): del process process = None - result = await launcher._continue(None, **plumpy.create_continue_body(pid)[process_comms.TASK_ARGS]) + task_result = await launcher._continue(None, **plumpy.create_continue_body(pid)[process_comms.TASK_ARGS]) + # _continue returns a TaskResult; wait for the result Future to resolve + result = await asyncio.wrap_future(task_result.result) assert result == utils.DummyProcess.EXPECTED_OUTPUTS @@ -51,5 +55,7 @@ async def test_loader_is_used(): launcher = plumpy.ProcessLauncher(persister=persister, loader=loader) continue_task = plumpy.create_continue_body(proc.pid) - result = await launcher._continue(None, **continue_task[process_comms.TASK_ARGS]) + task_result = await launcher._continue(None, **continue_task[process_comms.TASK_ARGS]) + # _continue returns a TaskResult; wait for the result Future to resolve + result = await asyncio.wrap_future(task_result.result) assert result == utils.DummyProcess.EXPECTED_OUTPUTS From 8ba64e0e99cf75c3b95633e2399c715e92983ec1 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Tue, 3 Mar 2026 15:52:57 +0100 Subject: [PATCH 2/2] Add tmp kiwipy dependency --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 5088bc4a..36ec29a7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ requires-python = '>=3.9' dependencies = [ "greenback~=1.0", "greenlet>=3.2.4", # We need to expicitly set our lowest bond, unfortunatly greenback does not pin the exact version - "kiwipy", + "kiwipy @ git+https://github.com/aiidateam/kiwipy@nowait", "pyyaml~=6.0", ] [project.urls]