Skip to content

Commit 740e611

Browse files
committed
feat: resume runtime on fired triggers
1 parent 9e7f9cc commit 740e611

3 files changed

Lines changed: 265 additions & 15 deletions

File tree

src/uipath/runtime/result.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,10 @@ def to_dict(self) -> dict[str, Any]:
6161
result["error"] = self.error.model_dump()
6262

6363
return result
64+
65+
66+
class UiPathSuspensionResult(BaseModel):
67+
"""Result of a runtime suspension."""
68+
69+
runtime_result: UiPathRuntimeResult
70+
fired_triggers_map: dict[str, Any] | None = Field(default=None)

src/uipath/runtime/resumable/runtime.py

Lines changed: 69 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,16 @@
1212
)
1313
from uipath.runtime.debug.breakpoint import UiPathBreakpointResult
1414
from uipath.runtime.events import UiPathRuntimeEvent
15-
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
15+
from uipath.runtime.result import (
16+
UiPathRuntimeResult,
17+
UiPathRuntimeStatus,
18+
UiPathSuspensionResult,
19+
)
1620
from uipath.runtime.resumable.protocols import (
1721
UiPathResumableStorageProtocol,
1822
UiPathResumeTriggerProtocol,
1923
)
24+
from uipath.runtime.resumable.trigger import UiPathResumeTrigger
2025
from uipath.runtime.schema import UiPathRuntimeSchema
2126

2227
logger = logging.getLogger(__name__)
@@ -56,6 +61,7 @@ async def execute(
5661
self,
5762
input: dict[str, Any] | None = None,
5863
options: UiPathExecuteOptions | None = None,
64+
fired_triggers_map: dict[str, Any] | None = None,
5965
) -> UiPathRuntimeResult:
6066
"""Execute with resume trigger handling.
6167
@@ -66,19 +72,35 @@ async def execute(
6672
Returns:
6773
Execution result, potentially with resume trigger attached
6874
"""
69-
# If resuming, restore trigger from storage
75+
# check if we are resuming
7076
if options and options.resume:
71-
input = await self._restore_resume_input(input)
77+
if fired_triggers_map:
78+
input = fired_triggers_map
79+
else:
80+
# restore trigger from storage
81+
input = await self._restore_resume_input(input)
7282

7383
# Execute the delegate
7484
result = await self.delegate.execute(input, options=options)
7585
# If suspended, create and persist trigger
76-
return await self._handle_suspension(result)
86+
suspension_result = await self._handle_suspension(result)
87+
if not suspension_result.fired_triggers_map:
88+
return suspension_result.runtime_result
89+
90+
# some triggers are already fired, runtime can be resumed
91+
resume_options = options or UiPathExecuteOptions(resume=True)
92+
if not resume_options.resume:
93+
resume_options = UiPathExecuteOptions(resume=True)
94+
return await self.execute(
95+
fired_triggers_map=suspension_result.fired_triggers_map,
96+
options=resume_options,
97+
)
7798

7899
async def stream(
79100
self,
80101
input: dict[str, Any] | None = None,
81102
options: UiPathStreamOptions | None = None,
103+
fired_triggers_map: dict[str, Any] | None = None,
82104
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
83105
"""Stream with resume trigger handling.
84106
@@ -89,9 +111,13 @@ async def stream(
89111
Yields:
90112
Runtime events during execution, final event is UiPathRuntimeResult
91113
"""
92-
# If resuming, restore trigger from storage
114+
# check if we are resuming
93115
if options and options.resume:
94-
input = await self._restore_resume_input(input)
116+
if fired_triggers_map:
117+
input = fired_triggers_map
118+
else:
119+
# restore trigger from storage
120+
input = await self._restore_resume_input(input)
95121

96122
final_result: UiPathRuntimeResult | None = None
97123
async for event in self.delegate.stream(input, options=options):
@@ -102,7 +128,21 @@ async def stream(
102128

103129
# If suspended, create and persist trigger
104130
if final_result:
105-
yield await self._handle_suspension(final_result)
131+
suspension_result = await self._handle_suspension(final_result)
132+
133+
if not suspension_result.fired_triggers_map:
134+
yield suspension_result.runtime_result
135+
return
136+
137+
# some triggers are already fired, runtime can be resumed
138+
resume_options = options or UiPathStreamOptions(resume=True)
139+
if not resume_options.resume:
140+
resume_options = UiPathStreamOptions(resume=True)
141+
async for event in self.stream(
142+
fired_triggers_map=suspension_result.fired_triggers_map,
143+
options=resume_options,
144+
):
145+
yield event
106146

107147
async def _restore_resume_input(
108148
self, input: dict[str, Any] | None
@@ -142,6 +182,11 @@ async def _restore_resume_input(
142182
if not triggers:
143183
return None
144184

185+
return await self._build_resume_map(triggers)
186+
187+
async def _build_resume_map(
188+
self, triggers: list[UiPathResumeTrigger]
189+
) -> dict[str, Any]:
145190
# Build resume map: {interrupt_id: resume_data}
146191
resume_map: dict[str, Any] = {}
147192
for trigger in triggers:
@@ -160,18 +205,17 @@ async def _restore_resume_input(
160205

161206
async def _handle_suspension(
162207
self, result: UiPathRuntimeResult
163-
) -> UiPathRuntimeResult:
208+
) -> UiPathSuspensionResult:
164209
"""Create and persist resume trigger if execution was suspended.
165210
166211
Args:
167212
result: The execution result to check for suspension
168213
"""
169-
# Only handle suspensions
170-
if result.status != UiPathRuntimeStatus.SUSPENDED:
171-
return result
172-
173-
if isinstance(result, UiPathBreakpointResult):
174-
return result
214+
# Only handle interrupt suspensions
215+
if result.status != UiPathRuntimeStatus.SUSPENDED or isinstance(
216+
result, UiPathBreakpointResult
217+
):
218+
return UiPathSuspensionResult(runtime_result=result)
175219

176220
suspended_result = UiPathRuntimeResult(
177221
status=UiPathRuntimeStatus.SUSPENDED,
@@ -205,7 +249,17 @@ async def _handle_suspension(
205249
# Backward compatibility: set single trigger directly
206250
suspended_result.trigger = suspended_result.triggers[0]
207251

208-
return suspended_result
252+
# check if any trigger can be resumed
253+
# Note: when resuming a job, orchestrator deletes all triggers associated with it,
254+
# thus we can resume the runtime at this point without worrying a trigger may be fired 'twice'
255+
triggers = await self.storage.get_triggers(self.runtime_id)
256+
257+
return UiPathSuspensionResult(
258+
runtime_result=suspended_result,
259+
fired_triggers_map=await self._build_resume_map(triggers)
260+
if triggers
261+
else None,
262+
)
209263

210264
async def get_schema(self) -> UiPathRuntimeSchema:
211265
"""Passthrough schema from delegate runtime."""

tests/test_resumable.py

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,3 +257,192 @@ async def read_trigger_impl_3(trigger: UiPathResumeTrigger) -> dict[str, Any]:
257257
assert result.output["completed"] is True
258258
assert "int-2" in result.output["resume_data"]
259259
assert "int-3" in result.output["resume_data"]
260+
261+
262+
@pytest.mark.asyncio
263+
async def test_resumable_auto_resumes_when_triggers_already_fired():
264+
"""When triggers are already fired during suspension, runtime should auto-resume."""
265+
266+
runtime_impl = MultiTriggerMockRuntime()
267+
storage = StatefulStorageMock()
268+
trigger_manager = make_trigger_manager_mock()
269+
270+
read_count = {"count": 0}
271+
272+
# configure trigger manager to return triggers as already fired only on first batch
273+
async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]:
274+
read_count["count"] += 1
275+
# first two triggers (int-1, int-2) are immediately available
276+
# subsequent triggers are pending
277+
if trigger.interrupt_id in ["int-1", "int-2"] and read_count["count"] <= 2:
278+
return {"approved": True}
279+
raise UiPathPendingTriggerError("pending")
280+
281+
trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore
282+
283+
resumable = UiPathResumableRuntime(
284+
delegate=runtime_impl,
285+
storage=storage,
286+
trigger_manager=trigger_manager,
287+
runtime_id="runtime-1",
288+
)
289+
290+
# First execution - should suspend with int-1 and int-2, but since both are
291+
# already fired, it should auto-resume and suspend again with int-2 and int-3
292+
result = await resumable.execute({})
293+
294+
# The runtime should have auto-resumed once and suspended again
295+
assert result.status == UiPathRuntimeStatus.SUSPENDED
296+
assert result.triggers is not None
297+
assert len(result.triggers) == 2
298+
# After auto-resume, we should be at second suspension with int-2, int-3
299+
assert {t.interrupt_id for t in result.triggers} == {"int-2", "int-3"}
300+
301+
# Delegate should have been executed twice (initial + auto-resume)
302+
assert runtime_impl.execution_count == 2
303+
304+
305+
@pytest.mark.asyncio
306+
async def test_resumable_auto_resumes_partial_fired_triggers():
307+
"""When only some triggers are fired during suspension, auto-resume with those."""
308+
309+
runtime_impl = MultiTriggerMockRuntime()
310+
storage = StatefulStorageMock()
311+
trigger_manager = make_trigger_manager_mock()
312+
313+
# Configure trigger manager so int-1 is fired but int-2 is pending
314+
async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]:
315+
if trigger.interrupt_id == "int-1":
316+
return {"approved": True}
317+
raise UiPathPendingTriggerError("still pending")
318+
319+
trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore
320+
321+
resumable = UiPathResumableRuntime(
322+
delegate=runtime_impl,
323+
storage=storage,
324+
trigger_manager=trigger_manager,
325+
runtime_id="runtime-1",
326+
)
327+
328+
# First execution - int-1 fires immediately, int-2 stays pending
329+
# Should auto-resume with int-1 and suspend with int-2, int-3
330+
result = await resumable.execute({})
331+
332+
assert result.status == UiPathRuntimeStatus.SUSPENDED
333+
assert result.triggers is not None
334+
# After auto-resume with int-1, should have int-2 (still pending) + int-3 (new)
335+
assert {t.interrupt_id for t in result.triggers} == {"int-2", "int-3"}
336+
337+
# Verify int-1 was consumed (deleted from storage)
338+
remaining_triggers = await storage.get_triggers("runtime-1")
339+
assert all(t.interrupt_id != "int-1" for t in remaining_triggers)
340+
341+
342+
@pytest.mark.asyncio
343+
async def test_resumable_auto_resumes_multiple_times():
344+
"""When triggers keep being fired immediately, keep auto-resuming until complete."""
345+
346+
runtime_impl = MultiTriggerMockRuntime()
347+
storage = StatefulStorageMock()
348+
trigger_manager = make_trigger_manager_mock()
349+
350+
# All triggers are always immediately available
351+
async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]:
352+
return {"approved": True}
353+
354+
trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore
355+
356+
resumable = UiPathResumableRuntime(
357+
delegate=runtime_impl,
358+
storage=storage,
359+
trigger_manager=trigger_manager,
360+
runtime_id="runtime-1",
361+
)
362+
363+
# Execute once - should auto-resume through all suspensions
364+
result = await resumable.execute({})
365+
366+
# Should complete successfully after auto-resuming twice
367+
# 1st exec: suspend with int-1, int-2 -> auto-resume
368+
# 2nd exec: suspend with int-2, int-3 -> auto-resume
369+
# 3rd exec: complete
370+
assert result.status == UiPathRuntimeStatus.SUCCESSFUL
371+
assert isinstance(result.output, dict)
372+
assert result.output["completed"] is True
373+
374+
# Delegate should have been executed 3 times
375+
assert runtime_impl.execution_count == 3
376+
377+
378+
@pytest.mark.asyncio
379+
async def test_resumable_stream_auto_resumes_when_triggers_fired():
380+
"""Stream auto-resume when triggers are already fired."""
381+
382+
runtime_impl = MultiTriggerMockRuntime()
383+
storage = StatefulStorageMock()
384+
trigger_manager = make_trigger_manager_mock()
385+
386+
read_attempts = {"count": 0}
387+
388+
# Configure int-1 to be immediately fired, int-2 pending
389+
async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]:
390+
read_attempts["count"] += 1
391+
if trigger.interrupt_id == "int-1" and read_attempts["count"] <= 1:
392+
return {"approved": True}
393+
raise UiPathPendingTriggerError("pending")
394+
395+
trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore
396+
397+
resumable = UiPathResumableRuntime(
398+
delegate=runtime_impl,
399+
storage=storage,
400+
trigger_manager=trigger_manager,
401+
runtime_id="runtime-1",
402+
)
403+
404+
# Stream should auto-resume and yield final result after auto-resume
405+
events = []
406+
async for event in resumable.stream({}):
407+
events.append(event)
408+
409+
# Should have received exactly one final result (after auto-resume)
410+
assert len(events) == 1
411+
assert isinstance(events[0], UiPathRuntimeResult)
412+
assert events[0].status == UiPathRuntimeStatus.SUSPENDED
413+
414+
# Should be at second suspension (after auto-resume with int-1)
415+
assert events[0].triggers is not None
416+
assert {t.interrupt_id for t in events[0].triggers} == {"int-2", "int-3"}
417+
418+
419+
@pytest.mark.asyncio
420+
async def test_resumable_no_auto_resume_when_all_triggers_pending():
421+
"""When all triggers are pending, should NOT auto-resume."""
422+
423+
runtime_impl = MultiTriggerMockRuntime()
424+
storage = StatefulStorageMock()
425+
trigger_manager = make_trigger_manager_mock()
426+
427+
# All triggers are pending
428+
async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]:
429+
raise UiPathPendingTriggerError("pending")
430+
431+
trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore
432+
433+
resumable = UiPathResumableRuntime(
434+
delegate=runtime_impl,
435+
storage=storage,
436+
trigger_manager=trigger_manager,
437+
runtime_id="runtime-1",
438+
)
439+
440+
# Execute - should suspend
441+
result = await resumable.execute({})
442+
443+
assert result.status == UiPathRuntimeStatus.SUSPENDED
444+
assert result.triggers is not None
445+
assert {t.interrupt_id for t in result.triggers} == {"int-1", "int-2"}
446+
447+
# Delegate should have been executed only once)
448+
assert runtime_impl.execution_count == 1

0 commit comments

Comments
 (0)