Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-runtime"
version = "0.6.2"
version = "0.6.3"
description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
4 changes: 4 additions & 0 deletions src/uipath/runtime/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class UiPathRuntimeResult(UiPathRuntimeEvent):
default=UiPathRuntimeEventType.RUNTIME_RESULT, frozen=True
)

def is_suspended(self) -> bool:
"""Checks if runtime execution is suspended."""
return self.status == UiPathRuntimeStatus.SUSPENDED

def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary format for output."""
output_data: dict[str, Any] | str
Expand Down
76 changes: 58 additions & 18 deletions src/uipath/runtime/resumable/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
)
from uipath.runtime.debug.breakpoint import UiPathBreakpointResult
from uipath.runtime.events import UiPathRuntimeEvent
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
from uipath.runtime.result import (
UiPathRuntimeResult,
UiPathRuntimeStatus,
)
from uipath.runtime.resumable.protocols import (
UiPathResumableStorageProtocol,
UiPathResumeTriggerProtocol,
)
from uipath.runtime.resumable.trigger import UiPathResumeTrigger
from uipath.runtime.schema import UiPathRuntimeSchema

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -68,12 +72,28 @@ async def execute(
"""
# If resuming, restore trigger from storage
if options and options.resume:
# restore trigger from storage
input = await self._restore_resume_input(input)

# Execute the delegate
result = await self.delegate.execute(input, options=options)
# If suspended, create and persist trigger
return await self._handle_suspension(result)
while True:
# Execute the delegate
result = await self.delegate.execute(input, options=options)
# If suspended, create and persist trigger
suspension_result = await self._handle_suspension(result)

# check if any trigger may be resumed
if not suspension_result.is_suspended() or not (
fired_triggers := await self._restore_resume_input(None)
):
return suspension_result

# Note: when resuming a job, orchestrator deletes all triggers associated with it,
# thus we can resume the runtime at this point without worrying a trigger may be fired 'twice'
input = fired_triggers
if not options:
options = UiPathExecuteOptions(resume=True)
else:
options.resume = True

async def stream(
self,
Expand All @@ -94,15 +114,31 @@ async def stream(
input = await self._restore_resume_input(input)

final_result: UiPathRuntimeResult | None = None
async for event in self.delegate.stream(input, options=options):
if isinstance(event, UiPathRuntimeResult):
final_result = event
else:
yield event
while True:
async for event in self.delegate.stream(input, options=options):
if isinstance(event, UiPathRuntimeResult):
final_result = event
else:
yield event

# If suspended, create and persist trigger
if final_result:
suspension_result = await self._handle_suspension(final_result)

if suspension_result.status != UiPathRuntimeStatus.SUSPENDED or not (
fired_triggers := await self._restore_resume_input(None)
):
yield suspension_result
return

# If suspended, create and persist trigger
if final_result:
yield await self._handle_suspension(final_result)
# Note: when resuming a job, orchestrator deletes all triggers associated with it,
# thus we can resume the runtime at this point without worrying a trigger may be fired 'twice'
input = fired_triggers

if not options:
options = UiPathStreamOptions(resume=True)
else:
options.resume = True

async def _restore_resume_input(
self, input: dict[str, Any] | None
Expand Down Expand Up @@ -142,6 +178,11 @@ async def _restore_resume_input(
if not triggers:
return None

return await self._build_resume_map(triggers)

async def _build_resume_map(
self, triggers: list[UiPathResumeTrigger]
) -> dict[str, Any]:
# Build resume map: {interrupt_id: resume_data}
resume_map: dict[str, Any] = {}
for trigger in triggers:
Expand All @@ -166,11 +207,10 @@ async def _handle_suspension(
Args:
result: The execution result to check for suspension
"""
# Only handle suspensions
if result.status != UiPathRuntimeStatus.SUSPENDED:
return result

if isinstance(result, UiPathBreakpointResult):
# Only handle interrupt suspensions
if result.status != UiPathRuntimeStatus.SUSPENDED or isinstance(
result, UiPathBreakpointResult
):
return result

suspended_result = UiPathRuntimeResult(
Expand Down
Loading