diff --git a/examples/saga.py b/examples/saga.py index b4956ce..a0710d6 100644 --- a/examples/saga.py +++ b/examples/saga.py @@ -275,7 +275,20 @@ async def create_shipment( items: list[str], address: str, ) -> tuple[str, str]: - """Create a shipment for the order.""" + """ + Create a shipment for an order and record its tracking number. + + Parameters: + order_id (str): Identifier of the order to ship. + items (list[str]): List of item identifiers included in the shipment. + address (str): Shipping address; must not be empty. + + Returns: + tuple[str, str]: A tuple containing the created `shipment_id` and its `tracking_number`. + + Raises: + ValueError: If `address` is empty. + """ if not address: raise ValueError("Shipping address is required") @@ -469,7 +482,11 @@ class OrderSaga(Saga[OrderContext]): async def run_successful_saga() -> None: - """Demonstrate a successful saga execution.""" + """ + Run an example order-processing saga and print the per-step progress and final results. + + Sets up mock services, dependency injection, and in-memory saga storage; executes the OrderSaga with a generated saga ID, prints each completed step, then prints the final saga status, context fields (inventory reservation, payment ID, shipment ID) and the persisted execution log. If saga execution fails, the failure is printed and the exception is re-raised. + """ print("\n" + "=" * 70) print("SCENARIO 1: Successful Order Processing Saga") print("=" * 70) @@ -736,4 +753,4 @@ async def main() -> None: if __name__ == "__main__": - asyncio.run(main()) + asyncio.run(main()) \ No newline at end of file diff --git a/examples/saga_fallback.py b/examples/saga_fallback.py index 407b162..dd95fa8 100644 --- a/examples/saga_fallback.py +++ b/examples/saga_fallback.py @@ -144,7 +144,17 @@ async def act( self, context: OrderContext, ) -> SagaStepResult[OrderContext, ReserveInventoryResponse]: - """Primary step that always raises an error.""" + """ + Simulate a failing primary reservation step for the saga. + + This action always raises a RuntimeError to emulate an unavailable downstream service and trigger fallback or compensation behavior. + + Parameters: + context (OrderContext): Shared saga context containing order details (e.g., order_id, user_id, amount, reservation_id). + + Raises: + RuntimeError: Indicates the primary step failed (service unavailable). + """ self._call_count += 1 logger.info( f" [PrimaryStep] Executing act() for order {context.order_id} " f"(call #{self._call_count})...", @@ -271,7 +281,11 @@ async def run_saga( async def main() -> None: - """Run saga fallback example.""" + """ + Run an interactive demonstration of the saga fallback pattern with a circuit breaker. + + Executes three scenarios that show a failing primary step with an automatic fallback, the circuit breaker opening after a configurable number of failures, and fail-fast behavior when the circuit is open. Also conditionally demonstrates configuring a Redis-backed circuit breaker storage, prints per-scenario results and a summary, and informs about missing optional dependencies. + """ print("\n" + "=" * 80) print("SAGA FALLBACK PATTERN WITH CIRCUIT BREAKER EXAMPLE") print("=" * 80) @@ -419,4 +433,4 @@ class OrderSagaWithRedisBreaker(Saga[OrderContext]): if __name__ == "__main__": - asyncio.run(main()) + asyncio.run(main()) \ No newline at end of file diff --git a/examples/saga_recovery.py b/examples/saga_recovery.py index 72dfd7b..3400c94 100644 --- a/examples/saga_recovery.py +++ b/examples/saga_recovery.py @@ -281,7 +281,20 @@ async def create_shipment( items: list[str], address: str, ) -> tuple[str, str]: - """Create a shipment for the order.""" + """ + Create a shipment record for the given order and generate a tracking number. + + Parameters: + order_id (str): Identifier of the order. + items (list[str]): Items included in the shipment. + address (str): Destination shipping address. + + Returns: + tuple[str, str]: A tuple containing the shipment ID and the tracking number. + + Raises: + ValueError: If `address` is empty. + """ if not address: raise ValueError("Shipping address is required") @@ -519,16 +532,12 @@ async def resolve(self, type_: type) -> typing.Any: async def simulate_interrupted_saga() -> tuple[uuid.UUID, MemorySagaStorage]: """ - Simulate a saga that gets interrupted after the first step. - - This simulates what happens when: - - Server crashes after completing ReserveInventoryStep - - Network timeout occurs - - Process is killed during execution - - Database connection is lost - + Simulate a saga that is interrupted after the inventory reservation step to produce a recoverable persisted state. + Returns: - Tuple of (saga_id, storage) so we can recover it later. + tuple: + saga_id (uuid.UUID): Identifier of the created saga. + storage (MemorySagaStorage): In-memory storage containing the persisted saga state and step history for recovery. """ print("\n" + "=" * 70) print("SCENARIO 1: Simulating Interrupted Saga") @@ -622,13 +631,13 @@ async def recover_interrupted_saga( storage: MemorySagaStorage, ) -> None: """ - Recover and complete the interrupted saga. - - This demonstrates how recovery ensures eventual consistency by: - 1. Loading saga state from storage - 2. Reconstructing context - 3. Resuming execution from last completed step - 4. Completing remaining steps + Recover and complete an interrupted saga using persisted state. + + Loads the saga state from storage, reconstructs the saga context, resumes execution from the last completed step, and completes any remaining steps to restore eventual consistency. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga instance to recover. + storage (MemorySagaStorage): Durable storage containing the saga's persisted state and step history. """ print("\n" + "=" * 70) print("SCENARIO 2: Recovering Interrupted Saga") @@ -688,10 +697,12 @@ async def recover_interrupted_saga( async def simulate_interrupted_compensation() -> tuple[uuid.UUID, MemorySagaStorage]: """ - Simulate a saga that fails and gets interrupted during compensation. - - This shows recovery of compensation logic, which is critical for - maintaining consistency when rollback is interrupted. + Simulate a saga that fails and is interrupted during compensation. + + Sets up services, a saga, and a failing shipment step to trigger compensation that is then artificially interrupted; returns identifiers and storage state for performing recovery in a separate run. + + Returns: + tuple[uuid.UUID, MemorySagaStorage]: The saga ID and the in-memory storage containing the persisted saga state and step history after the simulated interruption. """ print("\n" + "=" * 70) print("SCENARIO 3: Simulating Interrupted Compensation") @@ -801,10 +812,13 @@ async def recover_interrupted_compensation( storage: MemorySagaStorage, ) -> None: """ - Recover and complete the interrupted compensation. - - This ensures that even if compensation is interrupted, it will - eventually complete, releasing all resources. + Recover and complete an interrupted compensation for a saga. + + Loads the saga state from the provided storage using the given saga identifier and drives any incomplete compensation steps to completion, ensuring resources (inventory, payments, shipments) are released and the system reaches a consistent state. Progress and final status are printed to stdout. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga to recover. + storage (MemorySagaStorage): Persistent storage containing the saga state and step history. """ print("\n" + "=" * 70) print("SCENARIO 4: Recovering Interrupted Compensation") @@ -910,4 +924,4 @@ async def main() -> None: if __name__ == "__main__": - asyncio.run(main()) + asyncio.run(main()) \ No newline at end of file diff --git a/examples/saga_recovery_scheduler.py b/examples/saga_recovery_scheduler.py index a3f3103..e36d4f6 100644 --- a/examples/saga_recovery_scheduler.py +++ b/examples/saga_recovery_scheduler.py @@ -620,7 +620,11 @@ async def create_interrupted_saga(storage: MemorySagaStorage) -> uuid.UUID: async def main() -> None: - """Run the recovery scheduler example.""" + """ + Run the saga recovery scheduler demo and display its outcome. + + Sets up an in-memory saga storage, creates a simulated interrupted saga and marks it stale, runs the recovery loop for three iterations (using the module's recovery_loop and recovery configuration constants), then loads and prints the final saga state. + """ print("\n" + "=" * 70) print("SAGA RECOVERY SCHEDULER EXAMPLE") print("=" * 70) @@ -660,4 +664,4 @@ async def main() -> None: if __name__ == "__main__": - asyncio.run(main()) + asyncio.run(main()) \ No newline at end of file diff --git a/examples/saga_sqlalchemy_storage.py b/examples/saga_sqlalchemy_storage.py index 2daa543..d600026 100644 --- a/examples/saga_sqlalchemy_storage.py +++ b/examples/saga_sqlalchemy_storage.py @@ -142,6 +142,11 @@ async def setup_database(engine: AsyncEngine) -> None: async def main() -> None: # 1. Create SQLAlchemy Engine with Connection Pool # SQLAlchemy creates a pool by default (QueuePool for most dialects, SingletonThreadPool for SQLite) + """ + Run a demonstration that executes an OrderSaga using an async SQLAlchemy engine and persistent SqlAlchemySagaStorage. + + Initializes a pooled async SQLAlchemy engine and schema, creates a session factory and SqlAlchemySagaStorage, bootstraps a mediator with a DI container and saga mapper, runs an OrderSaga while streaming step results to stdout, and then reloads and prints the persisted saga state and step history before disposing the engine. + """ engine = create_async_engine( DB_URL, echo=False, # Set to True to see SQL queries @@ -202,4 +207,4 @@ def saga_mapper(mapper: cqrs.SagaMap) -> None: if __name__ == "__main__": - asyncio.run(main()) + asyncio.run(main()) \ No newline at end of file diff --git a/src/cqrs/saga/compensation.py b/src/cqrs/saga/compensation.py index 6b0280a..53dd1ec 100644 --- a/src/cqrs/saga/compensation.py +++ b/src/cqrs/saga/compensation.py @@ -26,17 +26,16 @@ def __init__( on_after_compensate_step: typing.Callable[[], typing.Awaitable[None]] | None = None, ) -> None: """ - Initialize compensator. - - Args: - saga_id: UUID of the saga - context: Saga context - storage: Saga storage implementation (or run object with same interface) - retry_count: Number of retry attempts for compensation - retry_delay: Initial delay between retries in seconds - retry_backoff: Backoff multiplier for exponential delay - on_after_compensate_step: Optional async callback after each successfully - compensated step (e.g. run.commit() for checkpoint). + Create a SagaCompensator configured to perform compensation of completed saga steps with retry and optional post-step callback. + + Parameters: + saga_id: Identifier of the saga. + context: Saga execution context passed to step compensation handlers. + storage: Storage or run object implementing saga persistence operations. + retry_count: Maximum number of attempts per step before giving up. + retry_delay: Initial delay in seconds before the first retry. + retry_backoff: Multiplier applied to the delay for each successive retry (exponential backoff). + on_after_compensate_step: Optional async callback invoked after each step is successfully compensated. """ self._saga_id = saga_id self._context = context @@ -51,10 +50,15 @@ async def compensate_steps( completed_steps: list[SagaStepHandler[ContextT, typing.Any]], ) -> None: """ - Compensate all completed steps in reverse order with retry mechanism. - - Args: - completed_steps: List of completed step handlers to compensate + Compensates completed saga steps in reverse order, applying retry logic and recording step statuses. + + Compensates each handler from last to first, skipping steps already recorded as compensated in the saga history. Updates the saga status to COMPENSATING at the start and logs per-step statuses (STARTED, COMPLETED, FAILED) in storage. After a step completes, the optional on_after_compensate_step callback (if provided) is awaited. If any step fails after all retry attempts, the saga is marked as FAILED. If no completed steps are provided, no compensation is attempted and the saga is marked as FAILED. + + Parameters: + completed_steps (list[SagaStepHandler[ContextT, typing.Any]]): Handlers corresponding to steps that completed during the saga; these will be compensated in reverse order. + + Returns: + None """ await self._storage.update_status(self._saga_id, SagaStatus.COMPENSATING) @@ -170,4 +174,4 @@ async def _compensate_step_with_retry( # If we get here, all retries failed if last_exception: - raise last_exception + raise last_exception \ No newline at end of file diff --git a/src/cqrs/saga/execution.py b/src/cqrs/saga/execution.py index d5ca0f7..84a63bf 100644 --- a/src/cqrs/saga/execution.py +++ b/src/cqrs/saga/execution.py @@ -24,11 +24,11 @@ def __init__( storage: ISagaStorage | SagaStorageRun, ) -> None: """ - Initialize state manager. - - Args: - saga_id: UUID of the saga - storage: Saga storage implementation + Create a SagaStateManager bound to a specific saga identifier and storage backend. + + Parameters: + saga_id: Identifier for the saga instance. + storage: Storage backend implementing ISagaStorage or SagaStorageRun used to persist saga state and history. """ self._saga_id = saga_id self._storage = storage @@ -84,13 +84,13 @@ def __init__( saga_steps: list[type[SagaStepHandler] | Fallback], ) -> None: """ - Initialize recovery manager. - - Args: - saga_id: UUID of the saga - storage: Saga storage implementation - container: DI container for resolving step handlers - saga_steps: List of saga steps + Construct a SagaRecoveryManager that holds the identifiers, storage, DI container, and configured saga steps required to reconstruct a saga's execution state. + + Parameters: + saga_id: Identifier for the saga instance (e.g., UUID or other unique value). + storage: Persistence backend implementing saga history operations (ISagaStorage or SagaStorageRun). + container: Dependency injection container used to resolve step handler instances. + saga_steps: Ordered list of saga step types or Fallback wrappers that define the saga's execution sequence. """ self._saga_id = saga_id self._storage = storage @@ -99,10 +99,10 @@ def __init__( async def load_completed_step_names(self) -> set[str]: """ - Load set of completed step names from history. - + Return the names of saga steps that completed their primary ("act") action. + Returns: - Set of step names that have been completed + set[str]: Step names recorded with status `SagaStepStatus.COMPLETED` and action `"act"`. """ history = await self._storage.get_step_history(self._saga_id) return {e.step_name for e in history if e.status == SagaStepStatus.COMPLETED and e.action == "act"} @@ -112,13 +112,13 @@ async def reconstruct_completed_steps( completed_step_names: set[str], ) -> list[SagaStepHandler[SagaContext, typing.Any]]: """ - Reconstruct list of completed step handlers from history. - - Args: - completed_step_names: Set of completed step names - + Reconstructs and returns the resolved step handler instances corresponding to the completed steps, preserving saga execution order. + + Parameters: + completed_step_names (set[str]): Names of steps that completed the "act" action. + Returns: - List of step handlers in execution order + list[SagaStepHandler[SagaContext, typing.Any]]: Resolved step handler instances in execution order. For Fallback wrappers, the primary handler is chosen if its name appears in completed_step_names; otherwise the fallback handler is chosen when present. """ completed_steps: list[SagaStepHandler[SagaContext, typing.Any]] = [] @@ -365,4 +365,4 @@ async def execute_fallback_step( raise fallback_error else: # Should not fallback, re-raise original error - raise primary_error + raise primary_error \ No newline at end of file diff --git a/src/cqrs/saga/saga.py b/src/cqrs/saga/saga.py index a461ea4..2edd336 100644 --- a/src/cqrs/saga/saga.py +++ b/src/cqrs/saga/saga.py @@ -154,14 +154,10 @@ async def __aiter__( ) -> typing.AsyncIterator[SagaStepResult[ContextT, typing.Any]]: """ Execute saga steps sequentially and yield each step result. - - This method implements the "Strict Backward Recovery" strategy for saga execution. - Once a saga enters COMPENSATING or FAILED status, it can never proceed forward. - This ensures data consistency and prevents "zombie states" where a saga is - partially compensated and partially executed. - - When storage supports create_run(), uses one session per saga and checkpoint - commits (fewer commits and sessions). Otherwise uses the legacy path. + + Implements the Strict Backward Recovery strategy: if the saga is in COMPENSATING or FAILED status, forward execution is never resumed. When the underlying storage provides create_run(), execution is performed within a per-saga run with checkpoint commits; otherwise the legacy run-less path is used. + Returns: + AsyncIterator[SagaStepResult[ContextT, typing.Any]]: An async iterator that yields the result for each executed saga step in order. """ try: run_cm = self._storage.create_run() @@ -180,7 +176,18 @@ async def _execute( self, run: SagaStorageRun | None, ) -> typing.AsyncIterator[SagaStepResult[ContextT, typing.Any]]: - """Run saga steps; use run for storage when provided and commit at checkpoints.""" + """ + Execute the saga's configured steps, using the provided storage run for checkpointed operations when available, and perform recovery and compensation as required. + + Parameters: + run (SagaStorageRun | None): Optional per-saga storage run. When provided, the run is used for loading saga state, creating run-scoped managers/executors, and committing at checkpoint boundaries. When None, the transaction's internal managers and executors are used. + + Returns: + Async iterator that yields SagaStepResult values for each step that completes; each yielded result will include the transaction's saga_id. + + Raises: + RuntimeError: If the saga was recovered in COMPENSATING or FAILED state and compensation was completed, forward execution is not allowed. + """ if run is not None: state_manager = SagaStateManager(self._saga_id, run) recovery_manager = SagaRecoveryManager( @@ -352,7 +359,11 @@ async def _execute( raise async def _compensate(self) -> None: - """Compensate all completed steps in reverse order with retry mechanism.""" + """ + Mark the transaction as compensated and run compensation for all completed steps in reverse order. + + Sets an internal flag to prevent repeated compensation and delegates to the compensator which applies the configured retry behavior. + """ # Prevent double compensation if self._compensated: return @@ -479,4 +490,4 @@ def transaction( compensation_retry_count=compensation_retry_count, compensation_retry_delay=compensation_retry_delay, compensation_retry_backoff=compensation_retry_backoff, - ) + ) \ No newline at end of file diff --git a/src/cqrs/saga/storage/memory.py b/src/cqrs/saga/storage/memory.py index 1c58e5e..a1ec80f 100644 --- a/src/cqrs/saga/storage/memory.py +++ b/src/cqrs/saga/storage/memory.py @@ -16,6 +16,12 @@ class _MemorySagaStorageRun(SagaStorageRun): """Run that delegates to the underlying MemorySagaStorage; commit/rollback are no-ops.""" def __init__(self, storage: "MemorySagaStorage") -> None: + """ + Initialize the run and bind it to the provided MemorySagaStorage. + + Parameters: + storage (MemorySagaStorage): Underlying in-memory storage instance used to delegate saga operations. + """ self._storage = storage async def create_saga( @@ -24,6 +30,17 @@ async def create_saga( name: str, context: dict[str, typing.Any], ) -> None: + """ + Create a new saga entry in the underlying memory storage. + + Parameters: + saga_id (uuid.UUID): Unique identifier for the saga. + name (str): Human-readable saga name. + context (dict[str, typing.Any]): Initial saga context payload. + + Raises: + ValueError: If a saga with the same `saga_id` already exists. + """ await self._storage.create_saga(saga_id, name, context) async def update_context( @@ -32,6 +49,18 @@ async def update_context( context: dict[str, typing.Any], current_version: int | None = None, ) -> None: + """ + Update the stored context for the given saga. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga whose context will be updated. + context (dict[str, typing.Any]): New context to store for the saga. + current_version (int | None): If provided, require the stored saga version to match this value (optimistic locking). + + Raises: + ValueError: If the saga_id does not exist. + SagaConcurrencyError: If current_version is provided and does not match the stored version. + """ await self._storage.update_context(saga_id, context, current_version) async def update_status( @@ -39,6 +68,16 @@ async def update_status( saga_id: uuid.UUID, status: SagaStatus, ) -> None: + """ + Update the stored status of the saga identified by `saga_id`. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga to update. + status (SagaStatus): New status to set for the saga. + + Raises: + ValueError: If no saga exists with the given `saga_id`. + """ await self._storage.update_status(saga_id, status) async def log_step( @@ -49,6 +88,16 @@ async def log_step( status: SagaStepStatus, details: str | None = None, ) -> None: + """ + Log a step entry for the given saga into the underlying storage. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga. + step_name (str): Name of the saga step. + action (Literal["act", "compensate"]): Whether the step is a forward action ("act") or a compensation ("compensate"). + status (SagaStepStatus): Outcome status of the step. + details (str | None): Optional free-form details or metadata about the step. + """ await self._storage.log_step( saga_id, step_name, @@ -63,6 +112,16 @@ async def load_saga_state( *, read_for_update: bool = False, ) -> tuple[SagaStatus, dict[str, typing.Any], int]: + """ + Load the current state for a saga. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga to load. + read_for_update (bool): If True, acquire the state for update (may be used for optimistic locking or exclusive access). + + Returns: + tuple[SagaStatus, dict[str, typing.Any], int]: A tuple containing the saga's status, its context dictionary, and the current version number. + """ return await self._storage.load_saga_state( saga_id, read_for_update=read_for_update, @@ -72,12 +131,29 @@ async def get_step_history( self, saga_id: uuid.UUID, ) -> list[SagaLogEntry]: + """ + Retrieve the step log/history for a saga. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga whose step history is requested. + + Returns: + list[SagaLogEntry]: Saga step log entries sorted by timestamp in ascending order (oldest first). Returns an empty list if no logs exist. + """ return await self._storage.get_step_history(saga_id) async def commit(self) -> None: + """ + No-op commit for an in-memory saga run; provided to satisfy the SagaStorageRun interface. + + This method intentionally performs no action because the memory storage does not require an explicit commit. + """ pass async def rollback(self) -> None: + """ + Perform no action for rollback; provided to satisfy the SagaStorageRun interface. + """ pass @@ -86,6 +162,13 @@ class MemorySagaStorage(ISagaStorage): def __init__(self) -> None: # Structure: {saga_id: {name, status, context, created_at, updated_at, version}} + """ + Initialize in-memory storage for sagas and their step logs. + + Creates two internal mappings: + - _sagas: maps saga_id (UUID) to a dictionary containing keys `name`, `status`, `context`, `created_at`, `updated_at`, and `version`. + - _logs: maps saga_id (UUID) to a list of SagaLogEntry objects representing the saga's step history. + """ self._sagas: dict[uuid.UUID, dict[str, typing.Any]] = {} # Structure: {saga_id: [SagaLogEntry, ...]} self._logs: dict[uuid.UUID, list[SagaLogEntry]] = {} @@ -93,6 +176,12 @@ def __init__(self) -> None: def create_run( self, ) -> contextlib.AbstractAsyncContextManager[SagaStorageRun]: + """ + Provide an asynchronous context manager that yields a SagaStorageRun bound to this storage. + + Returns: + An asynchronous context manager that yields a `SagaStorageRun` instance backed by this `MemorySagaStorage`. + """ @contextlib.asynccontextmanager async def _run() -> typing.AsyncGenerator[SagaStorageRun, None]: yield _MemorySagaStorageRun(self) @@ -105,6 +194,17 @@ async def create_saga( name: str, context: dict[str, typing.Any], ) -> None: + """ + Create a new saga record in the in-memory store. + + Parameters: + saga_id (uuid.UUID): Identifier for the saga; must not already exist. + name (str): Human-readable name for the saga. + context (dict[str, typing.Any]): Initial context payload for the saga. + + Raises: + ValueError: If a saga with `saga_id` already exists. + """ if saga_id in self._sagas: raise ValueError(f"Saga {saga_id} already exists") @@ -203,6 +303,18 @@ async def get_sagas_for_recovery( stale_after_seconds: int | None = None, saga_name: str | None = None, ) -> list[uuid.UUID]: + """ + Selects saga IDs eligible for recovery based on status, recovery attempts, staleness, and an optional name filter. + + Parameters: + limit (int): Maximum number of saga IDs to return. + max_recovery_attempts (int): Upper bound (exclusive) on recovery attempts; only sagas with fewer attempts are considered. + stale_after_seconds (int | None): If provided, only sagas last updated earlier than this many seconds before now are considered; if None, staleness is ignored. + saga_name (str | None): If provided, only sagas with this name are considered; if None, name is not filtered. + + Returns: + list[uuid.UUID]: Up to `limit` saga IDs sorted by oldest `updated_at` first that match the recovery criteria. + """ recoverable = (SagaStatus.RUNNING, SagaStatus.COMPENSATING) now = datetime.datetime.now(datetime.timezone.utc) threshold = (now - datetime.timedelta(seconds=stale_after_seconds)) if stale_after_seconds is not None else None @@ -241,4 +353,4 @@ async def set_recovery_attempts( data = self._sagas[saga_id] data["recovery_attempts"] = attempts data["updated_at"] = datetime.datetime.now(datetime.timezone.utc) - data["version"] += 1 + data["version"] += 1 \ No newline at end of file diff --git a/src/cqrs/saga/storage/protocol.py b/src/cqrs/saga/storage/protocol.py index e80faaf..701259c 100644 --- a/src/cqrs/saga/storage/protocol.py +++ b/src/cqrs/saga/storage/protocol.py @@ -19,18 +19,48 @@ async def create_saga( saga_id: uuid.UUID, name: str, context: dict[str, typing.Any], - ) -> None: ... + ) -> None: """ + Create a new saga execution record with initial PENDING status and version 1. + + Parameters: + saga_id (uuid.UUID): Unique identifier for the saga (primary key). + name (str): Human-friendly name used for diagnostics and filtering. + context (dict[str, Any]): JSON-serializable initial saga context to persist. + """ + ... async def update_context( self, saga_id: uuid.UUID, context: dict[str, typing.Any], current_version: int | None = None, - ) -> None: ... + ) -> None: """ + Persist a snapshot of the saga's execution context, optionally using optimistic locking. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga to update. + context (dict[str, Any]): JSON-serializable context object to store as the new snapshot. + current_version (int | None): If provided, perform an optimistic-locking update that succeeds only + if the stored version matches this value; on success the stored version is incremented. + + Raises: + SagaConcurrencyError: If `current_version` is provided and does not match the stored version. + """ + ... async def update_status( self, saga_id: uuid.UUID, status: SagaStatus, - ) -> None: ... + ) -> None: """ + Set the global status for the saga identified by `saga_id`. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga to update. + status (SagaStatus): New global status to persist (for example RUNNING, COMPLETED, COMPENSATING). + + Notes: + This operation does not commit the storage session; the caller must call `commit()` on the active run or session to persist the change. + """ + ... async def log_step( self, saga_id: uuid.UUID, @@ -38,19 +68,58 @@ async def log_step( action: typing.Literal["act", "compensate"], status: SagaStepStatus, details: str | None = None, - ) -> None: ... + ) -> None: """ + Append a step transition to the saga's execution log. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga whose log will be appended. + step_name (str): Logical name of the step (used for diagnostics and replay). + action (Literal["act", "compensate"]): Whether this entry records the primary action ("act") or its compensating action ("compensate"). + status (SagaStepStatus): The step transition status to record (e.g., started, completed, failed, compensated). + details (str | None): Optional human-readable details or diagnostics about the transition. + """ + ... async def load_saga_state( self, saga_id: uuid.UUID, *, read_for_update: bool = False, - ) -> tuple[SagaStatus, dict[str, typing.Any], int]: ... + ) -> tuple[SagaStatus, dict[str, typing.Any], int]: """ + Load the current saga execution state. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga to load. + read_for_update (bool): If True, acquire a database lock for update to prevent concurrent modifications. + + Returns: + tuple[SagaStatus, dict[str, Any], int]: A tuple containing the saga's global status, the latest persisted context (JSON-serializable), and the current optimistic-locking version number. + """ + ... async def get_step_history( self, saga_id: uuid.UUID, - ) -> list[SagaLogEntry]: ... - async def commit(self) -> None: ... - async def rollback(self) -> None: ... + ) -> list[SagaLogEntry]: """ + Retrieve the chronological step log for a saga. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga whose step history to retrieve. + + Returns: + list[SagaLogEntry]: Ordered list of step log entries for the saga, from oldest to newest. + """ + ... + async def commit(self) -> None: """ +Finalize the storage run by persisting and committing all pending changes made during this session. + +This method makes the run's checkpointed changes durable; the caller is responsible for invoking commit at logical checkpoints to persist session state. +""" +... + async def rollback(self) -> None: """ +Abort the current storage run and revert any uncommitted changes in the session. + +This releases the run's transactional state without persisting pending updates so that the storage remains as it was before the run began. +""" +... class ISagaStorage(abc.ABC): @@ -265,19 +334,15 @@ async def set_recovery_attempts( def create_run( self, ) -> contextlib.AbstractAsyncContextManager[SagaStorageRun]: - """Create a scoped run (one session) for saga execution with checkpoint commits. - - Optional. When implemented, the orchestrator uses one session per saga - and calls commit() only at checkpoints (after create+RUNNING, after each - step, at completion, during compensation). Reduces commits and sessions. - The yielded object has the same mutation/read methods as this storage - but does not commit; caller must call commit() or rollback(). - + """ + Create a scoped async run context for a single saga execution session with checkpointed commits. + + The context manager yields a SagaStorageRun that provides the same mutation/read methods as the storage but does not commit automatically; the caller must call commit() or rollback() at desired checkpoints. + Returns: - Async context manager yielding a run object (SagaStorageRun). - Session is never exposed outside the storage implementation. - + contextlib.AbstractAsyncContextManager[SagaStorageRun]: Async context manager yielding a SagaStorageRun session. + Raises: - NotImplementedError: If this storage does not support scoped runs. + NotImplementedError: If the storage backend does not support scoped runs. """ - raise NotImplementedError("This storage does not support create_run()") + raise NotImplementedError("This storage does not support create_run()") \ No newline at end of file diff --git a/src/cqrs/saga/storage/sqlalchemy.py b/src/cqrs/saga/storage/sqlalchemy.py index 30ea014..b85c870 100644 --- a/src/cqrs/saga/storage/sqlalchemy.py +++ b/src/cqrs/saga/storage/sqlalchemy.py @@ -137,6 +137,12 @@ class _SqlAlchemySagaStorageRun(SagaStorageRun): """Scoped run: one session, no commit inside methods; caller calls commit().""" def __init__(self, session: AsyncSession) -> None: + """ + Initialize the run wrapper with an async SQLAlchemy session. + + Parameters: + session (AsyncSession): The AsyncSession instance scoped to this run, used for all database operations. + """ self._session = session async def create_saga( @@ -145,6 +151,16 @@ async def create_saga( name: str, context: dict[str, typing.Any], ) -> None: + """ + Create and stage a new saga execution record in the current session with initial metadata. + + Creates a SagaExecutionModel for the given saga identifier with status set to PENDING, version set to 1, and recovery_attempts set to 0, and adds it to the active session without committing the transaction. + + Parameters: + saga_id (uuid.UUID): Unique identifier for the saga execution. + name (str): Human-readable name of the saga. + context (dict[str, Any]): Initial saga context to be stored (will be serialized to the model's JSON column). + """ execution = SagaExecutionModel( id=saga_id, name=name, @@ -161,6 +177,17 @@ async def update_context( context: dict[str, typing.Any], current_version: int | None = None, ) -> None: + """ + Update the stored context for a saga and increment its version, optionally enforcing an optimistic version check. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga to update. + context (dict[str, typing.Any]): New serialized saga context to persist. + current_version (int | None): If provided, require the saga's current version to match this value before updating. + + Raises: + SagaConcurrencyError: If an optimistic version check fails (indicating a concurrent modification) or if the saga does not exist when a version was supplied. + """ stmt = sqlalchemy.update(SagaExecutionModel).where( SagaExecutionModel.id == saga_id, ) @@ -195,6 +222,16 @@ async def update_status( saga_id: uuid.UUID, status: SagaStatus, ) -> None: + """ + Update the stored status of a saga execution and increment its optimistic-lock version. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga execution to update. + status (SagaStatus): New status to set for the saga. + + Note: + The update is executed in the active database session; a commit is required to persist the change. + """ await self._session.execute( sqlalchemy.update(SagaExecutionModel) .where(SagaExecutionModel.id == saga_id) @@ -212,6 +249,16 @@ async def log_step( status: SagaStepStatus, details: str | None = None, ) -> None: + """ + Record a saga step event by creating and staging a log entry in the active session. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga execution. + step_name (str): Name of the step being recorded. + action (Literal["act", "compensate"]): The performed action: "act" for normal action or "compensate" for compensation. + status (SagaStepStatus): The step's outcome status. + details (str | None): Optional free-form details or error message associated with the step. + """ log_entry = SagaLogModel( saga_id=saga_id, step_name=step_name, @@ -227,6 +274,19 @@ async def load_saga_state( *, read_for_update: bool = False, ) -> tuple[SagaStatus, dict[str, typing.Any], int]: + """ + Load the current execution state for a saga. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga to load. + read_for_update (bool): If true, acquire a row-level lock for update. + + Returns: + tuple[SagaStatus, dict[str, Any], int]: The saga's status, its context dictionary, and the current version. + + Raises: + ValueError: If no saga with the given id exists. + """ stmt = sqlalchemy.select(SagaExecutionModel).where( SagaExecutionModel.id == saga_id, ) @@ -248,6 +308,16 @@ async def get_step_history( self, saga_id: uuid.UUID, ) -> list[SagaLogEntry]: + """ + Retrieve chronological step log entries for the given saga. + + Parameters: + saga_id (uuid.UUID): UUID of the saga whose step history to fetch. + + Returns: + list[SagaLogEntry]: List of log entries ordered by creation time. Each entry's `timestamp` + is normalized to UTC if not already timezone-aware. + """ result = await self._session.execute( sqlalchemy.select(SagaLogModel).where(SagaLogModel.saga_id == saga_id).order_by(SagaLogModel.created_at), ) @@ -270,19 +340,42 @@ async def get_step_history( ] async def commit(self) -> None: + """ + Commit the current transaction in the associated AsyncSession. + """ await self._session.commit() async def rollback(self) -> None: + """ + Revert all staged changes in the current session's transaction. + + This aborts the in-progress transaction associated with the run's AsyncSession, + discarding any pending writes or flushes. + """ await self._session.rollback() class SqlAlchemySagaStorage(ISagaStorage): def __init__(self, session_factory: async_sessionmaker[AsyncSession]): + """ + Initialize the SQLAlchemy-based saga storage with a factory for creating async sessions. + + Parameters: + session_factory (async_sessionmaker[AsyncSession]): Factory that produces new AsyncSession instances used for each storage run and operation. + """ self.session_factory = session_factory def create_run( self, ) -> contextlib.AbstractAsyncContextManager[SagaStorageRun]: + """ + Create a scoped run that yields a SagaStorageRun bound to a fresh session. + + The returned context manager provides a run object whose lifecycle is tied to a single session. If an exception is raised inside the context, the run's transaction is rolled back; the session is always closed on exit. + + Returns: + A context manager that yields a `SagaStorageRun`. On exception within the context, the run's `rollback()` is invoked and the session is closed when the context exits. + """ @contextlib.asynccontextmanager async def _run() -> typing.AsyncGenerator[SagaStorageRun, None]: async with self.session_factory() as session: @@ -301,6 +394,20 @@ async def create_saga( name: str, context: dict[str, typing.Any], ) -> None: + """ + Create and persist a new saga execution record with initial metadata. + + Creates a SagaExecutionModel for the given saga_id and name, sets status to PENDING, + version to 1, and recovery_attempts to 0, and commits it to the database. + + Parameters: + saga_id (uuid.UUID): Unique identifier for the saga execution. + name (str): Human-readable saga name. + context (dict[str, typing.Any]): Initial saga context to store. + + Raises: + SQLAlchemyError: If the database operation fails; the transaction is rolled back before the exception is propagated. + """ async with self.session_factory() as session: try: execution = SagaExecutionModel( @@ -507,6 +614,17 @@ async def increment_recovery_attempts( saga_id: uuid.UUID, new_status: SagaStatus | None = None, ) -> None: + """ + Increment the recovery attempts counter for the given saga execution and optionally update its status. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga execution to update. + new_status (SagaStatus | None): If provided, set the saga's status to this value. + + Raises: + ValueError: If no saga execution exists with the given `saga_id`. + SQLAlchemyError: On database errors; the transaction is rolled back and the error is propagated. + """ async with self.session_factory() as session: try: values: dict[str, typing.Any] = { @@ -545,4 +663,4 @@ async def set_recovery_attempts( await session.commit() except SQLAlchemyError: await session.rollback() - raise + raise \ No newline at end of file diff --git a/tests/benchmarks/dataclasses/test_benchmark_saga_memory.py b/tests/benchmarks/dataclasses/test_benchmark_saga_memory.py index 814c29a..a236c56 100644 --- a/tests/benchmarks/dataclasses/test_benchmark_saga_memory.py +++ b/tests/benchmarks/dataclasses/test_benchmark_saga_memory.py @@ -144,6 +144,12 @@ def saga_container() -> SagaContainer: @pytest.fixture def memory_storage() -> MemorySagaStorage: + """ + Provide a fresh in-memory saga storage instance for tests and benchmarks. + + Returns: + MemorySagaStorage: A new MemorySagaStorage instance. + """ return MemorySagaStorage() @@ -153,11 +159,23 @@ class MemorySagaStorageLegacy(MemorySagaStorage): def create_run( self, ) -> contextlib.AbstractAsyncContextManager[SagaStorageRun]: + """ + Indicate that creating a scoped run context is not supported for the legacy in-memory storage used in benchmarks. + + Raises: + NotImplementedError: Always raised with message "Legacy storage: create_run disabled for benchmark". + """ raise NotImplementedError("Legacy storage: create_run disabled for benchmark") @pytest.fixture def memory_storage_legacy() -> MemorySagaStorageLegacy: + """ + Provide a legacy in-memory saga storage that does not support scoped runs. + + Returns: + MemorySagaStorageLegacy: An in-memory saga storage whose `create_run` is disabled (raises `NotImplementedError`) for legacy-path benchmarks. + """ return MemorySagaStorageLegacy() @@ -166,6 +184,18 @@ def saga_with_memory_storage( saga_container: SagaContainer, memory_storage: MemorySagaStorage, ) -> Saga[OrderContext]: + """ + Create an OrderSaga configured with reserve-inventory, process-payment, and ship-order steps. + + This factory accepts the saga container and memory storage as fixture dependencies (they are not used by this function) and returns a Saga subclass instance with the three ordered step handlers: ReserveInventoryStep, ProcessPaymentStep, and ShipOrderStep. + + Parameters: + saga_container (SagaContainer): Fixture-provided container (unused). + memory_storage (MemorySagaStorage): Fixture-provided memory storage (unused). + + Returns: + Saga[OrderContext]: An OrderSaga instance wired with the predefined steps. + """ class OrderSaga(Saga[OrderContext]): steps = [ReserveInventoryStep, ProcessPaymentStep, ShipOrderStep] @@ -182,6 +212,11 @@ def test_benchmark_saga_memory_run_full_transaction( """Benchmark full saga transaction with memory storage, scoped run (3 steps).""" async def run() -> None: + """ + Execute a full saga transaction using the module's memory-backed saga, advancing through every step. + + Creates an OrderContext with order_id "ord_1", user_id "user_1", and amount 100.0, opens a transaction using the provided saga container and memory storage, and iterates the transaction to exercise each step in the run path. + """ context = OrderContext(order_id="ord_1", user_id="user_1", amount=100.0) async with saga_with_memory_storage.transaction( context=context, @@ -231,6 +266,11 @@ def test_benchmark_saga_memory_run_ten_transactions( """Benchmark 10 saga transactions in sequence, scoped run (memory storage).""" async def run() -> None: + """ + Execute 10 sequential saga transactions using a fresh in-memory storage and context for each iteration. + + Each iteration creates a new MemorySagaStorage and OrderContext, opens a saga transaction with the provided container and storage, and iterates through the transaction steps without performing additional work. + """ for i in range(10): storage = MemorySagaStorage() context = OrderContext( @@ -262,6 +302,11 @@ def test_benchmark_saga_memory_legacy_full_transaction( """Benchmark full saga transaction with memory storage, legacy path (3 steps).""" async def run() -> None: + """ + Run a full-order saga transaction against the legacy in-memory storage used for benchmarks. + + Creates an OrderContext and executes the saga transaction using the provided saga container and legacy memory storage, iterating the transaction to completion. + """ context = OrderContext(order_id="ord_1", user_id="user_1", amount=100.0) async with saga_with_memory_storage.transaction( context=context, @@ -289,6 +334,11 @@ class SingleStepSaga(Saga[OrderContext]): saga = SingleStepSaga() async def run() -> None: + """ + Run a saga transaction using the legacy memory storage and iterate its steps. + + Enters a transaction for an OrderContext (order_id "ord_1") with the registered saga_container and memory_storage_legacy, then iterates through the transaction steps without performing work for each step. + """ context = OrderContext(order_id="ord_1", user_id="user_1", amount=100.0) async with saga.transaction( context=context, @@ -310,6 +360,11 @@ def test_benchmark_saga_memory_legacy_ten_transactions( """Benchmark 10 saga transactions in sequence, legacy path (memory storage).""" async def run() -> None: + """ + Execute ten sequential saga transactions using MemorySagaStorageLegacy. + + Each iteration creates a new MemorySagaStorageLegacy and an OrderContext (with distinct order_id, user_id, and amount) and runs the configured saga transaction to completion by iterating through its steps. + """ for i in range(10): storage = MemorySagaStorageLegacy() context = OrderContext( @@ -325,4 +380,4 @@ async def run() -> None: async for _ in transaction: pass - benchmark(lambda: asyncio.run(run())) + benchmark(lambda: asyncio.run(run())) \ No newline at end of file diff --git a/tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py b/tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py index 0d6b432..8e43f7a 100644 --- a/tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py +++ b/tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py @@ -28,11 +28,23 @@ class SqlAlchemySagaStorageLegacy(SqlAlchemySagaStorage): def create_run( self, ) -> contextlib.AbstractAsyncContextManager[SagaStorageRun]: + """ + Disable creation of a scoped SagaStorageRun for legacy storage used in benchmarks. + + Raises: + NotImplementedError: Always raised with message "Legacy storage: create_run disabled for benchmark". + """ raise NotImplementedError("Legacy storage: create_run disabled for benchmark") @pytest.fixture def saga_container() -> SagaContainer: + """ + Create and return a SagaContainer pre-registered with the ReserveInventoryStep, ProcessPaymentStep, and ShipOrderStep instances. + + Returns: + SagaContainer: A container with the three steps already registered. + """ container = SagaContainer() container.register(ReserveInventoryStep, ReserveInventoryStep()) container.register(ProcessPaymentStep, ProcessPaymentStep()) @@ -103,6 +115,11 @@ class SingleStepSaga(Saga[OrderContext]): context = OrderContext(order_id="ord_1", user_id="user_1", amount=100.0) async def run_transaction() -> None: + """ + Execute the saga transaction lifecycle by entering the saga's transaction context and iterating its steps to completion. + + This function opens the saga transaction using the surrounding `saga`, `saga_container`, `storage`, and `context`, then consumes the transaction iterator to drive all saga steps to completion. + """ async with saga.transaction( context=context, container=saga_container, @@ -124,7 +141,11 @@ def test_benchmark_saga_sqlalchemy_legacy_full_transaction( saga_container: SagaContainer, saga_benchmark_loop_and_engine, ): - """Benchmark full saga transaction with SQLAlchemy storage, legacy path (MySQL).""" + """ + Benchmark a full saga transaction using SQLAlchemy storage in legacy mode. + + Runs a complete saga (three-step OrderSaga) against SqlAlchemySagaStorageLegacy, which disables `create_run` so the storage exercises the legacy commit-per-call path. The benchmark executes the saga transaction in the provided event loop and database engine fixture. + """ loop, engine = saga_benchmark_loop_and_engine session_factory = async_sessionmaker( @@ -137,6 +158,11 @@ def test_benchmark_saga_sqlalchemy_legacy_full_transaction( context = OrderContext(order_id="ord_1", user_id="user_1", amount=100.0) async def run_transaction() -> None: + """ + Execute the configured saga transaction and iterate through all its steps to completion. + + This coroutine opens a transaction using the surrounding `saga_sqlalchemy`, `saga_container`, `context`, and `storage` variables and consumes the transaction iterator without performing additional actions. + """ async with saga_sqlalchemy.transaction( context=context, container=saga_container, @@ -154,7 +180,11 @@ def test_benchmark_saga_sqlalchemy_legacy_single_step( saga_container: SagaContainer, saga_benchmark_loop_and_engine, ): - """Benchmark saga with single step, legacy path (SQLAlchemy storage).""" + """ + Benchmark executing a single-step Saga using legacy SQLAlchemy storage (commit-per-call path). + + Constructs a SingleStepSaga with ReserveInventoryStep, creates a SqlAlchemySagaStorageLegacy backed by the provided engine/session factory, and measures running a full saga transaction (iterating the transaction to completion) using the provided event loop via the benchmark fixture. + """ loop, engine = saga_benchmark_loop_and_engine class SingleStepSaga(Saga[OrderContext]): @@ -172,6 +202,11 @@ class SingleStepSaga(Saga[OrderContext]): context = OrderContext(order_id="ord_1", user_id="user_1", amount=100.0) async def run_transaction() -> None: + """ + Execute the saga transaction lifecycle by entering the saga's transaction context and iterating its steps to completion. + + This function opens the saga transaction using the surrounding `saga`, `saga_container`, `storage`, and `context`, then consumes the transaction iterator to drive all saga steps to completion. + """ async with saga.transaction( context=context, container=saga_container, @@ -180,4 +215,4 @@ async def run_transaction() -> None: async for _ in transaction: pass - benchmark(lambda: loop.run_until_complete(run_transaction())) + benchmark(lambda: loop.run_until_complete(run_transaction())) \ No newline at end of file diff --git a/tests/benchmarks/default/test_benchmark_saga_memory.py b/tests/benchmarks/default/test_benchmark_saga_memory.py index c5ab5f2..5d207ba 100644 --- a/tests/benchmarks/default/test_benchmark_saga_memory.py +++ b/tests/benchmarks/default/test_benchmark_saga_memory.py @@ -141,6 +141,12 @@ def saga_container() -> SagaContainer: @pytest.fixture def memory_storage() -> MemorySagaStorage: + """ + Create a fresh in-memory saga storage instance for tests. + + Returns: + MemorySagaStorage: A new MemorySagaStorage used to persist saga state in memory. + """ return MemorySagaStorage() @@ -150,11 +156,26 @@ class MemorySagaStorageLegacy(MemorySagaStorage): def create_run( self, ) -> contextlib.AbstractAsyncContextManager[SagaStorageRun]: + """ + Disable creation of scoped storage runs for the legacy storage variant used in benchmarks. + + Returns: + contextlib.AbstractAsyncContextManager[SagaStorageRun]: An async context manager yielding a `SagaStorageRun` (disabled in this legacy implementation). + + Raises: + NotImplementedError: Always raised with the message "Legacy storage: create_run disabled for benchmark". + """ raise NotImplementedError("Legacy storage: create_run disabled for benchmark") @pytest.fixture def memory_storage_legacy() -> MemorySagaStorageLegacy: + """ + Create a MemorySagaStorageLegacy instance for legacy-path benchmarks. + + Returns: + MemorySagaStorageLegacy: A storage instance where `create_run()` is disabled and will raise NotImplementedError if called. + """ return MemorySagaStorageLegacy() @@ -163,6 +184,12 @@ def saga_with_memory_storage( saga_container: SagaContainer, memory_storage: MemorySagaStorage, ) -> Saga[OrderContext]: + """ + Create an OrderSaga preconfigured with inventory reservation, payment processing, and shipping steps. + + Returns: + Saga[OrderContext]: An instance configured with ReserveInventoryStep, ProcessPaymentStep, and ShipOrderStep. + """ class OrderSaga(Saga[OrderContext]): steps = [ReserveInventoryStep, ProcessPaymentStep, ShipOrderStep] @@ -179,6 +206,11 @@ def test_benchmark_saga_memory_run_full_transaction( """Benchmark full saga transaction with memory storage, scoped run (3 steps).""" async def run() -> None: + """ + Execute a full three-step OrderSaga transaction using the memory storage scoped-run path. + + Creates an OrderContext and runs the saga transaction to completion with the provided saga container and memory storage. + """ context = OrderContext(order_id="ord_1", user_id="user_1", amount=100.0) async with saga_with_memory_storage.transaction( context=context, @@ -228,6 +260,14 @@ def test_benchmark_saga_memory_run_ten_transactions( """Benchmark 10 saga transactions in sequence, scoped run (memory storage).""" async def run() -> None: + """ + Run ten sequential saga transactions, each using a new MemorySagaStorage and an OrderContext. + + Each iteration (i from 0 to 9) creates: + - a fresh MemorySagaStorage, + - an OrderContext with order_id "ord_i", user_id "user_i", and amount 100.0 + i, + then opens a transaction from `saga_with_memory_storage` with `saga_container` and the storage and iterates the transaction to completion. + """ for i in range(10): storage = MemorySagaStorage() context = OrderContext( @@ -259,6 +299,11 @@ def test_benchmark_saga_memory_legacy_full_transaction( """Benchmark full saga transaction with memory storage, legacy path (3 steps).""" async def run() -> None: + """ + Execute a full OrderSaga transaction using the legacy memory storage path. + + Builds an OrderContext (order_id "ord_1", user_id "user_1", amount 100.0) and runs the saga_with_memory_storage transaction with saga_container and memory_storage_legacy, iterating the transaction to completion. + """ context = OrderContext(order_id="ord_1", user_id="user_1", amount=100.0) async with saga_with_memory_storage.transaction( context=context, @@ -286,6 +331,11 @@ class SingleStepSaga(Saga[OrderContext]): saga = SingleStepSaga() async def run() -> None: + """ + Runs a full OrderSaga transaction using the legacy memory storage path. + + This coroutine executes the saga with an OrderContext and the MemorySagaStorageLegacy instance so the saga proceeds through all steps while exercising the legacy storage behavior (create_run disabled, commit-per-call path). + """ context = OrderContext(order_id="ord_1", user_id="user_1", amount=100.0) async with saga.transaction( context=context, @@ -307,6 +357,11 @@ def test_benchmark_saga_memory_legacy_ten_transactions( """Benchmark 10 saga transactions in sequence, legacy path (memory storage).""" async def run() -> None: + """ + Run ten sequential saga transactions using the legacy memory storage path. + + For each iteration this function creates a new MemorySagaStorageLegacy, constructs an OrderContext with a unique order_id and user_id and increasing amount, opens a saga transaction using the shared saga_with_memory_storage and saga_container, and iterates the transaction to completion. + """ for i in range(10): storage = MemorySagaStorageLegacy() context = OrderContext( @@ -322,4 +377,4 @@ async def run() -> None: async for _ in transaction: pass - benchmark(lambda: asyncio.run(run())) + benchmark(lambda: asyncio.run(run())) \ No newline at end of file diff --git a/tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py b/tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py index af151f5..b750a7f 100644 --- a/tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py +++ b/tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py @@ -28,11 +28,26 @@ class SqlAlchemySagaStorageLegacy(SqlAlchemySagaStorage): def create_run( self, ) -> contextlib.AbstractAsyncContextManager[SagaStorageRun]: + """ + Disable scoped run creation for legacy storage used in benchmarks. + + This storage intentionally does not provide a scoped `create_run` context manager. + Calling this method raises a NotImplementedError to indicate the legacy path is in use. + + Raises: + NotImplementedError: Always raised to indicate scoped run creation is disabled for legacy storage. + """ raise NotImplementedError("Legacy storage: create_run disabled for benchmark") @pytest.fixture def saga_container() -> SagaContainer: + """ + Create a SagaContainer pre-registered with the standard order saga steps. + + Returns: + SagaContainer: Container with ReserveInventoryStep, ProcessPaymentStep, and ShipOrderStep registered. + """ container = SagaContainer() container.register(ReserveInventoryStep, ReserveInventoryStep()) container.register(ProcessPaymentStep, ProcessPaymentStep()) @@ -103,6 +118,11 @@ class SingleStepSaga(Saga[OrderContext]): context = OrderContext(order_id="ord_1", user_id="user_1", amount=100.0) async def run_transaction() -> None: + """ + Run the saga transaction to completion by iterating over its yielded steps using the configured context, container, and storage. + + This function is used by benchmarks to execute a full saga flow without performing additional work per step. + """ async with saga.transaction( context=context, container=saga_container, @@ -137,6 +157,11 @@ def test_benchmark_saga_sqlalchemy_legacy_full_transaction( context = OrderContext(order_id="ord_1", user_id="user_1", amount=100.0) async def run_transaction() -> None: + """ + Execute the saga transaction and iterate through all produced steps without performing any operations. + + Opens the saga's transaction context with the configured container and storage, then consumes every yielded step (no-op per step). Intended for benchmarking the transaction iteration path. + """ async with saga_sqlalchemy.transaction( context=context, container=saga_container, @@ -172,6 +197,11 @@ class SingleStepSaga(Saga[OrderContext]): context = OrderContext(order_id="ord_1", user_id="user_1", amount=100.0) async def run_transaction() -> None: + """ + Run the saga transaction to completion by iterating over its yielded steps using the configured context, container, and storage. + + This function is used by benchmarks to execute a full saga flow without performing additional work per step. + """ async with saga.transaction( context=context, container=saga_container, @@ -180,4 +210,4 @@ async def run_transaction() -> None: async for _ in transaction: pass - benchmark(lambda: loop.run_until_complete(run_transaction())) + benchmark(lambda: loop.run_until_complete(run_transaction())) \ No newline at end of file diff --git a/tests/unit/test_saga/test_saga_storage_run.py b/tests/unit/test_saga/test_saga_storage_run.py index c61c32d..2e0c575 100644 --- a/tests/unit/test_saga/test_saga_storage_run.py +++ b/tests/unit/test_saga/test_saga_storage_run.py @@ -22,9 +22,20 @@ class StorageWithoutCreateRun(ISagaStorage): """Storage that does not implement create_run (legacy path).""" def __init__(self) -> None: + """ + Create a storage wrapper that delegates all saga operations to an internal in-memory storage while intentionally not providing `create_run`. + """ self._inner = MemorySagaStorage() async def create_saga(self, saga_id: uuid.UUID, name: str, context: dict) -> None: + """ + Create a new saga record with the given identifier, name, and initial context. + + Parameters: + saga_id (uuid.UUID): Unique identifier for the saga. + name (str): Human-readable name of the saga. + context (dict): Initial context payload for the saga. + """ await self._inner.create_saga(saga_id, name, context) async def update_context( @@ -33,9 +44,24 @@ async def update_context( context: dict, current_version: int | None = None, ) -> None: + """ + Update the stored context for a saga, optionally validating the expected current version. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga whose context will be updated. + context (dict): New context data to persist for the saga. + current_version (int | None): If provided, the update will only proceed when the stored version equals this value; pass None to skip version validation. + """ await self._inner.update_context(saga_id, context, current_version) async def update_status(self, saga_id: uuid.UUID, status: SagaStatus) -> None: + """ + Update the stored status of a saga. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga to update. + status (SagaStatus): New status to set for the saga. + """ await self._inner.update_status(saga_id, status) async def log_step( @@ -46,6 +72,16 @@ async def log_step( status: SagaStepStatus, details: str | None = None, ) -> None: + """ + Record the execution or compensation outcome of a saga step. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga. + step_name (str): Name of the step being logged. + action (Literal["act", "compensate"]): Whether this log entry is for the step's normal action ("act") or its compensation ("compensate"). + status (SagaStepStatus): Resulting status of the step. + details (str | None): Optional human-readable details or metadata about the step event. + """ await self._inner.log_step(saga_id, step_name, action, status, details) async def load_saga_state( @@ -54,12 +90,31 @@ async def load_saga_state( *, read_for_update: bool = False, ) -> tuple[SagaStatus, dict, int]: + """ + Load the current state for a saga from the underlying storage. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga to load. + read_for_update (bool): If True, load the state with intent to update (may acquire locks or use a read-for-update strategy). + + Returns: + tuple[SagaStatus, dict, int]: A tuple containing the saga's status, its context dictionary, and the current version number. + """ return await self._inner.load_saga_state( saga_id, read_for_update=read_for_update, ) async def get_step_history(self, saga_id: uuid.UUID) -> list: + """ + Return the step execution history for the given saga. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga whose history to retrieve. + + Returns: + list: Step history records in chronological order. Each record describes the step name, action ("act" or "compensate"), step status, timestamp, and any optional details. + """ return await self._inner.get_step_history(saga_id) async def get_sagas_for_recovery( @@ -69,6 +124,18 @@ async def get_sagas_for_recovery( stale_after_seconds: int | None = None, saga_name: str | None = None, ) -> list[uuid.UUID]: + """ + Selects saga IDs that are eligible for recovery. + + Parameters: + limit (int): Maximum number of saga IDs to return. + max_recovery_attempts (int): Only include sagas with fewer than this many recovery attempts. + stale_after_seconds (int | None): If provided, only include sagas last updated more than this many seconds ago; if None, do not filter by staleness. + saga_name (str | None): If provided, restrict results to sagas with this name. + + Returns: + list[uuid.UUID]: Saga UUIDs that match the recovery criteria, up to `limit`. + """ return await self._inner.get_sagas_for_recovery( limit, max_recovery_attempts=max_recovery_attempts, @@ -81,9 +148,23 @@ async def increment_recovery_attempts( saga_id: uuid.UUID, new_status: SagaStatus | None = None, ) -> None: + """ + Increment the recovery-attempts counter for a saga and optionally update its status. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga whose recovery attempts should be incremented. + new_status (SagaStatus | None): If provided, update the saga's status to this value after incrementing attempts; otherwise leave status unchanged. + """ await self._inner.increment_recovery_attempts(saga_id, new_status) async def set_recovery_attempts(self, saga_id: uuid.UUID, attempts: int) -> None: + """ + Set the number of recovery attempts recorded for a saga. + + Parameters: + saga_id (uuid.UUID): Identifier of the saga whose recovery attempts will be set. + attempts (int): Number of recovery attempts to record; should be zero or a positive integer. + """ await self._inner.set_recovery_attempts(saga_id, attempts) @@ -208,4 +289,4 @@ async def test_storage_create_run_raises_not_implemented_by_default() -> None: """Default create_run() on a minimal storage raises NotImplementedError.""" storage = StorageWithoutCreateRun() with pytest.raises(NotImplementedError, match="does not support create_run"): - storage.create_run() + storage.create_run() \ No newline at end of file