Skip to content

Comments

Reduce saga storage overhead#61

Merged
vadikko2 merged 16 commits intomasterfrom
reduce-saga-storage-overhead
Feb 20, 2026
Merged

Reduce saga storage overhead#61
vadikko2 merged 16 commits intomasterfrom
reduce-saga-storage-overhead

Conversation

@vadikko2
Copy link
Collaborator

@vadikko2 vadikko2 commented Feb 10, 2026

Closes issue #60

Summary

The reduce-saga-storage-overhead branch reduces saga storage load by introducing checkpoint commits: one session per saga run with explicit commits only at key points (after each step, after each compensation step, etc.) instead of committing after every storage call. This cuts the number of commits, shortens lock hold time, and lowers the risk of deadlocks when using SQLAlchemy.


Main changes

1. Checkpoint commits and SagaStorageRun protocol

  • New protocol SagaStorageRun (src/cqrs/saga/storage/protocol.py): represents a scoped “session” for a single saga. Its methods only perform operations within that session and do not commit; the caller is responsible for calling commit() at the desired checkpoints.
  • New method ISagaStorage.create_run(): returns a context manager that yields a SagaStorageRun. If a storage does not support create_run() (e.g. a custom implementation), it may raise NotImplementedError, and execution falls back to the previous behaviour (no single session, no checkpoint commits).
  • SagaTransaction behaviour: when create_run() is available, the saga runs inside one session and commits only at checkpoints: after creating the saga and setting status to RUNNING, after each completed step (act), after each compensated step (compensate), and at the end of compensation or when marking COMPLETED/failed. Storages that do not implement create_run() continue to work as before.

2. Storage implementations

  • Memory (MemorySagaStorage): implements create_run(), returning _MemorySagaStorageRun; commit/rollback are no-ops, but the protocol is aligned with SQLAlchemy.
  • SQLAlchemy (SqlAlchemySagaStorage): implements create_run(), returning _SqlAlchemySagaStorageRun backed by a single AsyncSession per run; all mutations go through that session and are committed only when the saga calls run.commit().

3. Deadlock mitigation (Fix deadlocks)

  • load_saga_state(..., read_for_update=True): when loading state for recovery or exclusive update, the implementation can lock the row (e.g. SELECT ... FOR UPDATE in SQLAlchemy) so the same saga is not updated concurrently. Together with checkpoint commits, this shortens lock duration and reduces deadlock risk.

4. Compensation

  • SagaCompensator now accepts an optional on_after_compensate_step callback, invoked after each successfully compensated step. When using a run, the saga passes run.commit so each compensation step is persisted at a checkpoint.

5. Documentation and types

  • Docstrings were added or updated for the storage protocol, SagaStorageRun, memory and SQLAlchemy storage, SagaTransaction, execution and recovery managers, and compensator. The “Strict Backward Recovery” behaviour and use of checkpoint commits when create_run() is available are described in the code.

Tests and infrastructure

Unit tests

  • New file tests/unit/test_saga/test_saga_storage_run.py: tests the path with create_run() and checkpoint commits (memory storage), and the fallback when storage does not implement create_run().

Integration tests and databases

  • PostgreSQL: integration tests now run against PostgreSQL (port 5433) in addition to MySQL.
  • Test split by database: test_saga_storage_sqlalchemy_postgres.py / test_saga_storage_sqlalchemy_mysql.py, and test_saga_mediator_sqlalchemy_postgres.py / test_saga_mediator_sqlalchemy_mysql.py; shared conftest/fixtures were extended for PostgreSQL (DATABASE_DSN_POSTGRESQL).

CI

  • tests.yml: start and wait for PostgreSQL; set DATABASE_DSN_POSTGRESQL; run unit and integration tests with both MySQL and PostgreSQL.
  • New workflow codspeed.yml: run benchmarks via CodSpeed (MySQL, PostgreSQL, Redis; pytest -c ./tests/pytest-config.ini tests/benchmarks/ --codspeed).
  • pytest-config.ini: added DATABASE_DSN_POSTGRESQL to the test env.

Docker and dependencies

  • docker-compose-test.yml: new service postgres_tests (PostgreSQL 15.4, port 5433).
  • pyproject.toml: version 4.9.0; dev dependency pytest-codspeed==4.2.0 added.

Benchmarks

  • Benchmarks under tests/benchmarks/ were updated and extended (including conftest and dataclasses/default variants for memory and SQLAlchemy) to measure the impact of the new storage behaviour.

Examples and compatibility

  • Saga examples (saga.py, saga_recovery.py, saga_sqlalchemy_storage.py, saga_fallback.py, saga_recovery_scheduler.py, etc.) were updated to the current storage API and, where applicable, to use the checkpoint path.
  • Backward compatibility: code that uses ISagaStorage without create_run() continues to work; the new path is used only when the storage implements create_run() (Memory and SQLAlchemy do so out of the box).

Miscellaneous

  • README.md was updated for structure and usage.
  • Post-review fixes: ruff format, pre-commit, and “banchmarks” → benchmarks rename.

Summary of benefits

  • Fewer database commits per saga run thanks to the checkpoint model.
  • Shorter lock hold times and lower deadlock risk via a single session and read_for_update on recovery.
  • Backward compatibility preserved; custom storages can optionally implement create_run() following the Memory/SQLAlchemy pattern.

Summary by CodeRabbit

  • New Features

    • Scoped per-saga runs with checkpointed commits for better performance.
    • Optional post-compensation callback hook.
    • Broadened storage/run-aware execution with MySQL and PostgreSQL readiness.
  • Documentation

    • Major README overhaul focusing on event-driven CQRS, sagas, outbox, recovery and diagrams.
    • Examples expanded to show scoped-run, recovery, fallback and FastAPI/streaming scenarios.
  • Tests / Benchmarks

    • Added/updated tests and benchmarks for scoped-run and legacy commit-per-call paths.
  • Chores

    • CI/workflow and compose updated for PostgreSQL; project version bumped to 4.9.0.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 10, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds a scoped-run abstraction for saga storage (SagaStorageRun and ISagaStorage.create_run), implements run-backed Memory and SQLAlchemy runs with commit/rollback, alters saga execution and compensation to use run-scoped flows with legacy fallback, extends tests/benchmarks for run vs legacy modes, and updates docs, examples, and CI for Postgres/MySQL.

Changes

Cohort / File(s) Summary
Storage protocol & exports
src/cqrs/saga/storage/protocol.py, src/cqrs/saga/storage/__init__.py
Introduce SagaStorageRun protocol and add ISagaStorage.create_run() returning an async context manager; export SagaStorageRun.
Memory storage
src/cqrs/saga/storage/memory.py
Add _MemorySagaStorageRun and MemorySagaStorage.create_run() returning an async context manager that delegates operations; in-memory commit/rollback are no-ops.
SQLAlchemy storage
src/cqrs/saga/storage/sqlalchemy.py
Add _SqlAlchemySagaStorageRun and SqlAlchemySagaStorage.create_run() managing an AsyncSession with explicit commit/rollback and run-scoped methods.
Saga core & flow
src/cqrs/saga/saga.py
Make saga execution run-aware: attempt create_run() to build run-scoped managers/executors, commit at checkpoints, delegate compensation to SagaCompensator; fall back to legacy per-call path when create_run() is unsupported.
Compensator & managers
src/cqrs/saga/compensation.py, src/cqrs/saga/execution.py
Broaden storage types to accept `ISagaStorage
Benchmarks & fixtures
tests/benchmarks/..., tests/benchmarks/conftest.py
Add *Legacy storage adapters that disable create_run() to exercise legacy commit-per-call path; rename run-path benchmarks, add legacy counterparts, and update fixtures/docs.
Unit & integration tests
tests/unit/test_saga/test_saga_storage_run.py, tests/integration/..., tests/pytest-config.ini, tests/integration/fixtures.py
Add unit tests for create_run() and run vs legacy flows; split/add extensive integration tests for MySQL and Postgres with per-DB fixtures and DSN env vars; adapt existing integration tests to DB-specific fixtures.
Examples & docs
README.md, examples/*, examples/saga_sqlalchemy_storage.py
Document create_run() semantics (one session per saga, checkpoint commits), reorganize README, update example docstrings/logging and mention storage run support.
CI & compose
.github/workflows/*, docker-compose-test.yml, docker-compose-dev.yml
Add Postgres services and readiness wait steps in CI workflows; add postgres services to docker-compose files; update workflow env for DB DSNs; small pre-commit exclude tweak.
Version & dev deps
pyproject.toml
Bump project version 4.8.1 → 4.9.0 and add asyncpg>=0.29.0 to dev dependencies.
Minor formatting & cleanup
many src/*, tests/*, examples/*
Numerous non-functional formatting, docstring, and test assertion consolidations; minor logging/string formatting tweaks.

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client
    participant Saga as Saga
    participant Storage as ISagaStorage/\nSagaStorageRun
    participant StateMgr as StateManager
    participant Executor as StepExecutor
    participant Comp as Compensator

    Note over Saga,Storage: Run-backed path
    Client->>Saga: start saga
    Saga->>Storage: create_run()
    Storage-->>Saga: SagaStorageRun
    Saga->>Storage: create_saga(saga_id, name, context)
    Saga->>Storage: update_status(RUNNING)
    Saga->>Storage: commit()

    loop For each step
        Saga->>StateMgr: load_completed_step_names()
        Saga->>Executor: execute_step(step)
        Executor-->>Saga: step result
        Saga->>Storage: log_step(step, status)
        Saga->>Storage: update_context(context)
        Saga->>Storage: commit()
    end

    alt step fails
        Saga->>Storage: update_status(COMPENSATING)
        Saga->>Storage: commit()
        loop Reverse completed steps
            Saga->>Comp: compensate_steps([...])
            Comp->>Storage: log_step(step, "compensate", COMPLETED)
            Comp-->>Saga: compensation result
        end
        Saga->>Storage: update_status(FAILED)
        Saga->>Storage: commit()
    else all succeed
        Saga->>Storage: update_status(COMPLETED)
        Saga->>Storage: commit()
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues

Possibly related PRs

Poem

🐰 I hop through runs and checkpoints bright,

One session per saga, stepping light.
Legacy paths still tumble and play,
Run-backed commits keep failures at bay.
A rabbit cheers — sagas safe today!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 62.14% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title 'Reduce saga storage overhead' directly reflects the main objective of this PR, which introduces a scoped run mechanism (SagaStorageRun) to reduce commit overhead by enabling one session per saga with checkpoint commits instead of per-call commits.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch reduce-saga-storage-overhead

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@vadikko2 vadikko2 requested a review from DimaPlaz February 10, 2026 21:40
@codecov
Copy link

codecov bot commented Feb 10, 2026

Codecov Report

❌ Patch coverage is 94.73684% with 10 lines in your changes missing coverage. Please review.
✅ Project coverage is 87.67%. Comparing base (ce4dbdf) to head (d77c49c).
⚠️ Report is 1 commits behind head on master.

Files with missing lines Patch % Lines
src/cqrs/saga/storage/sqlalchemy.py 90.00% 6 Missing ⚠️
src/cqrs/saga/compensation.py 70.00% 3 Missing ⚠️
src/cqrs/saga/saga.py 98.52% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master      #61      +/-   ##
==========================================
+ Coverage   87.26%   87.67%   +0.40%     
==========================================
  Files          70       70              
  Lines        2481     2636     +155     
==========================================
+ Hits         2165     2311     +146     
- Misses        316      325       +9     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@codspeed-hq
Copy link
Contributor

codspeed-hq bot commented Feb 10, 2026

Merging this PR will not alter performance

✅ 48 untouched benchmarks
🆕 22 new benchmarks
⏩ 10 skipped benchmarks1

Performance Changes

Benchmark BASE HEAD Efficiency
🆕 test_benchmark_saga_memory_legacy_full_transaction N/A 1.4 ms N/A
🆕 test_benchmark_saga_memory_run_single_step N/A 919.9 µs N/A
🆕 test_benchmark_saga_memory_run_full_transaction N/A 1.5 ms N/A
🆕 test_benchmark_saga_memory_legacy_single_step N/A 876 µs N/A
🆕 test_benchmark_saga_memory_legacy_ten_transactions N/A 9.2 ms N/A
🆕 test_benchmark_saga_memory_run_ten_transactions N/A 9.8 ms N/A
🆕 test_benchmark_saga_memory_legacy_ten_transactions N/A 9.1 ms N/A
🆕 test_benchmark_saga_memory_legacy_full_transaction N/A 1.4 ms N/A
🆕 test_benchmark_saga_memory_run_ten_transactions N/A 9.8 ms N/A
🆕 test_benchmark_saga_sqlalchemy_legacy_single_step N/A 10.8 ms N/A
🆕 test_benchmark_saga_memory_run_single_step N/A 935.1 µs N/A
🆕 test_benchmark_saga_memory_legacy_single_step N/A 856.7 µs N/A
🆕 test_benchmark_saga_sqlalchemy_legacy_full_transaction N/A 19 ms N/A
🆕 test_benchmark_saga_sqlalchemy_legacy_full_transaction[legacy] N/A 19 ms N/A
🆕 test_benchmark_saga_sqlalchemy_legacy_full_transaction[storage] N/A 13.5 ms N/A
🆕 test_benchmark_saga_memory_run_full_transaction N/A 1.5 ms N/A
🆕 test_benchmark_saga_sqlalchemy_run_full_transaction N/A 13.6 ms N/A
🆕 test_benchmark_saga_sqlalchemy_run_single_step N/A 8.4 ms N/A
🆕 test_benchmark_saga_sqlalchemy_run_single_step N/A 8.5 ms N/A
🆕 test_benchmark_saga_sqlalchemy_legacy_single_step[legacy] N/A 10.8 ms N/A
... ... ... ... ...

ℹ️ Only the first 20 benchmarks are displayed. Go to the app to view all benchmarks.


Comparing reduce-saga-storage-overhead (d77c49c) with master (ce4dbdf)

Open in CodSpeed

Footnotes

  1. 10 benchmarks were skipped, so the baseline results were used instead. If they were deleted from the codebase, click here and archive them to remove them from the performance reports.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
README.md (1)

86-97: ⚠️ Potential issue | 🟡 Minor

Fix the events vs _events mismatch in examples.

Both Command and Query handler snippets set self.events but the property returns self._events. This will confuse users and is inconsistent with the sample. Use one attribute name consistently.

✅ Suggested doc fix
 class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):

       def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
           self._meetings_api = meetings_api
-          self.events: list[Event] = []
+          self._events: list[Event] = []

       `@property`
       def events(self) -> typing.List[events.Event]:
           return self._events
 class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):

       def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
           self._meetings_api = meetings_api
-          self.events: list[Event] = []
+          self._events: list[Event] = []

       `@property`
       def events(self) -> typing.List[events.Event]:
           return self._events

Also applies to: 114-126

🤖 Fix all issues with AI agents
In `@README.md`:
- Around line 325-328: The snippet in init_events uses two inconsistent module
aliases (events_models vs event_models) which will confuse readers and break the
example; choose one alias and use it consistently for the NotificationEvent
generic types (e.g., change event_models to events_models or vice versa) so both
mapper.bind lines reference the same symbol set (affecting
NotificationEvent[events_models.ECSTMeetingRoomClosed] and
NotificationEvent[events_models.NotificationMeetingRoomClosed]), and update any
corresponding import alias shown elsewhere in the snippet to match (keep
MeetingRoomClosedNotificationHandler and UpdateMeetingRoomReadModelHandler names
unchanged).

In `@src/cqrs/saga/compensation.py`:
- Around line 104-105: The post-compensation callback
self._on_after_compensate_step should be executed outside the step's broad
try/except so its failures don't get swallowed or cause duplicate
COMPLETED/FAILED logs; move the await self._on_after_compensate_step() call out
of the step exception handling in compensate_step (or the surrounding method),
let exceptions from _on_after_compensate_step (e.g., run.commit) propagate or
handle them explicitly (log once and abort) before marking the step
COMPLETED/FAILED, ensuring the callback is invoked only after a successful step
and its errors are treated separately.
🧹 Nitpick comments (3)
examples/saga_recovery.py (1)

293-295: Consider using a single f-string.

Adjacent f-string concatenation f"..." f"..." works but a single f-string would be cleaner:

f"  ✓ Created shipment {shipment_id} for order {order_id} (tracking: {tracking_number})"

This same pattern appears on lines 608, 679, 788, and 853.

src/cqrs/saga/saga.py (1)

202-208: Avoid reaching into _compensator private fields.
Using _compensator._retry_* couples the new run path to internal state. Prefer storing the retry config on SagaTransaction and reusing it here.

♻️ Suggested refactor (store retry config on the transaction)
@@
         self._storage = storage
         self._completed_steps: list[SagaStepHandler[ContextT, typing.Any]] = []
         self._error: BaseException | None = None
         self._compensated: bool = False
+        self._comp_retry_count = compensation_retry_count
+        self._comp_retry_delay = compensation_retry_delay
+        self._comp_retry_backoff = compensation_retry_backoff
@@
         self._compensator: SagaCompensator[ContextT] = SagaCompensator[ContextT](
             self._saga_id,
             context,
             storage,
-            compensation_retry_count,
-            compensation_retry_delay,
-            compensation_retry_backoff,
+            self._comp_retry_count,
+            self._comp_retry_delay,
+            self._comp_retry_backoff,
         )
@@
             compensator = SagaCompensator(
                 self._saga_id,
                 self._context,
                 run,
-                self._compensator._retry_count,
-                self._compensator._retry_delay,
-                self._compensator._retry_backoff,
+                self._comp_retry_count,
+                self._comp_retry_delay,
+                self._comp_retry_backoff,
                 on_after_compensate_step=run.commit,
             )
tests/benchmarks/dataclasses/test_benchmark_saga_memory.py (1)

225-249: Unused memory_storage fixture parameter.

The memory_storage fixture is injected but never used—a fresh MemorySagaStorage() is created inside the loop (line 235). Consider removing the unused parameter for clarity.

Proposed fix
 `@pytest.mark.benchmark`
 def test_benchmark_saga_memory_run_ten_transactions(
     benchmark,
     saga_with_memory_storage: Saga[OrderContext],
     saga_container: SagaContainer,
-    memory_storage: MemorySagaStorage,
 ):

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 18, 2026

Note

Docstrings generation - SUCCESS
Generated docstrings for this pull request at #62

coderabbitai bot added a commit that referenced this pull request Feb 18, 2026
Docstrings generation was requested by @vadikko2.

* #61 (comment)

The following files were modified:

* `examples/saga.py`
* `examples/saga_fallback.py`
* `examples/saga_recovery.py`
* `examples/saga_recovery_scheduler.py`
* `examples/saga_sqlalchemy_storage.py`
* `src/cqrs/saga/compensation.py`
* `src/cqrs/saga/execution.py`
* `src/cqrs/saga/saga.py`
* `src/cqrs/saga/storage/memory.py`
* `src/cqrs/saga/storage/protocol.py`
* `src/cqrs/saga/storage/sqlalchemy.py`
* `tests/benchmarks/dataclasses/test_benchmark_saga_memory.py`
* `tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py`
* `tests/benchmarks/default/test_benchmark_saga_memory.py`
* `tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py`
* `tests/unit/test_saga/test_saga_storage_run.py`
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
examples/saga_sqlalchemy_storage.py (1)

42-44: ⚠️ Potential issue | 🟡 Minor

Comment contradicts the actual default URL.

Line 43 says "Using SQLite for this example" but the default DB_URL on line 44 is a MySQL connection string (mysql+asyncmy://...). The comment appears to be a leftover from an earlier version.

Suggested fix
 # Database Configuration
-# Using SQLite for this example, but can be swapped for PostgreSQL/MySQL
+# Using MySQL for this example, but can be swapped for PostgreSQL/SQLite
 DB_URL = os.getenv("DATABASE_URL", "mysql+asyncmy://cqrs:cqrs@localhost:3307/test_cqrs")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/saga_sqlalchemy_storage.py` around lines 42 - 44, The comment above
DB_URL is incorrect—update the text or the default to match each other: either
change the comment to indicate MySQL (or asyncmy) as the default connection, or
set DB_URL's default to a SQLite URI (e.g., "sqlite+aiosqlite:///./test.db");
modify the comment near the DB_URL declaration (symbol: DB_URL) to accurately
describe the default or change the DB_URL value to the intended SQLite URL so
the comment and default are consistent.
🧹 Nitpick comments (8)
examples/saga_sqlalchemy_storage.py (1)

142-149: Docstring placed after inline comments — may confuse documentation tools.

The docstring on lines 145–149 is placed after the # comments on lines 143–144. While Python still recognizes it as the function docstring (comments aren't statements), many documentation generators (Sphinx, IDE tooltips) expect the docstring to immediately follow the def line.

Move docstring before comments
 async def main() -> None:
+    """
+    Run a demonstration that executes an OrderSaga using an async SQLAlchemy engine and persistent SqlAlchemySagaStorage.
+    
+    Initializes a pooled async SQLAlchemy engine and schema, creates a session factory and SqlAlchemySagaStorage, bootstraps a mediator with a DI container and saga mapper, runs an OrderSaga while streaming step results to stdout, and then reloads and prints the persisted saga state and step history before disposing the engine.
+    """
     # 1. Create SQLAlchemy Engine with Connection Pool
     # SQLAlchemy creates a pool by default (QueuePool for most dialects, SingletonThreadPool for SQLite)
-    """
-    Run a demonstration that executes an OrderSaga using an async SQLAlchemy engine and persistent SqlAlchemySagaStorage.
-    
-    Initializes a pooled async SQLAlchemy engine and schema, creates a session factory and SqlAlchemySagaStorage, bootstraps a mediator with a DI container and saga mapper, runs an OrderSaga while streaming step results to stdout, and then reloads and prints the persisted saga state and step history before disposing the engine.
-    """
     engine = create_async_engine(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/saga_sqlalchemy_storage.py` around lines 142 - 149, The
function-level docstring for async def main() is placed after inline comments,
which can confuse documentation tools; move the triple-quoted docstring so it
immediately follows the def main() line (i.e., place the """Run a
demonstration...""" string directly after the function signature), and then keep
the existing inline comments (# 1. Create SQLAlchemy Engine...) after the
docstring; ensure the string is a proper docstring (triple quotes) and retains
the same content.
src/cqrs/saga/storage/sqlalchemy.py (2)

191-218: Duplicate .values(...) in both branches of update_context.

The if current_version is not None and else branches set identical .values(context=context, version=SagaExecutionModel.version + 1). The only difference is the extra .where(...) clause. This can be simplified.

♻️ Simplify by applying the common values once
         stmt = sqlalchemy.update(SagaExecutionModel).where(
             SagaExecutionModel.id == saga_id,
         )
         if current_version is not None:
             stmt = stmt.where(SagaExecutionModel.version == current_version)
-            stmt = stmt.values(
-                context=context,
-                version=SagaExecutionModel.version + 1,
-            )
-        else:
-            stmt = stmt.values(
-                context=context,
-                version=SagaExecutionModel.version + 1,
-            )
+        stmt = stmt.values(
+            context=context,
+            version=SagaExecutionModel.version + 1,
+        )

Note: The same duplication exists in SqlAlchemySagaStorage.update_context (lines 439–450).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/saga/storage/sqlalchemy.py` around lines 191 - 218, The
update_context implementation on SqlAlchemySagaStorage duplicates the
.values(context=context, version=SagaExecutionModel.version + 1) call in both
branches; change the logic so the conditional only adds the extra
.where(SagaExecutionModel.version == current_version) when current_version is
not None and apply .values(...) once after that branch, keeping the initial stmt
= sqlalchemy.update(SagaExecutionModel).where(SagaExecutionModel.id == saga_id)
and then executing stmt via self._session.execute as before; also apply the same
refactor to the other occurrence in SqlAlchemySagaStorage.update_context to
remove the duplicated .values call.

379-389: create_run does not commit on normal exit — silent data loss if caller forgets commit().

By design, the protocol says the caller must call commit(). However, the context manager silently discards uncommitted work on clean exit (no exception, no rollback). For a database-backed storage, this can be surprising.

Consider logging a warning if the session has uncommitted changes on exit, or documenting this prominently.

tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py (1)

25-41: SqlAlchemySagaStorageLegacy is duplicated across benchmark modules.

This identical class appears in both tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py and tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py (and a similar pattern in the memory benchmark). Consider extracting it to a shared conftest.py or test utility module.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py` around lines 25 -
41, Extract the duplicated SqlAlchemySagaStorageLegacy class into a single
shared test helper (e.g., a test utilities module or conftest) and import it
from both benchmark test modules instead of redefining it; specifically, move
the class definition for SqlAlchemySagaStorageLegacy into the shared test
helper, update the benchmark test modules to import SqlAlchemySagaStorageLegacy,
and remove the duplicate class definitions (also apply the same extraction for
the analogous memory-benchmark duplicate).
src/cqrs/saga/saga.py (2)

191-223: Run-scoped components duplicate __init__ wiring — consider extracting a factory.

When run is not None, lines 192–217 re-create SagaStateManager, SagaRecoveryManager, SagaStepExecutor, FallbackStepExecutor, and SagaCompensator with the run as the storage argument. The else branch (lines 218–223) just aliases the __init__-created instances. This duplication will grow if more components are added.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/saga/saga.py` around lines 191 - 223, The block that re-creates
SagaStateManager, SagaRecoveryManager, SagaStepExecutor, FallbackStepExecutor,
and SagaCompensator when run is not None duplicates __init__ wiring; extract
that creation into a small factory/helper method (e.g.,
_build_run_scoped_components(run)) that returns (state_manager,
recovery_manager, step_executor, fallback_executor, compensator), ensure the
SagaCompensator is constructed with the same retry args and
on_after_compensate_step=run.commit, and replace the duplicated block with a
single call to that factory while preserving the else branch aliasing to
self._state_manager, self._recovery_manager, self._step_executor,
self._fallback_executor, and self._compensator.

162-173: Fallback from create_run() via NotImplementedError — verify this is the intended contract.

The code catches NotImplementedError synchronously from self._storage.create_run(). This works because the base ISagaStorage.create_run() raises NotImplementedError directly (not inside an async context). If a storage implementation were to wrap create_run() as an async method or defer the error to __aenter__, this pattern would break.

The current implementations (Memory, SQLAlchemy) are correct, but documenting this synchronous-raise contract in the protocol would help future implementers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/saga/saga.py` around lines 162 - 173, The current fallback assumes
self._storage.create_run() raises NotImplementedError synchronously; update
saga.py to also handle the case where create_run() returns an async context
manager that only raises on __aenter__. Change the block around
create_run()/async with so you first call run_cm = self._storage.create_run(),
then if run_cm is None proceed as before, else wrap the async with run_cm as run
in a try/except that catches NotImplementedError (and falls back to calling
self._execute(None)), so both synchronous raises and raises from entering the
context are handled; alternatively, add a clear protocol docstring on
ISagaStorage.create_run to require synchronous NotImplementedError, but prefer
the try/except-around-async-with approach to be robust.
tests/benchmarks/dataclasses/test_benchmark_saga_memory.py (2)

259-289: Unused memory_storage fixture parameter inflates benchmark overhead.

test_benchmark_saga_memory_run_ten_transactions requests memory_storage: MemorySagaStorage but never reads it — run() creates a fresh MemorySagaStorage() per iteration. The legacy counterpart test_benchmark_saga_memory_legacy_ten_transactions (lines 354-383) correctly omits its analogous fixture for exactly this reason. Align both by removing the unused parameter.

♻️ Proposed fix
 `@pytest.mark.benchmark`
 def test_benchmark_saga_memory_run_ten_transactions(
     benchmark,
     saga_with_memory_storage: Saga[OrderContext],
     saga_container: SagaContainer,
-    memory_storage: MemorySagaStorage,
 ):
     """Benchmark 10 saga transactions in sequence, scoped run (memory storage)."""
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/benchmarks/dataclasses/test_benchmark_saga_memory.py` around lines 259
- 289, The test function test_benchmark_saga_memory_run_ten_transactions
currently declares an unused fixture parameter memory_storage: MemorySagaStorage
which inflates benchmark overhead; remove the memory_storage parameter from the
test signature so the function becomes def
test_benchmark_saga_memory_run_ten_transactions(benchmark,
saga_with_memory_storage: Saga[OrderContext], saga_container: SagaContainer):
and keep the internal creation of MemorySagaStorage() inside run() as-is; ensure
no other references to memory_storage remain in this function or its docstring.

156-168: MemorySagaStorageLegacy is duplicated across benchmark modules.

tests/benchmarks/default/test_benchmark_saga_memory.py (lines 153–172) defines an identical MemorySagaStorageLegacy class. Extract it to tests/benchmarks/conftest.py so both test files import from one place instead of duplicating the class.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/benchmarks/dataclasses/test_benchmark_saga_memory.py` around lines 156
- 168, Extract the duplicated MemorySagaStorageLegacy class into a shared
conftest.py and update both benchmark test modules to import it from that single
location: create MemorySagaStorageLegacy in conftest.py (preserving the
docstring and the create_run raising NotImplementedError message) and remove the
duplicate class definitions from the two test files, then add an import or rely
on pytest autoloading so tests reference the single MemorySagaStorageLegacy
symbol instead of having separate copies.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/cqrs/saga/saga.py`:
- Around line 209-217: The code currently reaches into SagaCompensator's private
fields (self._compensator._retry_count, _retry_delay, _retry_backoff) when
creating a run-scoped compensator in _execute; instead, capture the retry
parameters passed into SagaTransaction.__init__ by assigning them to instance
attributes (e.g., self._retry_count, self._retry_delay, self._retry_backoff)
during construction and then use those attributes when creating SagaCompensator
instances (both the original and the run-scoped one in _execute) so you no
longer reference self._compensator's private internals.
- Around line 347-359: The FAILED step log written via state_manager.log_step
inside the except block can be lost if compensator.compensate_steps raises
because the write is still staged in the run session; modify the except block so
that after calling state_manager.log_step(...) you ensure the write is persisted
by calling run.commit() (if run is not None) before invoking
compensator.compensate_steps(self._completed_steps), then proceed with setting
self._error/self._compensated and running compensation; keep the existing
rollback/raise behavior for subsequent failures.

In `@tests/benchmarks/dataclasses/test_benchmark_saga_memory.py`:
- Around line 182-202: The fixture saga_with_memory_storage declares unused
pytest dependencies saga_container (SagaContainer) and memory_storage
(MemorySagaStorage); remove these parameters from the fixture signature so
pytest won't instantiate them unnecessarily, and update the docstring to reflect
no external fixture dependencies; keep the inner OrderSaga class
(Saga[OrderContext]) and its steps list [ReserveInventoryStep,
ProcessPaymentStep, ShipOrderStep] unchanged and simply return OrderSaga().

---

Outside diff comments:
In `@examples/saga_sqlalchemy_storage.py`:
- Around line 42-44: The comment above DB_URL is incorrect—update the text or
the default to match each other: either change the comment to indicate MySQL (or
asyncmy) as the default connection, or set DB_URL's default to a SQLite URI
(e.g., "sqlite+aiosqlite:///./test.db"); modify the comment near the DB_URL
declaration (symbol: DB_URL) to accurately describe the default or change the
DB_URL value to the intended SQLite URL so the comment and default are
consistent.

---

Duplicate comments:
In `@tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py`:
- Around line 25-37: The SqlAlchemySagaStorageLegacy class is duplicated; remove
the redundant definition and ensure both benchmark variants reference a single
shared implementation (SqlAlchemySagaStorageLegacy) - e.g., keep the unique
class (SqlAlchemySagaStorageLegacy) and delete the duplicate, or extract it to a
common location used by both the `default` and other benchmark variants so only
one NotImplementedError-raising create_run implementation exists.

---

Nitpick comments:
In `@examples/saga_sqlalchemy_storage.py`:
- Around line 142-149: The function-level docstring for async def main() is
placed after inline comments, which can confuse documentation tools; move the
triple-quoted docstring so it immediately follows the def main() line (i.e.,
place the """Run a demonstration...""" string directly after the function
signature), and then keep the existing inline comments (# 1. Create SQLAlchemy
Engine...) after the docstring; ensure the string is a proper docstring (triple
quotes) and retains the same content.

In `@src/cqrs/saga/saga.py`:
- Around line 191-223: The block that re-creates SagaStateManager,
SagaRecoveryManager, SagaStepExecutor, FallbackStepExecutor, and SagaCompensator
when run is not None duplicates __init__ wiring; extract that creation into a
small factory/helper method (e.g., _build_run_scoped_components(run)) that
returns (state_manager, recovery_manager, step_executor, fallback_executor,
compensator), ensure the SagaCompensator is constructed with the same retry args
and on_after_compensate_step=run.commit, and replace the duplicated block with a
single call to that factory while preserving the else branch aliasing to
self._state_manager, self._recovery_manager, self._step_executor,
self._fallback_executor, and self._compensator.
- Around line 162-173: The current fallback assumes self._storage.create_run()
raises NotImplementedError synchronously; update saga.py to also handle the case
where create_run() returns an async context manager that only raises on
__aenter__. Change the block around create_run()/async with so you first call
run_cm = self._storage.create_run(), then if run_cm is None proceed as before,
else wrap the async with run_cm as run in a try/except that catches
NotImplementedError (and falls back to calling self._execute(None)), so both
synchronous raises and raises from entering the context are handled;
alternatively, add a clear protocol docstring on ISagaStorage.create_run to
require synchronous NotImplementedError, but prefer the
try/except-around-async-with approach to be robust.

In `@src/cqrs/saga/storage/sqlalchemy.py`:
- Around line 191-218: The update_context implementation on
SqlAlchemySagaStorage duplicates the .values(context=context,
version=SagaExecutionModel.version + 1) call in both branches; change the logic
so the conditional only adds the extra .where(SagaExecutionModel.version ==
current_version) when current_version is not None and apply .values(...) once
after that branch, keeping the initial stmt =
sqlalchemy.update(SagaExecutionModel).where(SagaExecutionModel.id == saga_id)
and then executing stmt via self._session.execute as before; also apply the same
refactor to the other occurrence in SqlAlchemySagaStorage.update_context to
remove the duplicated .values call.

In `@tests/benchmarks/dataclasses/test_benchmark_saga_memory.py`:
- Around line 259-289: The test function
test_benchmark_saga_memory_run_ten_transactions currently declares an unused
fixture parameter memory_storage: MemorySagaStorage which inflates benchmark
overhead; remove the memory_storage parameter from the test signature so the
function becomes def test_benchmark_saga_memory_run_ten_transactions(benchmark,
saga_with_memory_storage: Saga[OrderContext], saga_container: SagaContainer):
and keep the internal creation of MemorySagaStorage() inside run() as-is; ensure
no other references to memory_storage remain in this function or its docstring.
- Around line 156-168: Extract the duplicated MemorySagaStorageLegacy class into
a shared conftest.py and update both benchmark test modules to import it from
that single location: create MemorySagaStorageLegacy in conftest.py (preserving
the docstring and the create_run raising NotImplementedError message) and remove
the duplicate class definitions from the two test files, then add an import or
rely on pytest autoloading so tests reference the single MemorySagaStorageLegacy
symbol instead of having separate copies.

In `@tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py`:
- Around line 25-41: Extract the duplicated SqlAlchemySagaStorageLegacy class
into a single shared test helper (e.g., a test utilities module or conftest) and
import it from both benchmark test modules instead of redefining it;
specifically, move the class definition for SqlAlchemySagaStorageLegacy into the
shared test helper, update the benchmark test modules to import
SqlAlchemySagaStorageLegacy, and remove the duplicate class definitions (also
apply the same extraction for the analogous memory-benchmark duplicate).

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Nitpick comments (9)
docker-compose-dev.yml (1)

19-19: Pin the postgres image to a specific version.

postgres:latest resolves to whatever the current latest release is at pull time. The adjacent MySQL service uses mysql:8.3.0. An unexpected major-version bump (e.g., 16 → 17) can silently break schema-init behavior or driver compatibility in CI. Docker generally recommends using a specific version of Postgres, since :latest automatically changes with each new release and it's hard to know whether newer versions will introduce breaking changes or vulnerabilities.

🔧 Proposed fix
-    image: postgres:latest
+    image: postgres:16
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docker-compose-dev.yml` at line 19, The docker-compose Postgres service
currently uses the floating tag "postgres:latest"; change the image field in
that service to a fixed, explicit Postgres version (e.g., "postgres:15.4" or
whichever minor version your CI/schema was validated against) instead of :latest
to avoid unexpected major upgrades; update the image line in the postgres
service block to the chosen pinned tag and ensure any related CI/dockerfiles
that assume a Postgres major version are kept consistent.
tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py (1)

117-193: Significant code duplication between run-path and legacy-path benchmarks.

The four benchmarks split cleanly into run vs legacy, but the session factory setup, context creation, and run_transaction body are copy-pasted verbatim. A thin parametrised or factory approach keeps benchmark names explicit while eliminating the duplication:

♻️ Sketch of a parametrised helper
import pytest

STORAGE_VARIANTS = [
    pytest.param(SqlAlchemySagaStorage, id="run"),
    pytest.param(SqlAlchemySagaStorageLegacy, id="legacy"),
]


def _make_storage(engine, storage_cls):
    return storage_cls(async_sessionmaker(engine, expire_on_commit=False,
                                          autocommit=False, autoflush=False))


`@pytest.mark.benchmark`
`@pytest.mark.parametrize`("storage_cls", STORAGE_VARIANTS)
def test_benchmark_saga_sqlalchemy_full_transaction(
    benchmark, saga_sqlalchemy, saga_container, saga_benchmark_loop_and_engine, storage_cls
):
    loop, engine = saga_benchmark_loop_and_engine
    storage = _make_storage(engine, storage_cls)
    context = OrderContext(order_id="ord_1", user_id="user_1", amount=100.0)

    async def run_transaction() -> None:
        async with saga_sqlalchemy.transaction(
            context=context, container=saga_container, storage=storage,
        ) as transaction:
            async for _ in transaction:
                pass

    benchmark(lambda: loop.run_until_complete(run_transaction()))

CodSpeed will surface the two variants as test_benchmark_saga_sqlalchemy_full_transaction[run] and ...[legacy], preserving the naming distinction.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py` around lines 117
- 193, The two legacy benchmarks duplicate session_factory, storage creation,
context creation, and the run_transaction body; refactor by adding a small
factory/helper (e.g., _make_storage(engine, storage_cls)) to build the storage
using async_sessionmaker and parametrize the tests over storage classes
(SqlAlchemySagaStorage and SqlAlchemySagaStorageLegacy) so you keep test names
explicit via pytest.param ids; update
test_benchmark_saga_sqlalchemy_legacy_full_transaction and
test_benchmark_saga_sqlalchemy_legacy_single_step to accept a storage_cls
parameter, call _make_storage(loop_engine[1], storage_cls) and reuse a single
run_transaction implementation (or inline identical body) instead of duplicating
setup and iteration logic.
docker-compose-test.yml (1)

19-19: Pin the postgres image version for reproducible CI.

Same issue as docker-compose-dev.yml: postgres:latest can silently change between pipeline runs, while mysql_tests is pinned at 8.3.0.

🔧 Proposed fix
-    image: postgres:latest
+    image: postgres:16
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docker-compose-test.yml` at line 19, The docker-compose test service
currently uses the floating image tag "postgres:latest" which can change between
CI runs; replace that image value with a pinned, explicit Postgres tag (e.g.,
"postgres:15.4" or a specific alpine/patch tag your CI expects) so the test
service image is reproducible—update the image field for the Postgres service
(the line currently containing "image: postgres:latest") to the chosen fixed tag
and ensure any CI/health-checks or related configs reference the same pinned
version.
tests/integration/test_saga_mediator_sqlalchemy_postgres.py (1)

36-74: _TestContainer is duplicated verbatim between MySQL and Postgres test modules.

This container class, along with the container and saga_mediator fixtures, are identical in both test_saga_mediator_sqlalchemy_mysql.py and test_saga_mediator_sqlalchemy_postgres.py. Consider extracting the shared container and fixture logic into a common conftest or helper module to avoid maintaining two copies.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/test_saga_mediator_sqlalchemy_postgres.py` around lines 36
- 74, Duplicate _TestContainer, its fixtures (container, saga_mediator), and
related helpers should be extracted into a shared test utility to avoid code
duplication; create a common test module (e.g., tests/conftest or tests/helpers)
that defines the _TestContainer class (including methods resolve,
attach_external_container, and property external_container) and the container
and saga_mediator fixtures, then update both
test_saga_mediator_sqlalchemy_mysql.py and
test_saga_mediator_sqlalchemy_postgres.py to import and use those shared symbols
instead of defining them locally.
examples/saga_recovery_scheduler.py (2)

267-267: Inconsistent logging style: f-string vs %-style.

Line 267 switches to an f-string while the rest of the file (e.g., lines 205, 215, 232-237, 275-278) uses %-style placeholders for logger calls. Pick one style for consistency within the file.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/saga_recovery_scheduler.py` at line 267, The logger call using an
f-string is inconsistent with the rest of the file; update the logger.info call
that references shipment_id, order_id, and tracking_number to use the same
%‑style placeholder formatting used elsewhere (i.e., call logger.info with a
format string containing %s placeholders and pass shipment_id, order_id,
tracking_number as separate arguments) so it matches lines like the other
logger.* usages.

520-527: logger.exception already includes the traceback — traceback.format_exc() duplicates it.

logger.exception(...) appends the current exception's traceback automatically. Embedding traceback.format_exc() in the f-string causes the full stack trace to appear twice in each log record.

Proposed fix
-                logger.info(f"Saga {saga_id} recovery completed compensation: {traceback.format_exc()}")
+                logger.info(f"Saga {saga_id} recovery completed compensation.")
                 processed += 1
             else:
-                logger.exception(f"Saga {saga_id} recovery failed: {traceback.format_exc()}")
+                logger.exception(f"Saga {saga_id} recovery failed.")
                 processed += 1
-        except Exception:
-            logger.exception(f"Saga {saga_id} recovery failed: {traceback.format_exc()}")
+        except Exception:
+            logger.exception(f"Saga {saga_id} recovery failed.")

The same issue appears at line 565 in recovery_loop:

-            logger.exception(f"Recovery iteration failed: {traceback.format_exc()}")
+            logger.exception("Recovery iteration failed.")

If these are all addressed, the import traceback on line 82 can be removed as well.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/saga_recovery_scheduler.py` around lines 520 - 527, The log calls in
recovery_loop that use logger.exception or logger.info currently embed
traceback.format_exc(), causing duplicate stack traces; update the calls in the
recovery_loop (references: saga_id variable and the recovery_loop function) to
remove traceback.format_exc() — use logger.exception(f"Saga {saga_id} recovery
failed") for exception cases and logger.info(f"Saga {saga_id} recovery completed
compensation") for the successful case; after fixing all occurrences (including
the one around line 565), remove the now-unused import traceback.
src/cqrs/saga/storage/sqlalchemy.py (1)

215-237: update_status silently succeeds when the saga doesn't exist.

Unlike update_context (which checks rowcount == 0 when current_version is set), update_status in the run does not verify that any row was actually updated. If saga_id doesn't exist, the UPDATE silently affects zero rows. The parent class has the same gap, so this is consistent — but within a scoped run where the saga was just created in the same session, this is unlikely to be hit in practice.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/saga/storage/sqlalchemy.py` around lines 215 - 237, The
update_status method on the SQLAlchemy saga storage silently succeeds when no
row is updated; modify update_status (in src/cqrs/saga/storage/sqlalchemy.py) to
inspect the result of await self._session.execute(...) (check result.rowcount)
and raise an appropriate error when rowcount == 0 (same behavior as
update_context when current_version is set). Reference the SagaExecutionModel
and the update_status method when adding the check so the call will surface a
missing-saga error instead of silently doing nothing.
tests/integration/test_saga_storage_sqlalchemy_postgres.py (1)

242-256: asyncio.sleep(1.0) for ordering test may be fragile on slow CI.

The test relies on a 1-second wall-clock gap so that updated_at differs between sagas. If the DB server's clock resolution or CI latency causes issues, this could become flaky. Consider updating the updated_at column directly (similar to how create_interrupted_saga in the example manipulates updated_at) or using a longer sleep as a safety margin.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/test_saga_storage_sqlalchemy_postgres.py` around lines 242
- 256, The test test_get_sagas_for_recovery_ordered_by_updated_at uses
asyncio.sleep(1.0) to force an updated_at ordering which is flaky; instead,
modify the test to avoid wall-clock sleeps by directly adjusting the updated_at
timestamp for the target saga (like create_interrupted_saga does) after calling
storage.create_saga/storage.update_status and before calling
storage.get_sagas_for_recovery — e.g., use the storage's DB session/engine or an
existing helper to set the updated_at of id2 to a later timestamp so the
ordering is deterministic when asserting ids[-1] == id2.
src/cqrs/saga/compensation.py (1)

132-145: Both branches unconditionally mark saga as FAILED — intentional but the comment on Line 144 is misleading.

Line 144 says "If all compensations succeeded (or were skipped), mark as failed" — which is correct behavior (compensation means the forward saga failed), but the comment reads as if FAILED is surprising. A small rewording would improve clarity.

Suggested comment tweak
         else:
-            # If all compensations succeeded (or were skipped), mark as failed
+            # All compensations succeeded (or were skipped); mark saga as FAILED
+            # because compensation itself means the forward execution failed.
             await self._storage.update_status(self._saga_id, SagaStatus.FAILED)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/saga/compensation.py` around lines 132 - 145, The comment on the
else branch is misleading: although both branches set the saga status to
SagaStatus.FAILED, the else branch should clearly state that marking FAILED is
intentional because all compensations completed (or were skipped) after the
forward saga failed; update the comment near the else that precedes await
self._storage.update_status(self._saga_id, SagaStatus.FAILED) to explicitly say
something like "All compensations completed or were skipped — mark saga as
FAILED because the original forward transaction failed", keeping the logic
(compensation_errors, SagaStatus.FAILED, _storage.update_status, _saga_id)
unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In @.github/workflows/tests.yml:
- Around line 136-139: Move the hardcoded test DSNs out of the Actions YAML by
replacing the literal values for the env keys DATABASE_DSN, DATABASE_DSN_MYSQL,
and DATABASE_DSN_POSTGRESQL with references to GitHub Actions secrets (e.g. use
the secrets store and reference them as secrets.DATABASE_DSN,
secrets.DATABASE_DSN_MYSQL, secrets.DATABASE_DSN_POSTGRESQL in the workflow),
then add those three secret entries in the repository's GitHub Actions Secrets
with the corresponding DSN values and update any other places that duplicate the
credentials (tests/pytest-config.ini and docker-compose-test.yml) to either read
from the same secrets or from a single source to avoid duplication.

In `@src/cqrs/saga/storage/sqlalchemy.py`:
- Around line 363-385: The except block in create_run's inner _run context
manager currently uses "except Exception:" which does not catch
asyncio.CancelledError (a BaseException subclass), so update the handler in _run
to catch BaseException instead of Exception; specifically, change the except to
"except BaseException:" so that _SqlAlchemySagaStorageRun.rollback() is always
awaited (await run.rollback()) before re-raising the cancellation or other
BaseException, preserving the session closing behavior.

In `@tests/benchmarks/dataclasses/test_benchmark_saga_memory.py`:
- Around line 1-5: Two tests violate the module docstring naming convention for
scoped run benchmarks: rename test_benchmark_saga_memory_full_transaction to
test_benchmark_saga_memory_run_full_transaction and
test_benchmark_saga_memory_single_step to
test_benchmark_saga_memory_run_single_step; update all references (test function
definitions, any pytest markers/decorators, and uses in fixtures or
parametrizations) so callers and markers match the new names and maintain
consistency with test_benchmark_saga_memory_run_ten_transactions.

In `@tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py`:
- Around line 1-5: Rename the two scoped-run test functions to include the
`_run_` infix so they follow the module naming convention: change
test_benchmark_saga_sqlalchemy_full_transaction to
test_benchmark_saga_sqlalchemy_run_full_transaction and
test_benchmark_saga_sqlalchemy_single_step to
test_benchmark_saga_sqlalchemy_run_single_step; update any references/usages
(pytest markers, imports, or fixtures) that call these functions and adjust the
names in the module docstring/examples if present to keep consistency. Ensure
the function definitions for the affected tests are the only changed symbols (no
behavior changes).

In `@tests/benchmarks/default/test_benchmark_saga_memory.py`:
- Around line 1-5: Rename the two benchmark test functions that use the
scoped-run path to include the "_run_" infix so names match the module
docstring: change test_benchmark_saga_memory_full_transaction to
test_benchmark_saga_memory_run_full_transaction and
test_benchmark_saga_memory_single_step to
test_benchmark_saga_memory_run_single_step; update any references to these
function names (pytest markers, imports, or string lookups) so test discovery
and CI continue to work, and keep
test_benchmark_saga_memory_run_ten_transactions unchanged.

In `@tests/integration/test_saga_storage_sqlalchemy_mysql.py`:
- Line 26: The docstring for SqlAlchemySagaStorage in
tests/integration/test_saga_storage_sqlalchemy_mysql.py is written in Russian;
update it to English for consistency (replace the Russian string
"""SqlAlchemySagaStorage для MySQL (фикстура init_saga_orm_mysql поднимает
схему).""" with an English equivalent such as """SqlAlchemySagaStorage for MySQL
(the init_saga_orm_mysql fixture sets up the schema).""" so readers and
contributors see a clear English description referencing the init_saga_orm_mysql
fixture).

---

Duplicate comments:
In `@README.md`:
- Around line 325-328: The code previously had a namespace typo (events_models
vs event_models); update any remaining references to the old identifier so both
mapper.bind calls in init_events use event_models consistently, search for other
uses of events_models and replace them with event_models, and ensure imports
(event_models, events, event_handlers) remain correct for functions/classes like
init_events, events.EventMap, mapper.bind, and
event_handlers.MeetingRoomClosedNotificationHandler; after changes run tests or
lint to confirm no unresolved names remain.

In `@src/cqrs/saga/saga.py`:
- Around line 382-394: The FAILED step log is only staged by
state_manager.log_step and can be lost if compensator.compensate_steps raises
before the run's first commit; after calling await state_manager.log_step(...)
in the except block, immediately persist the staged log by invoking await
run.commit() (guarding with if run is not None) before starting
compensator.compensate_steps(self._completed_steps), preserving the current
error handling and rollback semantics if commit itself fails.

In `@tests/benchmarks/default/test_benchmark_saga_memory.py`:
- Around line 164-179: The fixture saga_with_memory_storage declares unused
parameters saga_container and memory_storage causing unnecessary pytest setup;
remove these parameters from the fixture signature so it becomes def
saga_with_memory_storage() -> Saga[OrderContext]: and keep returning OrderSaga()
(which defines steps = [ReserveInventoryStep, ProcessPaymentStep,
ShipOrderStep]); alternatively, if those resources are required, reference
saga_container or memory_storage inside the fixture (e.g., pass into OrderSaga
or use to configure it) — choose one approach and update the
saga_with_memory_storage fixture accordingly.

---

Nitpick comments:
In `@docker-compose-dev.yml`:
- Line 19: The docker-compose Postgres service currently uses the floating tag
"postgres:latest"; change the image field in that service to a fixed, explicit
Postgres version (e.g., "postgres:15.4" or whichever minor version your
CI/schema was validated against) instead of :latest to avoid unexpected major
upgrades; update the image line in the postgres service block to the chosen
pinned tag and ensure any related CI/dockerfiles that assume a Postgres major
version are kept consistent.

In `@docker-compose-test.yml`:
- Line 19: The docker-compose test service currently uses the floating image tag
"postgres:latest" which can change between CI runs; replace that image value
with a pinned, explicit Postgres tag (e.g., "postgres:15.4" or a specific
alpine/patch tag your CI expects) so the test service image is
reproducible—update the image field for the Postgres service (the line currently
containing "image: postgres:latest") to the chosen fixed tag and ensure any
CI/health-checks or related configs reference the same pinned version.

In `@examples/saga_recovery_scheduler.py`:
- Line 267: The logger call using an f-string is inconsistent with the rest of
the file; update the logger.info call that references shipment_id, order_id, and
tracking_number to use the same %‑style placeholder formatting used elsewhere
(i.e., call logger.info with a format string containing %s placeholders and pass
shipment_id, order_id, tracking_number as separate arguments) so it matches
lines like the other logger.* usages.
- Around line 520-527: The log calls in recovery_loop that use logger.exception
or logger.info currently embed traceback.format_exc(), causing duplicate stack
traces; update the calls in the recovery_loop (references: saga_id variable and
the recovery_loop function) to remove traceback.format_exc() — use
logger.exception(f"Saga {saga_id} recovery failed") for exception cases and
logger.info(f"Saga {saga_id} recovery completed compensation") for the
successful case; after fixing all occurrences (including the one around line
565), remove the now-unused import traceback.

In `@src/cqrs/saga/compensation.py`:
- Around line 132-145: The comment on the else branch is misleading: although
both branches set the saga status to SagaStatus.FAILED, the else branch should
clearly state that marking FAILED is intentional because all compensations
completed (or were skipped) after the forward saga failed; update the comment
near the else that precedes await self._storage.update_status(self._saga_id,
SagaStatus.FAILED) to explicitly say something like "All compensations completed
or were skipped — mark saga as FAILED because the original forward transaction
failed", keeping the logic (compensation_errors, SagaStatus.FAILED,
_storage.update_status, _saga_id) unchanged.

In `@src/cqrs/saga/storage/sqlalchemy.py`:
- Around line 215-237: The update_status method on the SQLAlchemy saga storage
silently succeeds when no row is updated; modify update_status (in
src/cqrs/saga/storage/sqlalchemy.py) to inspect the result of await
self._session.execute(...) (check result.rowcount) and raise an appropriate
error when rowcount == 0 (same behavior as update_context when current_version
is set). Reference the SagaExecutionModel and the update_status method when
adding the check so the call will surface a missing-saga error instead of
silently doing nothing.

In `@tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py`:
- Around line 117-193: The two legacy benchmarks duplicate session_factory,
storage creation, context creation, and the run_transaction body; refactor by
adding a small factory/helper (e.g., _make_storage(engine, storage_cls)) to
build the storage using async_sessionmaker and parametrize the tests over
storage classes (SqlAlchemySagaStorage and SqlAlchemySagaStorageLegacy) so you
keep test names explicit via pytest.param ids; update
test_benchmark_saga_sqlalchemy_legacy_full_transaction and
test_benchmark_saga_sqlalchemy_legacy_single_step to accept a storage_cls
parameter, call _make_storage(loop_engine[1], storage_cls) and reuse a single
run_transaction implementation (or inline identical body) instead of duplicating
setup and iteration logic.

In `@tests/integration/test_saga_mediator_sqlalchemy_postgres.py`:
- Around line 36-74: Duplicate _TestContainer, its fixtures (container,
saga_mediator), and related helpers should be extracted into a shared test
utility to avoid code duplication; create a common test module (e.g.,
tests/conftest or tests/helpers) that defines the _TestContainer class
(including methods resolve, attach_external_container, and property
external_container) and the container and saga_mediator fixtures, then update
both test_saga_mediator_sqlalchemy_mysql.py and
test_saga_mediator_sqlalchemy_postgres.py to import and use those shared symbols
instead of defining them locally.

In `@tests/integration/test_saga_storage_sqlalchemy_postgres.py`:
- Around line 242-256: The test
test_get_sagas_for_recovery_ordered_by_updated_at uses asyncio.sleep(1.0) to
force an updated_at ordering which is flaky; instead, modify the test to avoid
wall-clock sleeps by directly adjusting the updated_at timestamp for the target
saga (like create_interrupted_saga does) after calling
storage.create_saga/storage.update_status and before calling
storage.get_sagas_for_recovery — e.g., use the storage's DB session/engine or an
existing helper to set the updated_at of id2 to a later timestamp so the
ordering is deterministic when asserting ids[-1] == id2.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (5)
src/cqrs/outbox/mock.py (1)

33-35: Redundant inner list() on the else branch.

self.session.values() is directly iterable; wrapping it in list(...) before passing it to the outer list(...) allocates a throwaway intermediate list. The else branch can be simplified to just self.session.values().

♻️ Proposed simplification
         return list(
-            filter(lambda e: topic == e.topic, self.session.values()) if topic else list(self.session.values()),
+            filter(lambda e: topic == e.topic, self.session.values()) if topic else self.session.values(),
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/mock.py` around lines 33 - 35, The return in the mock outbox
method currently wraps self.session.values() in an unnecessary inner list() when
topic is falsy; update the expression in the return (the lambda/filter branch in
the function that returns events from self.session) to pass
self.session.values() directly to the outer list() instead of
list(self.session.values()), removing the redundant allocation while keeping the
topic filtering logic (i.e., keep the filter(lambda e: topic == e.topic,
self.session.values()) when topic is truthy).
tests/unit/test_deserializers.py (1)

82-95: Test name doesn't match what is actually being tested.

test_json_deserializer_missing_required_fields_negative implies that a required field is absent, but the JSON at line 89 supplies all required fields (event_name, payload.foo, payload.bar). It exercises a wrong-type input ("not_an_int" for an int field), not a missing-field scenario. test_json_deserializer_invalid_structure_negative (line 67) already covers the missing-field case (event_name omitted), so the naming here is both inaccurate and potentially duplicates intent.

✏️ Suggested rename + clarifying comment
-def test_json_deserializer_missing_required_fields_negative():
+def test_json_deserializer_wrong_field_type_negative():
     deserializer = json.JsonDeserializer(
         model=cqrs.NotificationEvent[DeserializedModelPayload],
     )
 
-    # JSON with payload that has wrong type for required field 'bar' (string instead of int)
-    # This should cause a validation error when Pydantic tries to validate the payload
+    # 'bar' is declared as int; passing a non-coercible string should trigger a Pydantic validation error
     incomplete_json = '{"event_name": "test", "payload": {"foo": "bar", "bar": "not_an_int"}}'
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/test_deserializers.py` around lines 82 - 95, The test named
test_json_deserializer_missing_required_fields_negative is misnamed because it
feeds a wrong-type value (incomplete_json with "bar": "not_an_int") rather than
omitting a required field; rename the test to something like
test_json_deserializer_invalid_field_type_negative (or
test_json_deserializer_wrong_type_negative) and update its inline comment to
state it verifies type-validation failure for payload.bar (using
json.JsonDeserializer and the incomplete_json payload) to avoid duplicating the
missing-field test (test_json_deserializer_invalid_structure_negative).
src/cqrs/requests/mermaid.py (1)

210-235: Consider extracting the repeated field-type extraction into a helper.

The dataclass / Pydantic-v2 / Pydantic-v1 branching block at lines 211-233 is duplicated verbatim at lines 242-264 (the only difference is the surrounding variable). All four lines touched by this PR live inside these twin blocks, meaning any future fix will need to be applied twice. A small helper avoids the drift:

♻️ Proposed refactor
+    `@staticmethod`
+    def _extract_fields(cls: type) -> list[tuple[str, str]]:
+        """Return [(field_name, field_type_str)] for dataclass, Pydantic v2, or Pydantic v1 models."""
+        if hasattr(cls, "__dataclass_fields__"):
+            return [
+                (n, fi.type.__name__ if hasattr(fi.type, "__name__") else str(fi.type))
+                for n, fi in cls.__dataclass_fields__.items()
+            ]
+        if hasattr(cls, "model_fields"):  # Pydantic v2
+            return [
+                (n, fi.annotation.__name__ if hasattr(fi.annotation, "__name__") else str(fi.annotation))
+                for n, fi in cls.model_fields.items()
+            ]
+        if hasattr(cls, "__fields__"):  # Pydantic v1
+            return [
+                (n, fi.type_.__name__ if hasattr(fi.type_, "__name__") else str(fi.type_))
+                for n, fi in cls.__fields__.items()
+            ]
+        return []

Then both the request-types and response-types loops become:

-            if hasattr(request_type, "__dataclass_fields__"):
-                ...
-            elif hasattr(request_type, "model_fields"):
-                ...
-            elif hasattr(request_type, "__fields__"):
-                ...
+            for field_name, field_type in self._extract_fields(request_type):
+                lines.append(f"        +{field_name}: {field_type}")

(Apply the same substitution for the response_type block.)

Also applies to: 241-265

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/requests/mermaid.py` around lines 210 - 235, The block that extracts
field names and types for dataclasses, Pydantic v2 (model_fields) and Pydantic
v1 (__fields__) is duplicated for request_type and response_type; create a small
helper function (e.g., _extract_field_entries or format_type_fields) that
accepts a type object (like request_type/response_type) and returns the list of
formatted "        +{name}: {type}" lines, handling the three branches
(.__dataclass_fields__, .model_fields, .__fields__). Replace both duplicated
loops with calls to this helper and append the returned lines before closing the
"}" so that field extraction logic lives in one place and is reused for
request_type and response_type.
src/cqrs/saga/storage/memory.py (1)

145-157: No-op rollback() means mutations are not undone on error.

Since all mutation methods (create_saga, update_context, update_status, log_step) delegate immediately to the underlying MemorySagaStorage and modify its in-memory dicts in place, calling rollback() after a partial failure leaves the storage in a dirty state. This differs from the SQLAlchemy implementation where rollback() would actually revert uncommitted changes.

For production code this is fine (you'd use SQLAlchemy), but for tests that use MemorySagaStorage with create_run() and expect rollback to undo partial writes, this can cause subtle test pollution. Consider documenting this caveat on the class or rollback() docstring so future test authors are aware.

Suggested docstring improvement
     async def rollback(self) -> None:
         """
-        Perform no action for rollback; provided to satisfy the SagaStorageRun interface.
+        No-op: provided to satisfy the SagaStorageRun interface.
+
+        Note:
+            Because mutations are applied immediately to the underlying in-memory
+            storage, rollback does NOT undo previously written state. Callers that
+            rely on true transactional rollback should use a persistent storage backend.
         """
         pass
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/saga/storage/memory.py` around lines 145 - 157, Update the docstring
on MemorySagaStorage.run().rollback() (and optionally commit()) to clearly state
that these are no-ops and do not revert in-memory mutations: explain that
methods like MemorySagaStorage.create_saga, update_context, update_status, and
log_step apply changes immediately to the in-memory dicts, so calling
create_run().rollback() will not undo partial writes and test authors should
reset or recreate the storage between tests; reference MemorySagaStorage and
create_run() so readers can find the relevant behavior.
examples/saga_recovery.py (1)

617-618: Implicit f-string concatenation can be simplified.

Multiple locations in this file (lines 617, 688, 799, 867) use adjacent f"..." f"..." literals. These are valid Python (implicit string concatenation) but could be a single f-string for readability.

Example simplification
-            f"    [{entry.timestamp.strftime('%H:%M:%S')}] " f"{entry.step_name}.{entry.action}: {entry.status.value}",
+            f"    [{entry.timestamp.strftime('%H:%M:%S')}] {entry.step_name}.{entry.action}: {entry.status.value}",
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/saga_recovery.py` around lines 617 - 618, Several places use
adjacent f-strings (implicit concatenation) e.g. the print/format call that
builds "    [{entry.timestamp.strftime('%H:%M:%S')}] "
f"{entry.step_name}.{entry.action}: {entry.status.value}" — replace each pair of
adjacent f-strings with a single f-string combining both parts (e.g. f"   
[{entry.timestamp.strftime('%H:%M:%S')}] {entry.step_name}.{entry.action}:
{entry.status.value}") to improve readability; apply the same change for the
other occurrences that reference entry.timestamp, entry.step_name, entry.action,
and entry.status.value.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@examples/saga_recovery.py`:
- Around line 617-618: Several places use adjacent f-strings (implicit
concatenation) e.g. the print/format call that builds "   
[{entry.timestamp.strftime('%H:%M:%S')}] " f"{entry.step_name}.{entry.action}:
{entry.status.value}" — replace each pair of adjacent f-strings with a single
f-string combining both parts (e.g. f"   
[{entry.timestamp.strftime('%H:%M:%S')}] {entry.step_name}.{entry.action}:
{entry.status.value}") to improve readability; apply the same change for the
other occurrences that reference entry.timestamp, entry.step_name, entry.action,
and entry.status.value.

In `@src/cqrs/outbox/mock.py`:
- Around line 33-35: The return in the mock outbox method currently wraps
self.session.values() in an unnecessary inner list() when topic is falsy; update
the expression in the return (the lambda/filter branch in the function that
returns events from self.session) to pass self.session.values() directly to the
outer list() instead of list(self.session.values()), removing the redundant
allocation while keeping the topic filtering logic (i.e., keep the filter(lambda
e: topic == e.topic, self.session.values()) when topic is truthy).

In `@src/cqrs/requests/mermaid.py`:
- Around line 210-235: The block that extracts field names and types for
dataclasses, Pydantic v2 (model_fields) and Pydantic v1 (__fields__) is
duplicated for request_type and response_type; create a small helper function
(e.g., _extract_field_entries or format_type_fields) that accepts a type object
(like request_type/response_type) and returns the list of formatted "       
+{name}: {type}" lines, handling the three branches (.__dataclass_fields__,
.model_fields, .__fields__). Replace both duplicated loops with calls to this
helper and append the returned lines before closing the "}" so that field
extraction logic lives in one place and is reused for request_type and
response_type.

In `@src/cqrs/saga/storage/memory.py`:
- Around line 145-157: Update the docstring on
MemorySagaStorage.run().rollback() (and optionally commit()) to clearly state
that these are no-ops and do not revert in-memory mutations: explain that
methods like MemorySagaStorage.create_saga, update_context, update_status, and
log_step apply changes immediately to the in-memory dicts, so calling
create_run().rollback() will not undo partial writes and test authors should
reset or recreate the storage between tests; reference MemorySagaStorage and
create_run() so readers can find the relevant behavior.

In `@tests/unit/test_deserializers.py`:
- Around line 82-95: The test named
test_json_deserializer_missing_required_fields_negative is misnamed because it
feeds a wrong-type value (incomplete_json with "bar": "not_an_int") rather than
omitting a required field; rename the test to something like
test_json_deserializer_invalid_field_type_negative (or
test_json_deserializer_wrong_type_negative) and update its inline comment to
state it verifies type-validation failure for payload.bar (using
json.JsonDeserializer and the incomplete_json payload) to avoid duplicating the
missing-field test (test_json_deserializer_invalid_structure_negative).

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (12)
examples/saga_recovery_scheduler.py (1)

518-530: Inconsistent logging format within run_recovery_iteration

Lines 518 and 520 use f-strings while lines 524, 527, and 530 (all in the same exception-handling block) retain %s lazy formatting. The change introduced this split. Beyond the style inconsistency, f-string arguments are always evaluated eagerly — before the logging framework checks the effective log level — whereas %s placeholders let the framework skip formatting when the level is suppressed. In example code this rarely matters, but unifying on %s would both remove the inconsistency and match the rest of the calls in this file.

♻️ Suggested unification to `%s` formatting
-            logger.info(f"Recovering saga {saga_id}...")
+            logger.info("Recovering saga %s...", saga_id)
             await recover_saga(saga, saga_id, context_builder, container, storage)
-            logger.info(f"Saga {saga_id} recovered successfully.")
+            logger.info("Saga %s recovered successfully.", saga_id)

Apply the same pattern to lines 554, 562, and 572:

-        logger.info(f"Recovery iteration {iteration}")
+        logger.info("Recovery iteration %d", iteration)
...
-                logger.info(f"Processed {processed} saga(s) this iteration.")
+                logger.info("Processed %d saga(s) this iteration.", processed)
...
-            logger.info(f"Reached max_iterations={max_iterations}, stopping.")
+            logger.info("Reached max_iterations=%d, stopping.", max_iterations)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/saga_recovery_scheduler.py` around lines 518 - 530, The logging
calls in run_recovery_iteration are inconsistent: replace the eager f-strings
used in logger.info(f"Recovering saga {saga_id}...") and logger.info(f"Saga
{saga_id} recovered successfully.") with lazy %-style formatting (e.g.,
logger.info("Recovering saga %s...", saga_id) and logger.info("Saga %s recovered
successfully.", saga_id)) so they match the existing
logger.exception/printf-style calls and avoid eager evaluation; also apply the
same %-style unification to the other similar log lines referenced (the calls
around the later recovery blocks, currently at the locations corresponding to
lines noted in the review) to keep formatting consistent across recover_saga,
run_recovery_iteration and related recovery logging.
tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py (2)

23-31: _make_storage helper duplicates inline session_factory creation in run-path tests.

The run-path benchmarks (Lines 67–73) create async_sessionmaker inline with the same config as _make_storage (Lines 25–30). Consider using _make_storage in the run-path tests too, reducing duplication.

Also applies to: 57-85

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py` around lines 23 -
31, The run-path benchmarks duplicate creation of the async_sessionmaker with
the same config as the helper _make_storage; update the run-path benchmark code
to call _make_storage(engine, storage_cls) instead of inlining
async_sessionmaker so the shared session_factory configuration
(expire_on_commit=False, autocommit=False, autoflush=False) is centralized;
ensure you pass the same storage class used inline into _make_storage and remove
the duplicated session_factory construction to avoid drift.

131-158: Parameterized "legacy" benchmarks also run the scoped-run storage, causing redundancy and confusing naming.

test_benchmark_saga_sqlalchemy_legacy_full_transaction is parametrized with both SqlAlchemySagaStorage (run-path) and SqlAlchemySagaStorageLegacy, but the function name says "legacy". This creates two issues:

  1. Redundancy: SqlAlchemySagaStorage is already benchmarked in the dedicated test_benchmark_saga_sqlalchemy_run_full_transaction.
  2. Misleading test IDs: The generated test ID …_legacy_full_transaction[storage] suggests legacy behavior when it actually exercises the run path.

Consider either:

  • Removing SqlAlchemySagaStorage from the parametrization (keep only SqlAlchemySagaStorageLegacy), or
  • Renaming to a neutral name like *_compare_* if the intent is to compare both paths.

Same applies to test_benchmark_saga_sqlalchemy_legacy_single_step.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py` around lines 131
- 158, The test test_benchmark_saga_sqlalchemy_legacy_full_transaction is
parametrized with both SqlAlchemySagaStorage and SqlAlchemySagaStorageLegacy
which is misleading given the function name; either restrict the parametrization
to only SqlAlchemySagaStorageLegacy (remove SqlAlchemySagaStorage from the list
and its "storage" id) so the test truly targets the legacy path, or rename the
test to a neutral name (e.g.,
test_benchmark_saga_sqlalchemy_compare_full_transaction) and update the ids to
reflect both variants; apply the same change pattern to
test_benchmark_saga_sqlalchemy_legacy_single_step to avoid redundant
benchmarking and confusing IDs.
tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py (1)

120-201: Session factory creation is repeated inline across legacy tests.

Lines 134–139 and 178–183 duplicate the same async_sessionmaker(engine, expire_on_commit=False, autocommit=False, autoflush=False) pattern. The default variant uses a _make_storage helper for this. Consider extracting a similar helper here for consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py` around lines
120 - 201, Extract the repeated async_sessionmaker+SqlAlchemySagaStorageLegacy
setup into a helper (e.g. _make_legacy_storage) and call it from both
test_benchmark_saga_sqlalchemy_legacy_full_transaction and
test_benchmark_saga_sqlalchemy_legacy_single_step; specifically, move the
async_sessionmaker(engine, expire_on_commit=False, autocommit=False,
autoflush=False) + SqlAlchemySagaStorageLegacy(session_factory) creation into a
single function that accepts the engine (and optionally config flags) and
returns the configured storage, then replace the inline session_factory/storage
logic in the two tests with calls to that helper to eliminate duplication and
mirror the existing _make_storage pattern.
tests/integration/test_saga_mediator_sqlalchemy_postgres.py (1)

63-93: test_saga_mediator_emits_events duplicates test_saga_mediator_processes_events_from_steps.

Both tests create a context, stream the saga, then assert len(handler.handled_events) >= 1 on the same three handlers. The only difference is the first test also collects step_results. Consider merging them or differentiating the assertions.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/test_saga_mediator_sqlalchemy_postgres.py` around lines 63
- 93, The two tests test_saga_mediator_processes_events_from_steps and
test_saga_mediator_emits_events are redundant: both stream a saga context and
assert the same handlers (InventoryReservedEventHandler,
PaymentProcessedEventHandler, OrderShippedEventHandler) handled events; the
first also collects step_results. Consolidate them by keeping
test_saga_mediator_processes_events_from_steps and incorporate the emission
assertions from test_saga_mediator_emits_events (or vice versa): ensure you
stream a unique OrderContext (e.g., order_id "789" if keeping the second) and
either assert step_results length == 3 and that
inventory_handler.handled_events, payment_handler.handled_events, and
shipping_handler.handled_events each have length >= 1, then remove the duplicate
test function to avoid duplicated coverage.
tests/integration/conftest.py (1)

73-83: Fixture directly accesses private attributes of _TestContainer for cleanup.

Lines 77–82 reach into container._step_handlers and container._event_handlers. Consider adding a reset() method on _TestContainer to encapsulate this cleanup logic.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/conftest.py` around lines 73 - 83, The test fixture is
directly accessing private attributes container._step_handlers and
container._event_handlers to clear internal state; add a public reset() method
on the _TestContainer class that encapsulates this cleanup (iterate its internal
step handlers and event handlers and clear step_handler._events and
event_handler.handled_events as needed) and then replace the manual loops in the
container fixture with a single call to container.reset(); ensure reset() is
named exactly reset so the fixture can call container.reset() without touching
private members.
tests/integration/test_saga_storage_sqlalchemy_postgres.py (2)

242-260: Deterministic ordering test — nice improvement over the MySQL asyncio.sleep approach.

Using direct updated_at manipulation via the session factory (line 252–257) is faster and more deterministic than the MySQL test's asyncio.sleep(1.0). However, this relies on the session_factory attribute being part of SqlAlchemySagaStorage's public contract.

Consider extracting a helper or factoring this into a shared utility if the pattern recurs, to avoid coupling tests to storage.session_factory directly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/test_saga_storage_sqlalchemy_postgres.py` around lines 242
- 260, The test test_get_sagas_for_recovery_ordered_by_updated_at directly
manipulates SqlAlchemySagaStorage.session_factory and
SagaExecutionModel.updated_at which couples the test to the storage internals;
refactor by extracting a shared test helper (e.g., a utility function used by
tests that accepts storage and saga_id and updates the underlying updated_at) or
add a public helper method on SqlAlchemySagaStorage (e.g.,
update_saga_timestamp) to perform the session/execute/commit logic, then update
this test to call that helper instead of using storage.session_factory directly
so tests no longer depend on storage.session_factory internals while still
setting updated_at for get_sagas_for_recovery ordering checks.

188-197: Cleanup fixture only runs before yield — leftover data persists after the last test.

_clean_saga_tables deletes data before yielding but does not clean up after the test. While this is a common "setup-only" pattern and works fine when every test method in the class triggers this fixture, data from the last test will remain in the database. If other test classes or modules share the same database and don't clean up, this could cause interference.

If cross-module isolation matters, consider adding post-yield cleanup:

yield
async with saga_session_factory_postgres() as session:
    await session.execute(delete(SagaLogModel))
    await session.execute(delete(SagaExecutionModel))
    await session.commit()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/test_saga_storage_sqlalchemy_postgres.py` around lines 188
- 197, The fixture _clean_saga_tables currently only deletes SagaLogModel and
SagaExecutionModel before yielding, so leftover rows from the last test persist;
modify the fixture to perform the same deletions again after the yield (using
saga_session_factory_postgres to open a session and execute delete(SagaLogModel)
and delete(SagaExecutionModel) followed by commit) so cleanup runs both before
and after each test.
tests/integration/test_saga_storage_sqlalchemy_mysql.py (2)

1-393: MySQL test suite mirrors the Postgres suite — significant duplication.

This file is nearly identical to test_saga_storage_sqlalchemy_postgres.py, with the only differences being fixture names (saga_session_factory_mysql vs saga_session_factory_postgres), class names, and the ordering test approach (asyncio.sleep vs direct timestamp manipulation).

Consider extracting the shared test logic into a base class or parameterized test module, and having the MySQL/Postgres files only supply fixtures. This would reduce ~350 lines of duplication and ensure both backends are always tested with the same assertions.

# Example: tests/integration/_saga_storage_tests.py (shared base)
class SagaStorageTestBase:
    async def test_full_saga_lifecycle(self, storage, saga_id, test_context):
        ...

# tests/integration/test_saga_storage_sqlalchemy_mysql.py
class TestIntegrationMysql(SagaStorageTestBase):
    ...  # only MySQL-specific fixtures and overrides
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/test_saga_storage_sqlalchemy_mysql.py` around lines 1 -
393, The test file duplicates the Postgres suite; extract shared tests into a
single reusable base (e.g., class SagaStorageTestBase in
tests/integration/_saga_storage_tests.py) containing all test methods like
test_full_saga_lifecycle, test_compensation_scenario,
test_persistence_across_sessions, test_concurrent_updates,
test_optimistic_locking, and the recovery tests, then update
TestIntegrationMysql to inherit from SagaStorageTestBase and only provide
MySQL-specific fixtures (saga_session_factory_mysql, storage fixture
construction using SqlAlchemySagaStorage) and any small differences (e.g.,
ordering behavior) via overrides; do the same for the Postgres test file to
supply saga_session_factory_postgres so both backends reuse the same assertions
without duplication.

242-255: Ordering test relies on asyncio.sleep(1.0) — potentially flaky and slow.

The 1-second sleep introduces real-time dependency. If the database's updated_at timestamp resolution is coarse or if the test environment is under load, the ordering might not be deterministic. The Postgres counterpart uses direct updated_at manipulation, which is faster and more reliable.

Consider using the same direct-timestamp approach as the Postgres test to make this test deterministic and faster:

-        await asyncio.sleep(1.0)
-        await storage.update_context(id2, {**test_context, "touched": True})
+        from datetime import datetime, timedelta, timezone
+        from sqlalchemy import update
+        later = datetime.now(timezone.utc) + timedelta(seconds=10)
+        async with storage.session_factory() as session:
+            await session.execute(
+                update(SagaExecutionModel).where(SagaExecutionModel.id == id2).values(updated_at=later),
+            )
+            await session.commit()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/test_saga_storage_sqlalchemy_mysql.py` around lines 242 -
255, The test test_get_sagas_for_recovery_ordered_by_updated_at is flaky because
it uses asyncio.sleep(1.0) to force an updated_at change; instead, set the
updated_at timestamp deterministically for one saga so ordering is reliable and
fast. Replace the sleep+update_context approach by directly updating the saga
row's updated_at for id2 (using the storage's DB session/engine or a helper on
SqlAlchemySagaStorage) to a timestamp greater than the others, then call
get_sagas_for_recovery and assert id2 is last; use existing methods like
storage.create_saga, storage.update_status and the underlying SQLAlchemy
session/updated_at column to perform the direct timestamp update.
src/cqrs/saga/compensation.py (1)

73-77: SagaStepStatus.COMPLETED used for compensate-action log entries, not SagaStepStatus.COMPENSATED.

The compensator logs successful compensation steps with SagaStepStatus.COMPLETED (line 106) and checks for that status here (line 76). The COMPENSATED enum value is not used by the compensator. This is internally consistent but may confuse contributors who expect the COMPENSATED status to be used for compensated steps.

Consider using SagaStepStatus.COMPENSATED for compensation step logs to better convey the semantic meaning, or add a brief comment explaining the convention.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/saga/compensation.py` around lines 73 - 77, The code records
compensated steps by filtering history entries with SagaStepStatus.COMPLETED but
the compensator itself logs successful compensation using that same COMPLETED
value, which is confusing; update the compensator to log successful compensation
entries with SagaStepStatus.COMPENSATED (or, alternatively, change this filter
to look for COMPENSATED) so the semantics match: modify the compensator's
logging call where it records a successful "compensate" action to use
SagaStepStatus.COMPENSATED, and update the set comprehension that builds
compensated_steps (the one using self._storage.get_step_history and
SagaStepStatus.COMPLETED) to check for SagaStepStatus.COMPENSATED instead; if
you keep the existing behavior, add a brief inline comment near the
compensated_steps computation explaining the convention.
src/cqrs/saga/saga.py (1)

260-327: Multiple sequential commits for saga creation — acceptable but worth noting.

For new sagas with a run (lines 261–269): create_saga → commit → update_status(RUNNING) → commit. If the process crashes between the two commits, the saga persists as PENDING but never transitions to RUNNING. Similarly in the except ValueError recovery path (lines 317–327).

This is acceptable given that recovery can detect PENDING sagas, but ensure the recovery logic handles this edge case (PENDING sagas with no steps).

You could batch create_saga + update_status into a single commit to reduce the crash-window:

             await state_manager.create_saga(
                 self._saga.__class__.__name__,
                 self._context,
             )
-            if run is not None:
-                await run.commit()
             await state_manager.update_status(SagaStatus.RUNNING)
             if run is not None:
                 await run.commit()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/saga/saga.py` around lines 260 - 327, The code calls
create_saga(...), commits, then update_status(RUNNING) and commits again (both
in the initial-new-saga path and the except ValueError recovery path), which
opens a crash window where a saga can remain PENDING; change the flow so that
create_saga and update_status are done inside the same transaction/commit when a
run is present (i.e., call state_manager.create_saga(...) then
state_manager.update_status(SagaStatus.RUNNING) and call run.commit() only once
after both), and for the no-run path either extend create_saga to accept an
initial status or ensure update_status is applied atomically (combine into a
single storage operation); update the blocks around state_manager.create_saga,
state_manager.update_status, and run.commit to perform a single commit per saga
creation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/cqrs/saga/saga.py`:
- Around line 165-180: The inner NotImplementedError handler should be removed
because it hides user code errors and can cause duplicate-saga creation: when
create_run() returns a context manager (run_cm) you should await it and
propagate any exceptions from async with run_cm as run: and from async for
step_result in self._execute(run): instead of catching NotImplementedError and
falling back to self._execute(None); update the block that checks run_cm (the
code using run_cm, run and calling _execute) to omit the inner try/except so any
NotImplementedError from _execute or its steps bubbles up, leaving only the
outer try/except that wraps self._storage.create_run().

In `@tests/integration/test_saga_mediator_sqlalchemy_postgres.py`:
- Around line 14-28: The test file imports _TestContainer from
test_saga_mediator_memory which shadows the conftest-local _TestContainer used
by the container fixture; remove _TestContainer from the import list in this
test and either (a) import the correct _TestContainer defined in the test
suite's conftest module or (b) drop/replace the parameter type annotation (e.g.,
use typing.Any) on tests that accept container so the pytest-provided fixture's
type is not misreferenced; update references to _TestContainer in this file
(test parameter annotations) accordingly.

---

Nitpick comments:
In `@examples/saga_recovery_scheduler.py`:
- Around line 518-530: The logging calls in run_recovery_iteration are
inconsistent: replace the eager f-strings used in logger.info(f"Recovering saga
{saga_id}...") and logger.info(f"Saga {saga_id} recovered successfully.") with
lazy %-style formatting (e.g., logger.info("Recovering saga %s...", saga_id) and
logger.info("Saga %s recovered successfully.", saga_id)) so they match the
existing logger.exception/printf-style calls and avoid eager evaluation; also
apply the same %-style unification to the other similar log lines referenced
(the calls around the later recovery blocks, currently at the locations
corresponding to lines noted in the review) to keep formatting consistent across
recover_saga, run_recovery_iteration and related recovery logging.

In `@src/cqrs/saga/compensation.py`:
- Around line 73-77: The code records compensated steps by filtering history
entries with SagaStepStatus.COMPLETED but the compensator itself logs successful
compensation using that same COMPLETED value, which is confusing; update the
compensator to log successful compensation entries with
SagaStepStatus.COMPENSATED (or, alternatively, change this filter to look for
COMPENSATED) so the semantics match: modify the compensator's logging call where
it records a successful "compensate" action to use SagaStepStatus.COMPENSATED,
and update the set comprehension that builds compensated_steps (the one using
self._storage.get_step_history and SagaStepStatus.COMPLETED) to check for
SagaStepStatus.COMPENSATED instead; if you keep the existing behavior, add a
brief inline comment near the compensated_steps computation explaining the
convention.

In `@src/cqrs/saga/saga.py`:
- Around line 260-327: The code calls create_saga(...), commits, then
update_status(RUNNING) and commits again (both in the initial-new-saga path and
the except ValueError recovery path), which opens a crash window where a saga
can remain PENDING; change the flow so that create_saga and update_status are
done inside the same transaction/commit when a run is present (i.e., call
state_manager.create_saga(...) then
state_manager.update_status(SagaStatus.RUNNING) and call run.commit() only once
after both), and for the no-run path either extend create_saga to accept an
initial status or ensure update_status is applied atomically (combine into a
single storage operation); update the blocks around state_manager.create_saga,
state_manager.update_status, and run.commit to perform a single commit per saga
creation.

In `@tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py`:
- Around line 120-201: Extract the repeated
async_sessionmaker+SqlAlchemySagaStorageLegacy setup into a helper (e.g.
_make_legacy_storage) and call it from both
test_benchmark_saga_sqlalchemy_legacy_full_transaction and
test_benchmark_saga_sqlalchemy_legacy_single_step; specifically, move the
async_sessionmaker(engine, expire_on_commit=False, autocommit=False,
autoflush=False) + SqlAlchemySagaStorageLegacy(session_factory) creation into a
single function that accepts the engine (and optionally config flags) and
returns the configured storage, then replace the inline session_factory/storage
logic in the two tests with calls to that helper to eliminate duplication and
mirror the existing _make_storage pattern.

In `@tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py`:
- Around line 23-31: The run-path benchmarks duplicate creation of the
async_sessionmaker with the same config as the helper _make_storage; update the
run-path benchmark code to call _make_storage(engine, storage_cls) instead of
inlining async_sessionmaker so the shared session_factory configuration
(expire_on_commit=False, autocommit=False, autoflush=False) is centralized;
ensure you pass the same storage class used inline into _make_storage and remove
the duplicated session_factory construction to avoid drift.
- Around line 131-158: The test
test_benchmark_saga_sqlalchemy_legacy_full_transaction is parametrized with both
SqlAlchemySagaStorage and SqlAlchemySagaStorageLegacy which is misleading given
the function name; either restrict the parametrization to only
SqlAlchemySagaStorageLegacy (remove SqlAlchemySagaStorage from the list and its
"storage" id) so the test truly targets the legacy path, or rename the test to a
neutral name (e.g., test_benchmark_saga_sqlalchemy_compare_full_transaction) and
update the ids to reflect both variants; apply the same change pattern to
test_benchmark_saga_sqlalchemy_legacy_single_step to avoid redundant
benchmarking and confusing IDs.

In `@tests/integration/conftest.py`:
- Around line 73-83: The test fixture is directly accessing private attributes
container._step_handlers and container._event_handlers to clear internal state;
add a public reset() method on the _TestContainer class that encapsulates this
cleanup (iterate its internal step handlers and event handlers and clear
step_handler._events and event_handler.handled_events as needed) and then
replace the manual loops in the container fixture with a single call to
container.reset(); ensure reset() is named exactly reset so the fixture can call
container.reset() without touching private members.

In `@tests/integration/test_saga_mediator_sqlalchemy_postgres.py`:
- Around line 63-93: The two tests
test_saga_mediator_processes_events_from_steps and
test_saga_mediator_emits_events are redundant: both stream a saga context and
assert the same handlers (InventoryReservedEventHandler,
PaymentProcessedEventHandler, OrderShippedEventHandler) handled events; the
first also collects step_results. Consolidate them by keeping
test_saga_mediator_processes_events_from_steps and incorporate the emission
assertions from test_saga_mediator_emits_events (or vice versa): ensure you
stream a unique OrderContext (e.g., order_id "789" if keeping the second) and
either assert step_results length == 3 and that
inventory_handler.handled_events, payment_handler.handled_events, and
shipping_handler.handled_events each have length >= 1, then remove the duplicate
test function to avoid duplicated coverage.

In `@tests/integration/test_saga_storage_sqlalchemy_mysql.py`:
- Around line 1-393: The test file duplicates the Postgres suite; extract shared
tests into a single reusable base (e.g., class SagaStorageTestBase in
tests/integration/_saga_storage_tests.py) containing all test methods like
test_full_saga_lifecycle, test_compensation_scenario,
test_persistence_across_sessions, test_concurrent_updates,
test_optimistic_locking, and the recovery tests, then update
TestIntegrationMysql to inherit from SagaStorageTestBase and only provide
MySQL-specific fixtures (saga_session_factory_mysql, storage fixture
construction using SqlAlchemySagaStorage) and any small differences (e.g.,
ordering behavior) via overrides; do the same for the Postgres test file to
supply saga_session_factory_postgres so both backends reuse the same assertions
without duplication.
- Around line 242-255: The test
test_get_sagas_for_recovery_ordered_by_updated_at is flaky because it uses
asyncio.sleep(1.0) to force an updated_at change; instead, set the updated_at
timestamp deterministically for one saga so ordering is reliable and fast.
Replace the sleep+update_context approach by directly updating the saga row's
updated_at for id2 (using the storage's DB session/engine or a helper on
SqlAlchemySagaStorage) to a timestamp greater than the others, then call
get_sagas_for_recovery and assert id2 is last; use existing methods like
storage.create_saga, storage.update_status and the underlying SQLAlchemy
session/updated_at column to perform the direct timestamp update.

In `@tests/integration/test_saga_storage_sqlalchemy_postgres.py`:
- Around line 242-260: The test
test_get_sagas_for_recovery_ordered_by_updated_at directly manipulates
SqlAlchemySagaStorage.session_factory and SagaExecutionModel.updated_at which
couples the test to the storage internals; refactor by extracting a shared test
helper (e.g., a utility function used by tests that accepts storage and saga_id
and updates the underlying updated_at) or add a public helper method on
SqlAlchemySagaStorage (e.g., update_saga_timestamp) to perform the
session/execute/commit logic, then update this test to call that helper instead
of using storage.session_factory directly so tests no longer depend on
storage.session_factory internals while still setting updated_at for
get_sagas_for_recovery ordering checks.
- Around line 188-197: The fixture _clean_saga_tables currently only deletes
SagaLogModel and SagaExecutionModel before yielding, so leftover rows from the
last test persist; modify the fixture to perform the same deletions again after
the yield (using saga_session_factory_postgres to open a session and execute
delete(SagaLogModel) and delete(SagaExecutionModel) followed by commit) so
cleanup runs both before and after each test.

@vadikko2 vadikko2 merged commit 37e723a into master Feb 20, 2026
11 checks passed
@coderabbitai coderabbitai bot mentioned this pull request Feb 21, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant