From b25b6f3058576befee615ee54bfd4c3f9a438729 Mon Sep 17 00:00:00 2001 From: Mayank Jha Date: Wed, 28 Jan 2026 11:31:30 -0800 Subject: [PATCH] feat: add wait_for_triggers option to poll instead of suspend Add a new execution option `wait_for_triggers` that allows the runtime to poll triggers until completion instead of suspending and returning. This keeps the process running and automatically resumes execution when triggers complete. Key behavior: When wait_for_triggers=True, triggers are created (to start the work like invoking processes, creating tasks) but NOT persisted to storage. The runtime polls inline and resumes without involving the persistence layer. Changes: - Add `wait_for_triggers` and `trigger_poll_interval` options to UiPathExecuteOptions - Create shared TriggerPoller utility for reusable polling logic - Update UiPathResumableRuntime to loop and poll when wait_for_triggers=True - Add skip_storage parameter to _handle_suspension() to avoid persistence - Refactor UiPathDebugRuntime to use the shared TriggerPoller Co-Authored-By: Claude Opus 4.5 --- src/uipath/runtime/base.py | 10 ++ src/uipath/runtime/debug/runtime.py | 105 ++++++------ src/uipath/runtime/resumable/__init__.py | 2 + src/uipath/runtime/resumable/polling.py | 118 ++++++++++++++ src/uipath/runtime/resumable/runtime.py | 195 +++++++++++++++++++---- 5 files changed, 343 insertions(+), 87 deletions(-) create mode 100644 src/uipath/runtime/resumable/polling.py diff --git a/src/uipath/runtime/base.py b/src/uipath/runtime/base.py index 246e974..bf7b347 100644 --- a/src/uipath/runtime/base.py +++ b/src/uipath/runtime/base.py @@ -41,6 +41,16 @@ class UiPathExecuteOptions(BaseModel): default=None, description="List of nodes or '*' to break on all steps.", ) + wait_for_triggers: bool = Field( + default=False, + description="When True, poll triggers until completion instead of suspending. " + "This keeps the process running and automatically resumes when triggers complete.", + ) + trigger_poll_interval: float = Field( + default=5.0, + description="Seconds between poll attempts when wait_for_triggers is True.", + ge=0.1, + ) model_config = {"arbitrary_types_allowed": True, "extra": "allow"} diff --git a/src/uipath/runtime/debug/runtime.py b/src/uipath/runtime/debug/runtime.py index b671645..5e64c66 100644 --- a/src/uipath/runtime/debug/runtime.py +++ b/src/uipath/runtime/debug/runtime.py @@ -4,8 +4,6 @@ import logging from typing import Any, AsyncGenerator, cast -from uipath.core.errors import UiPathPendingTriggerError - from uipath.runtime.base import ( UiPathExecuteOptions, UiPathRuntimeProtocol, @@ -25,7 +23,7 @@ UiPathRuntimeResult, UiPathRuntimeStatus, ) -from uipath.runtime.resumable.protocols import UiPathResumeTriggerReaderProtocol +from uipath.runtime.resumable.polling import TriggerPoller from uipath.runtime.resumable.runtime import UiPathResumableRuntime from uipath.runtime.resumable.trigger import ( UiPathResumeTrigger, @@ -203,8 +201,7 @@ async def _stream_and_debug( ) else: trigger_data = await self._poll_trigger( - final_result.trigger, - self.delegate.trigger_manager, + final_result.trigger ) resume_data = {interrupt_id: trigger_data} except UiPathDebugQuitError: @@ -245,77 +242,65 @@ async def dispose(self) -> None: logger.warning(f"Error disconnecting debug bridge: {e}") async def _poll_trigger( - self, trigger: UiPathResumeTrigger, reader: UiPathResumeTriggerReaderProtocol + self, trigger: UiPathResumeTrigger ) -> dict[str, Any] | None: """Poll a resume trigger until data is available. Args: trigger: The trigger to poll - reader: The trigger reader to use for polling Returns: - Resume data when available, or None if polling exhausted + Resume data when available, or None if polling was stopped Raises: UiPathDebugQuitError: If quit is requested during polling """ - attempt = 0 - while True: - attempt += 1 - - try: - resume_data = await reader.read_trigger(trigger) - - if resume_data is not None: - return resume_data - - await self.debug_bridge.emit_state_update( - UiPathRuntimeStateEvent( - node_name="", - payload={ - "attempt": attempt, - }, - ) + self._quit_requested = False + + async def on_poll_attempt(attempt: int, info: str | None) -> None: + """Callback for each poll attempt.""" + payload: dict[str, Any] = {"attempt": attempt} + if info: + payload["info"] = info + await self.debug_bridge.emit_state_update( + UiPathRuntimeStateEvent( + node_name="", + payload=payload, ) + ) - await self._wait_with_quit_check() - - except UiPathDebugQuitError: - raise - - except UiPathPendingTriggerError as e: - await self.debug_bridge.emit_state_update( - UiPathRuntimeStateEvent( - node_name="", - payload={ - "attempt": attempt, - "info": str(e), - }, - ) + async def should_stop() -> bool: + """Check if quit was requested.""" + # Check for termination request with a short timeout + try: + term_task = asyncio.create_task(self.debug_bridge.wait_for_terminate()) + done, _ = await asyncio.wait( + {term_task}, + timeout=0.01, # Very short timeout just to check ) + if term_task in done: + self._quit_requested = True + return True + else: + term_task.cancel() + try: + await term_task + except asyncio.CancelledError: + pass + except Exception: + pass + return False - await self._wait_with_quit_check() - - async def _wait_with_quit_check(self) -> None: - """Wait for specified seconds, but allow quit command to interrupt. - - Raises: - UiPathDebugQuitError: If quit is requested during wait - """ - sleep_task = asyncio.create_task(asyncio.sleep(self.trigger_poll_interval)) - term_task = asyncio.create_task(self.debug_bridge.wait_for_terminate()) - - done, pending = await asyncio.wait( - {sleep_task, term_task}, - return_when=asyncio.FIRST_COMPLETED, + poller = TriggerPoller( + reader=self.delegate.trigger_manager, + poll_interval=self.trigger_poll_interval, + on_poll_attempt=on_poll_attempt, + should_stop=should_stop, ) - for task in pending: - task.cancel() - try: - await task - except asyncio.CancelledError: - pass + result = await poller.poll_trigger(trigger) - if term_task in done: + if self._quit_requested: raise UiPathDebugQuitError("Debugging terminated during polling.") + + return result diff --git a/src/uipath/runtime/resumable/__init__.py b/src/uipath/runtime/resumable/__init__.py index ee7f4e1..7713407 100644 --- a/src/uipath/runtime/resumable/__init__.py +++ b/src/uipath/runtime/resumable/__init__.py @@ -1,5 +1,6 @@ """Module for resumable runtime features.""" +from uipath.runtime.resumable.polling import TriggerPoller from uipath.runtime.resumable.protocols import ( UiPathResumableStorageProtocol, UiPathResumeTriggerCreatorProtocol, @@ -20,4 +21,5 @@ "UiPathResumeTrigger", "UiPathResumeTriggerType", "UiPathApiTrigger", + "TriggerPoller", ] diff --git a/src/uipath/runtime/resumable/polling.py b/src/uipath/runtime/resumable/polling.py new file mode 100644 index 0000000..e86581f --- /dev/null +++ b/src/uipath/runtime/resumable/polling.py @@ -0,0 +1,118 @@ +"""Trigger polling utilities for resumable runtime.""" + +import asyncio +import logging +from typing import Any, Callable, Coroutine + +from uipath.core.errors import UiPathPendingTriggerError + +from uipath.runtime.resumable.protocols import UiPathResumeTriggerReaderProtocol +from uipath.runtime.resumable.trigger import UiPathResumeTrigger + +logger = logging.getLogger(__name__) + + +class TriggerPoller: + """Utility for polling resume triggers until completion. + + This class provides reusable polling logic for waiting on triggers + to complete, used by both debug runtime and resumable runtime when + wait_for_triggers is enabled. + """ + + def __init__( + self, + reader: UiPathResumeTriggerReaderProtocol, + poll_interval: float = 5.0, + on_poll_attempt: Callable[[int, str | None], Coroutine[Any, Any, None]] + | None = None, + should_stop: Callable[[], Coroutine[Any, Any, bool]] | None = None, + ): + """Initialize the trigger poller. + + Args: + reader: The trigger reader to use for polling + poll_interval: Seconds between poll attempts + on_poll_attempt: Optional callback for each poll attempt (attempt_num, info) + should_stop: Optional async callback to check if polling should stop early + """ + self.reader = reader + self.poll_interval = poll_interval + self.on_poll_attempt = on_poll_attempt + self.should_stop = should_stop + + async def poll_trigger(self, trigger: UiPathResumeTrigger) -> Any | None: + """Poll a single trigger until data is available. + + Args: + trigger: The trigger to poll + + Returns: + Resume data when available, or None if polling was stopped + + Raises: + Exception: If trigger reading fails with non-pending error + """ + attempt = 0 + while True: + attempt += 1 + + # Check if we should stop + if self.should_stop and await self.should_stop(): + logger.debug("Polling stopped by should_stop callback") + return None + + try: + resume_data = await self.reader.read_trigger(trigger) + + if resume_data is not None: + logger.debug( + f"Trigger {trigger.interrupt_id} completed after {attempt} attempts" + ) + return resume_data + + # Notify about poll attempt + if self.on_poll_attempt: + await self.on_poll_attempt(attempt, None) + + await asyncio.sleep(self.poll_interval) + + except UiPathPendingTriggerError as e: + # Trigger still pending, notify and continue polling + if self.on_poll_attempt: + await self.on_poll_attempt(attempt, str(e)) + + await asyncio.sleep(self.poll_interval) + + async def poll_all_triggers( + self, triggers: list[UiPathResumeTrigger] + ) -> dict[str, Any]: + """Poll all triggers until they complete. + + Args: + triggers: List of triggers to poll + + Returns: + Dict mapping interrupt_id to resume data for completed triggers + """ + resume_map: dict[str, Any] = {} + + # Poll triggers concurrently + async def poll_single(trigger: UiPathResumeTrigger) -> tuple[str | None, Any]: + data = await self.poll_trigger(trigger) + return trigger.interrupt_id, data + + results = await asyncio.gather( + *[poll_single(trigger) for trigger in triggers], + return_exceptions=True, + ) + + for result in results: + if isinstance(result, Exception): + logger.error(f"Trigger polling failed: {result}") + raise result + interrupt_id, data = result + if interrupt_id and data is not None: + resume_map[interrupt_id] = data + + return resume_map diff --git a/src/uipath/runtime/resumable/runtime.py b/src/uipath/runtime/resumable/runtime.py index ccac3b6..25af816 100644 --- a/src/uipath/runtime/resumable/runtime.py +++ b/src/uipath/runtime/resumable/runtime.py @@ -11,8 +11,9 @@ UiPathStreamOptions, ) from uipath.runtime.debug.breakpoint import UiPathBreakpointResult -from uipath.runtime.events import UiPathRuntimeEvent +from uipath.runtime.events import UiPathRuntimeEvent, UiPathRuntimeStateEvent from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus +from uipath.runtime.resumable.polling import TriggerPoller from uipath.runtime.resumable.protocols import ( UiPathResumableStorageProtocol, UiPathResumeTriggerProtocol, @@ -66,14 +67,63 @@ async def execute( Returns: Execution result, potentially with resume trigger attached """ - # If resuming, restore trigger from storage - if options and options.resume: - input = await self._restore_resume_input(input) + options = options or UiPathExecuteOptions() + current_input = input + current_options = UiPathExecuteOptions( + resume=options.resume, + breakpoints=options.breakpoints, + wait_for_triggers=options.wait_for_triggers, + trigger_poll_interval=options.trigger_poll_interval, + ) + + while True: + # If resuming, restore trigger from storage + if current_options.resume: + current_input = await self._restore_resume_input(current_input) - # Execute the delegate - result = await self.delegate.execute(input, options=options) - # If suspended, create and persist trigger - return await self._handle_suspension(result) + # Execute the delegate + result = await self.delegate.execute(current_input, options=current_options) + + # If suspended, create trigger (and persist unless wait_for_triggers) + # When wait_for_triggers=True, we skip storage to avoid persistence + result = await self._handle_suspension( + result, skip_storage=options.wait_for_triggers + ) + + # If not suspended or wait_for_triggers is False, return the result + if ( + result.status != UiPathRuntimeStatus.SUSPENDED + or not options.wait_for_triggers + ): + return result + + # Skip breakpoint results - they should be handled by debug runtime + if isinstance(result, UiPathBreakpointResult): + return result + + # Poll triggers until completion (no storage involved) + if result.triggers: + logger.info( + f"Waiting for {len(result.triggers)} trigger(s) to complete..." + ) + poller = TriggerPoller( + reader=self.trigger_manager, + poll_interval=options.trigger_poll_interval, + ) + resume_data = await poller.poll_all_triggers(result.triggers) + + if resume_data: + # Continue execution with resume data + # No need to delete from storage since we didn't persist + current_input = resume_data + current_options.resume = True + logger.info("Triggers completed, resuming execution...") + else: + # No data returned, return suspended result + return result + else: + # No triggers to poll, return suspended result + return result async def stream( self, @@ -89,20 +139,99 @@ async def stream( Yields: Runtime events during execution, final event is UiPathRuntimeResult """ - # If resuming, restore trigger from storage - if options and options.resume: - input = await self._restore_resume_input(input) - - final_result: UiPathRuntimeResult | None = None - async for event in self.delegate.stream(input, options=options): - if isinstance(event, UiPathRuntimeResult): - final_result = event - else: - yield event + options = options or UiPathStreamOptions() + current_input = input + current_options = UiPathStreamOptions( + resume=options.resume, + breakpoints=options.breakpoints, + wait_for_triggers=options.wait_for_triggers, + trigger_poll_interval=options.trigger_poll_interval, + ) + execution_completed = False + + while not execution_completed: + # If resuming, restore trigger from storage + if current_options.resume: + current_input = await self._restore_resume_input(current_input) + + final_result: UiPathRuntimeResult | None = None + async for event in self.delegate.stream( + current_input, options=current_options + ): + if isinstance(event, UiPathRuntimeResult): + final_result = event + else: + yield event + + if not final_result: + execution_completed = True + continue - # If suspended, create and persist trigger - if final_result: - yield await self._handle_suspension(final_result) + # If suspended, create trigger (and persist unless wait_for_triggers) + # When wait_for_triggers=True, we skip storage to avoid persistence + final_result = await self._handle_suspension( + final_result, skip_storage=options.wait_for_triggers + ) + + # If not suspended or wait_for_triggers is False, yield result and exit + if ( + final_result.status != UiPathRuntimeStatus.SUSPENDED + or not options.wait_for_triggers + ): + yield final_result + execution_completed = True + continue + + # Skip breakpoint results - they should be handled by debug runtime + if isinstance(final_result, UiPathBreakpointResult): + yield final_result + execution_completed = True + continue + + # Poll triggers until completion (no storage involved) + if final_result.triggers: + logger.info( + f"Waiting for {len(final_result.triggers)} trigger(s) to complete..." + ) + + # Emit a state event to indicate we're polling + yield UiPathRuntimeStateEvent( + node_name="", + payload={ + "trigger_count": len(final_result.triggers), + "trigger_types": [ + t.trigger_type.value if t.trigger_type else None + for t in final_result.triggers + ], + }, + ) + + poller = TriggerPoller( + reader=self.trigger_manager, + poll_interval=options.trigger_poll_interval, + ) + resume_data = await poller.poll_all_triggers(final_result.triggers) + + if resume_data: + # Emit state event for resumption + # No need to delete from storage since we didn't persist + yield UiPathRuntimeStateEvent( + node_name="", + payload={"resumed_triggers": list(resume_data.keys())}, + ) + + # Continue execution with resume data + current_input = resume_data + current_options.resume = True + logger.info("Triggers completed, resuming execution...") + else: + # No data returned, yield suspended result and exit + yield final_result + execution_completed = True + else: + # No triggers to poll, yield suspended result and exit + yield final_result + execution_completed = True async def _restore_resume_input( self, input: dict[str, Any] | None @@ -159,12 +288,16 @@ async def _restore_resume_input( return resume_map async def _handle_suspension( - self, result: UiPathRuntimeResult + self, + result: UiPathRuntimeResult, + skip_storage: bool = False, ) -> UiPathRuntimeResult: """Create and persist resume trigger if execution was suspended. Args: result: The execution result to check for suspension + skip_storage: If True, skip saving triggers to storage (used when + wait_for_triggers is enabled to avoid persistence) """ # Only handle suspensions if result.status != UiPathRuntimeStatus.SUSPENDED: @@ -183,16 +316,20 @@ async def _handle_suspension( ) # Get existing triggers and current interrupts - suspended_result.triggers = ( - await self.storage.get_triggers(self.runtime_id) or [] - ) + if not skip_storage: + suspended_result.triggers = ( + await self.storage.get_triggers(self.runtime_id) or [] + ) + else: + suspended_result.triggers = [] + current_interrupts = result.output or {} # Diff: find new interrupts existing_ids = [t.interrupt_id for t in suspended_result.triggers] new_ids = [key for key in current_interrupts.keys() if key not in existing_ids] - # Create triggers only for new interrupts + # Create triggers only for new interrupts (this starts the actual work) for interrupt_id in new_ids: trigger = await self.trigger_manager.create_trigger( current_interrupts[interrupt_id] @@ -200,10 +337,14 @@ async def _handle_suspension( trigger.interrupt_id = interrupt_id suspended_result.triggers.append(trigger) - if suspended_result.triggers: + # Only persist to storage if not skipping (normal suspend flow) + if suspended_result.triggers and not skip_storage: await self.storage.save_triggers(self.runtime_id, suspended_result.triggers) # Backward compatibility: set single trigger directly suspended_result.trigger = suspended_result.triggers[0] + elif suspended_result.triggers: + # Still set single trigger for backward compat, just don't persist + suspended_result.trigger = suspended_result.triggers[0] return suspended_result