Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions examples/saga.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,20 @@ async def create_shipment(
items: list[str],
address: str,
) -> tuple[str, str]:
"""Create a shipment for the order."""
"""
Create a shipment for an order and record its tracking number.

Parameters:
order_id (str): Identifier of the order to ship.
items (list[str]): List of item identifiers included in the shipment.
address (str): Shipping address; must not be empty.

Returns:
tuple[str, str]: A tuple containing the created `shipment_id` and its `tracking_number`.

Raises:
ValueError: If `address` is empty.
"""
if not address:
raise ValueError("Shipping address is required")

Expand Down Expand Up @@ -469,7 +482,11 @@ class OrderSaga(Saga[OrderContext]):


async def run_successful_saga() -> None:
"""Demonstrate a successful saga execution."""
"""
Run an example order-processing saga and print the per-step progress and final results.

Sets up mock services, dependency injection, and in-memory saga storage; executes the OrderSaga with a generated saga ID, prints each completed step, then prints the final saga status, context fields (inventory reservation, payment ID, shipment ID) and the persisted execution log. If saga execution fails, the failure is printed and the exception is re-raised.
"""
print("\n" + "=" * 70)
print("SCENARIO 1: Successful Order Processing Saga")
print("=" * 70)
Expand Down Expand Up @@ -736,4 +753,4 @@ async def main() -> None:


if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())
20 changes: 17 additions & 3 deletions examples/saga_fallback.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,17 @@ async def act(
self,
context: OrderContext,
) -> SagaStepResult[OrderContext, ReserveInventoryResponse]:
"""Primary step that always raises an error."""
"""
Simulate a failing primary reservation step for the saga.

This action always raises a RuntimeError to emulate an unavailable downstream service and trigger fallback or compensation behavior.

Parameters:
context (OrderContext): Shared saga context containing order details (e.g., order_id, user_id, amount, reservation_id).

Raises:
RuntimeError: Indicates the primary step failed (service unavailable).
"""
self._call_count += 1
logger.info(
f" [PrimaryStep] Executing act() for order {context.order_id} " f"(call #{self._call_count})...",
Expand Down Expand Up @@ -271,7 +281,11 @@ async def run_saga(


async def main() -> None:
"""Run saga fallback example."""
"""
Run an interactive demonstration of the saga fallback pattern with a circuit breaker.

Executes three scenarios that show a failing primary step with an automatic fallback, the circuit breaker opening after a configurable number of failures, and fail-fast behavior when the circuit is open. Also conditionally demonstrates configuring a Redis-backed circuit breaker storage, prints per-scenario results and a summary, and informs about missing optional dependencies.
"""
print("\n" + "=" * 80)
print("SAGA FALLBACK PATTERN WITH CIRCUIT BREAKER EXAMPLE")
print("=" * 80)
Expand Down Expand Up @@ -419,4 +433,4 @@ class OrderSagaWithRedisBreaker(Saga[OrderContext]):


if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())
66 changes: 40 additions & 26 deletions examples/saga_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,20 @@ async def create_shipment(
items: list[str],
address: str,
) -> tuple[str, str]:
"""Create a shipment for the order."""
"""
Create a shipment record for the given order and generate a tracking number.

Parameters:
order_id (str): Identifier of the order.
items (list[str]): Items included in the shipment.
address (str): Destination shipping address.

Returns:
tuple[str, str]: A tuple containing the shipment ID and the tracking number.

Raises:
ValueError: If `address` is empty.
"""
if not address:
raise ValueError("Shipping address is required")

Expand Down Expand Up @@ -519,16 +532,12 @@ async def resolve(self, type_: type) -> typing.Any:

async def simulate_interrupted_saga() -> tuple[uuid.UUID, MemorySagaStorage]:
"""
Simulate a saga that gets interrupted after the first step.

This simulates what happens when:
- Server crashes after completing ReserveInventoryStep
- Network timeout occurs
- Process is killed during execution
- Database connection is lost

Simulate a saga that is interrupted after the inventory reservation step to produce a recoverable persisted state.

Returns:
Tuple of (saga_id, storage) so we can recover it later.
tuple:
saga_id (uuid.UUID): Identifier of the created saga.
storage (MemorySagaStorage): In-memory storage containing the persisted saga state and step history for recovery.
"""
print("\n" + "=" * 70)
print("SCENARIO 1: Simulating Interrupted Saga")
Expand Down Expand Up @@ -622,13 +631,13 @@ async def recover_interrupted_saga(
storage: MemorySagaStorage,
) -> None:
"""
Recover and complete the interrupted saga.

This demonstrates how recovery ensures eventual consistency by:
1. Loading saga state from storage
2. Reconstructing context
3. Resuming execution from last completed step
4. Completing remaining steps
Recover and complete an interrupted saga using persisted state.
Loads the saga state from storage, reconstructs the saga context, resumes execution from the last completed step, and completes any remaining steps to restore eventual consistency.

Parameters:
saga_id (uuid.UUID): Identifier of the saga instance to recover.
storage (MemorySagaStorage): Durable storage containing the saga's persisted state and step history.
"""
print("\n" + "=" * 70)
print("SCENARIO 2: Recovering Interrupted Saga")
Expand Down Expand Up @@ -688,10 +697,12 @@ async def recover_interrupted_saga(

async def simulate_interrupted_compensation() -> tuple[uuid.UUID, MemorySagaStorage]:
"""
Simulate a saga that fails and gets interrupted during compensation.

This shows recovery of compensation logic, which is critical for
maintaining consistency when rollback is interrupted.
Simulate a saga that fails and is interrupted during compensation.

Sets up services, a saga, and a failing shipment step to trigger compensation that is then artificially interrupted; returns identifiers and storage state for performing recovery in a separate run.

Returns:
tuple[uuid.UUID, MemorySagaStorage]: The saga ID and the in-memory storage containing the persisted saga state and step history after the simulated interruption.
"""
print("\n" + "=" * 70)
print("SCENARIO 3: Simulating Interrupted Compensation")
Expand Down Expand Up @@ -801,10 +812,13 @@ async def recover_interrupted_compensation(
storage: MemorySagaStorage,
) -> None:
"""
Recover and complete the interrupted compensation.

This ensures that even if compensation is interrupted, it will
eventually complete, releasing all resources.
Recover and complete an interrupted compensation for a saga.

Loads the saga state from the provided storage using the given saga identifier and drives any incomplete compensation steps to completion, ensuring resources (inventory, payments, shipments) are released and the system reaches a consistent state. Progress and final status are printed to stdout.

Parameters:
saga_id (uuid.UUID): Identifier of the saga to recover.
storage (MemorySagaStorage): Persistent storage containing the saga state and step history.
"""
print("\n" + "=" * 70)
print("SCENARIO 4: Recovering Interrupted Compensation")
Expand Down Expand Up @@ -910,4 +924,4 @@ async def main() -> None:


if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())
8 changes: 6 additions & 2 deletions examples/saga_recovery_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,11 @@ async def create_interrupted_saga(storage: MemorySagaStorage) -> uuid.UUID:


async def main() -> None:
"""Run the recovery scheduler example."""
"""
Run the saga recovery scheduler demo and display its outcome.

Sets up an in-memory saga storage, creates a simulated interrupted saga and marks it stale, runs the recovery loop for three iterations (using the module's recovery_loop and recovery configuration constants), then loads and prints the final saga state.
"""
print("\n" + "=" * 70)
print("SAGA RECOVERY SCHEDULER EXAMPLE")
print("=" * 70)
Expand Down Expand Up @@ -660,4 +664,4 @@ async def main() -> None:


if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())
7 changes: 6 additions & 1 deletion examples/saga_sqlalchemy_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ async def setup_database(engine: AsyncEngine) -> None:
async def main() -> None:
# 1. Create SQLAlchemy Engine with Connection Pool
# SQLAlchemy creates a pool by default (QueuePool for most dialects, SingletonThreadPool for SQLite)
"""
Run a demonstration that executes an OrderSaga using an async SQLAlchemy engine and persistent SqlAlchemySagaStorage.

Initializes a pooled async SQLAlchemy engine and schema, creates a session factory and SqlAlchemySagaStorage, bootstraps a mediator with a DI container and saga mapper, runs an OrderSaga while streaming step results to stdout, and then reloads and prints the persisted saga state and step history before disposing the engine.
"""
engine = create_async_engine(
DB_URL,
echo=False, # Set to True to see SQL queries
Expand Down Expand Up @@ -202,4 +207,4 @@ def saga_mapper(mapper: cqrs.SagaMap) -> None:


if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())
36 changes: 20 additions & 16 deletions src/cqrs/saga/compensation.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,16 @@ def __init__(
on_after_compensate_step: typing.Callable[[], typing.Awaitable[None]] | None = None,
) -> None:
"""
Initialize compensator.

Args:
saga_id: UUID of the saga
context: Saga context
storage: Saga storage implementation (or run object with same interface)
retry_count: Number of retry attempts for compensation
retry_delay: Initial delay between retries in seconds
retry_backoff: Backoff multiplier for exponential delay
on_after_compensate_step: Optional async callback after each successfully
compensated step (e.g. run.commit() for checkpoint).
Create a SagaCompensator configured to perform compensation of completed saga steps with retry and optional post-step callback.

Parameters:
saga_id: Identifier of the saga.
context: Saga execution context passed to step compensation handlers.
storage: Storage or run object implementing saga persistence operations.
retry_count: Maximum number of attempts per step before giving up.
retry_delay: Initial delay in seconds before the first retry.
retry_backoff: Multiplier applied to the delay for each successive retry (exponential backoff).
on_after_compensate_step: Optional async callback invoked after each step is successfully compensated.
"""
self._saga_id = saga_id
self._context = context
Expand All @@ -51,10 +50,15 @@ async def compensate_steps(
completed_steps: list[SagaStepHandler[ContextT, typing.Any]],
) -> None:
"""
Compensate all completed steps in reverse order with retry mechanism.

Args:
completed_steps: List of completed step handlers to compensate
Compensates completed saga steps in reverse order, applying retry logic and recording step statuses.

Compensates each handler from last to first, skipping steps already recorded as compensated in the saga history. Updates the saga status to COMPENSATING at the start and logs per-step statuses (STARTED, COMPLETED, FAILED) in storage. After a step completes, the optional on_after_compensate_step callback (if provided) is awaited. If any step fails after all retry attempts, the saga is marked as FAILED. If no completed steps are provided, no compensation is attempted and the saga is marked as FAILED.

Parameters:
completed_steps (list[SagaStepHandler[ContextT, typing.Any]]): Handlers corresponding to steps that completed during the saga; these will be compensated in reverse order.

Returns:
None
"""
await self._storage.update_status(self._saga_id, SagaStatus.COMPENSATING)

Expand Down Expand Up @@ -170,4 +174,4 @@ async def _compensate_step_with_retry(

# If we get here, all retries failed
if last_exception:
raise last_exception
raise last_exception
44 changes: 22 additions & 22 deletions src/cqrs/saga/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ def __init__(
storage: ISagaStorage | SagaStorageRun,
) -> None:
"""
Initialize state manager.

Args:
saga_id: UUID of the saga
storage: Saga storage implementation
Create a SagaStateManager bound to a specific saga identifier and storage backend.
Parameters:
saga_id: Identifier for the saga instance.
storage: Storage backend implementing ISagaStorage or SagaStorageRun used to persist saga state and history.
"""
self._saga_id = saga_id
self._storage = storage
Expand Down Expand Up @@ -84,13 +84,13 @@ def __init__(
saga_steps: list[type[SagaStepHandler] | Fallback],
) -> None:
"""
Initialize recovery manager.

Args:
saga_id: UUID of the saga
storage: Saga storage implementation
container: DI container for resolving step handlers
saga_steps: List of saga steps
Construct a SagaRecoveryManager that holds the identifiers, storage, DI container, and configured saga steps required to reconstruct a saga's execution state.
Parameters:
saga_id: Identifier for the saga instance (e.g., UUID or other unique value).
storage: Persistence backend implementing saga history operations (ISagaStorage or SagaStorageRun).
container: Dependency injection container used to resolve step handler instances.
saga_steps: Ordered list of saga step types or Fallback wrappers that define the saga's execution sequence.
"""
self._saga_id = saga_id
self._storage = storage
Expand All @@ -99,10 +99,10 @@ def __init__(

async def load_completed_step_names(self) -> set[str]:
"""
Load set of completed step names from history.

Return the names of saga steps that completed their primary ("act") action.
Returns:
Set of step names that have been completed
set[str]: Step names recorded with status `SagaStepStatus.COMPLETED` and action `"act"`.
"""
history = await self._storage.get_step_history(self._saga_id)
return {e.step_name for e in history if e.status == SagaStepStatus.COMPLETED and e.action == "act"}
Expand All @@ -112,13 +112,13 @@ async def reconstruct_completed_steps(
completed_step_names: set[str],
) -> list[SagaStepHandler[SagaContext, typing.Any]]:
"""
Reconstruct list of completed step handlers from history.

Args:
completed_step_names: Set of completed step names

Reconstructs and returns the resolved step handler instances corresponding to the completed steps, preserving saga execution order.
Parameters:
completed_step_names (set[str]): Names of steps that completed the "act" action.
Returns:
List of step handlers in execution order
list[SagaStepHandler[SagaContext, typing.Any]]: Resolved step handler instances in execution order. For Fallback wrappers, the primary handler is chosen if its name appears in completed_step_names; otherwise the fallback handler is chosen when present.
"""
completed_steps: list[SagaStepHandler[SagaContext, typing.Any]] = []

Expand Down Expand Up @@ -365,4 +365,4 @@ async def execute_fallback_step(
raise fallback_error
else:
# Should not fallback, re-raise original error
raise primary_error
raise primary_error
Loading
Loading