Skip to content

Use kiwipy to send early response for continue/launch/create without releasing task slot#338

Draft
agoscinski wants to merge 2 commits intoaiidateam:masterfrom
agoscinski:nowait
Draft

Use kiwipy to send early response for continue/launch/create without releasing task slot#338
agoscinski wants to merge 2 commits intoaiidateam:masterfrom
agoscinski:nowait

Conversation

@agoscinski
Copy link
Contributor

For the scheduler feature we want to be able to send responses like pause, play and kill to verify that the message has been received. We however cannot use the existing functionalities used in pause/play/kill because it would release the task slot. We therefore added a early response functionality into kiwipy to be able to send a response continue reply without releasing the task slot (that is required to enforce the daemon.worker_process_slots in the aiida config)

  • ProcessLauncher._launch and _continue now return TaskResult with task_id (PID) and result Future
  • kiwipy handles early reply (sends PID immediately when nowait=True) and waits for result Future before acking
  • Simplify ProcessLauncher.call to 2-arg signature (comm, task) since incoming_task is no longer needed
  • Update tests to handle TaskResult and use 2-arg subscriber signature

- ProcessLauncher._launch and _continue now return TaskResult with
  task_id (PID) and result Future
- kiwipy handles early reply (sends PID immediately when nowait=True)
  and waits for result Future before acking
- Simplify ProcessLauncher.__call__ to 2-arg signature (comm, task)
  since incoming_task is no longer needed
- Update tests to handle TaskResult and use 2-arg subscriber signature
# Start process in background, return TaskResult with Future
# kiwipy will handle early reply (if nowait) and wait for Future before acking
asyncio.ensure_future(proc.step_until_terminated()) # noqa: RUF006
return TaskResult(task_id=proc.pid, result=communications.plum_to_kiwi_future(proc.future()))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is proc.future().result() actually again, is it always pid?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant