**Note: Reviews paused.** It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings. Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
## 📝 Walkthrough

Adds a scoped-run abstraction for saga storage (`SagaStorageRun` and `ISagaStorage.create_run`), implements run-backed Memory and SQLAlchemy runs with commit/rollback, alters saga execution and compensation to use run-scoped flows with a legacy fallback, extends tests/benchmarks for run vs. legacy modes, and updates docs, examples, and CI for Postgres/MySQL.
### Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client
    participant Saga as Saga
    participant Storage as ISagaStorage / SagaStorageRun
    participant StateMgr as StateManager
    participant Executor as StepExecutor
    participant Comp as Compensator
    Note over Saga,Storage: Run-backed path
    Client->>Saga: start saga
    Saga->>Storage: create_run()
    Storage-->>Saga: SagaStorageRun
    Saga->>Storage: create_saga(saga_id, name, context)
    Saga->>Storage: update_status(RUNNING)
    Saga->>Storage: commit()
    loop For each step
        Saga->>StateMgr: load_completed_step_names()
        Saga->>Executor: execute_step(step)
        Executor-->>Saga: step result
        Saga->>Storage: log_step(step, status)
        Saga->>Storage: update_context(context)
        Saga->>Storage: commit()
    end
    alt step fails
        Saga->>Storage: update_status(COMPENSATING)
        Saga->>Storage: commit()
        loop Reverse completed steps
            Saga->>Comp: compensate_steps([...])
            Comp->>Storage: log_step(step, "compensate", COMPLETED)
            Comp-->>Saga: compensation result
        end
        Saga->>Storage: update_status(FAILED)
        Saga->>Storage: commit()
    else all succeed
        Saga->>Storage: update_status(COMPLETED)
        Saga->>Storage: commit()
    end
```
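The run-backed flow above can be sketched with a toy storage whose run stages writes and publishes them only on `commit()`. This is a minimal illustration of the pattern in the diagram; the class and method names mirror the diagram but are not the library's actual implementation.

```python
import asyncio
from contextlib import asynccontextmanager


class InMemoryRun:
    """Stages writes; nothing is visible in the backing store until commit()."""

    def __init__(self, store: dict) -> None:
        self._store = store
        self._staged: dict = {}

    async def update_status(self, saga_id: str, status: str) -> None:
        self._staged[saga_id] = status  # staged, not yet persisted

    async def commit(self) -> None:
        self._store.update(self._staged)  # publish staged writes
        self._staged.clear()

    async def rollback(self) -> None:
        self._staged.clear()  # discard staged writes


class InMemoryStorage:
    def __init__(self) -> None:
        self.store: dict = {}

    @asynccontextmanager
    async def create_run(self):
        run = InMemoryRun(self.store)
        try:
            yield run
        except Exception:
            await run.rollback()  # failure inside the run: discard staged work
            raise


async def run_saga(storage: InMemoryStorage) -> None:
    async with storage.create_run() as run:
        await run.update_status("saga-1", "RUNNING")
        await run.commit()
        # ... execute steps, log_step/update_context per step, commit each ...
        await run.update_status("saga-1", "COMPLETED")
        await run.commit()


storage = InMemoryStorage()
asyncio.run(run_saga(storage))
print(storage.store)  # {'saga-1': 'COMPLETED'}
```

The key property is that every write between two `commit()` calls is atomic from the store's point of view, which is what lets the saga roll back a partially persisted step.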
**Estimated code review effort:** 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues
Possibly related PRs
Poem
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches: 🧪 Generate unit tests (beta)
**Codecov Report**

❌ Patch coverage is
Additional details and impacted files

```diff
@@            Coverage Diff             @@
##           master      #61      +/-   ##
==========================================
+ Coverage   87.26%   87.67%   +0.40%
==========================================
  Files          70       70
  Lines        2481     2636     +155
==========================================
+ Hits         2165     2311     +146
- Misses        316      325       +9
```

☔ View full report in Codecov by Sentry.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
README.md (1)
**86-97:** ⚠️ Potential issue | 🟡 Minor

**Fix the `events` vs `_events` mismatch in examples.**

Both Command and Query handler snippets set `self.events` but the property returns `self._events`. This will confuse users and is inconsistent with the sample. Use one attribute name consistently.

✅ Suggested doc fix

```diff
 class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
     def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
         self._meetings_api = meetings_api
-        self.events: list[Event] = []
+        self._events: list[Event] = []

     @property
     def events(self) -> typing.List[events.Event]:
         return self._events


 class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
     def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
         self._meetings_api = meetings_api
-        self.events: list[Event] = []
+        self._events: list[Event] = []

     @property
     def events(self) -> typing.List[events.Event]:
         return self._events
```

Also applies to: 114-126
🤖 Fix all issues with AI agents
In `@README.md`:
- Around line 325-328: The snippet in init_events uses two inconsistent module
aliases (events_models vs event_models) which will confuse readers and break the
example; choose one alias and use it consistently for the NotificationEvent
generic types (e.g., change event_models to events_models or vice versa) so both
mapper.bind lines reference the same symbol set (affecting
NotificationEvent[events_models.ECSTMeetingRoomClosed] and
NotificationEvent[events_models.NotificationMeetingRoomClosed]), and update any
corresponding import alias shown elsewhere in the snippet to match (keep
MeetingRoomClosedNotificationHandler and UpdateMeetingRoomReadModelHandler names
unchanged).
In `@src/cqrs/saga/compensation.py`:
- Around line 104-105: The post-compensation callback
self._on_after_compensate_step should be executed outside the step's broad
try/except so its failures don't get swallowed or cause duplicate
COMPLETED/FAILED logs; move the await self._on_after_compensate_step() call out
of the step exception handling in compensate_step (or the surrounding method),
let exceptions from _on_after_compensate_step (e.g., run.commit) propagate or
handle them explicitly (log once and abort) before marking the step
COMPLETED/FAILED, ensuring the callback is invoked only after a successful step
and its errors are treated separately.
🧹 Nitpick comments (3)
examples/saga_recovery.py (1)
**293-295: Consider using a single f-string.**

Adjacent f-string concatenation (`f"..." f"..."`) works, but a single f-string would be cleaner:

```python
f"  ✓ Created shipment {shipment_id} for order {order_id} (tracking: {tracking_number})"
```

This same pattern appears on lines 608, 679, 788, and 853.
src/cqrs/saga/saga.py (1)
**202-208: Avoid reaching into `_compensator` private fields.**

Using `_compensator._retry_*` couples the new run path to internal state. Prefer storing the retry config on `SagaTransaction` and reusing it here.

♻️ Suggested refactor (store retry config on the transaction)

```diff
@@
         self._storage = storage
         self._completed_steps: list[SagaStepHandler[ContextT, typing.Any]] = []
         self._error: BaseException | None = None
         self._compensated: bool = False
+        self._comp_retry_count = compensation_retry_count
+        self._comp_retry_delay = compensation_retry_delay
+        self._comp_retry_backoff = compensation_retry_backoff
@@
         self._compensator: SagaCompensator[ContextT] = SagaCompensator[ContextT](
             self._saga_id,
             context,
             storage,
-            compensation_retry_count,
-            compensation_retry_delay,
-            compensation_retry_backoff,
+            self._comp_retry_count,
+            self._comp_retry_delay,
+            self._comp_retry_backoff,
         )
@@
         compensator = SagaCompensator(
             self._saga_id,
             self._context,
             run,
-            self._compensator._retry_count,
-            self._compensator._retry_delay,
-            self._compensator._retry_backoff,
+            self._comp_retry_count,
+            self._comp_retry_delay,
+            self._comp_retry_backoff,
             on_after_compensate_step=run.commit,
         )
```

tests/benchmarks/dataclasses/test_benchmark_saga_memory.py (1)
**225-249: Unused `memory_storage` fixture parameter.**

The `memory_storage` fixture is injected but never used: a fresh `MemorySagaStorage()` is created inside the loop (line 235). Consider removing the unused parameter for clarity.

Proposed fix

```diff
 @pytest.mark.benchmark
 def test_benchmark_saga_memory_run_ten_transactions(
     benchmark,
     saga_with_memory_storage: Saga[OrderContext],
     saga_container: SagaContainer,
-    memory_storage: MemorySagaStorage,
 ):
```
**Note: Docstrings generation - SUCCESS**

Docstrings generation was requested by @vadikko2.

* #61 (comment)

The following files were modified:

* `examples/saga.py`
* `examples/saga_fallback.py`
* `examples/saga_recovery.py`
* `examples/saga_recovery_scheduler.py`
* `examples/saga_sqlalchemy_storage.py`
* `src/cqrs/saga/compensation.py`
* `src/cqrs/saga/execution.py`
* `src/cqrs/saga/saga.py`
* `src/cqrs/saga/storage/memory.py`
* `src/cqrs/saga/storage/protocol.py`
* `src/cqrs/saga/storage/sqlalchemy.py`
* `tests/benchmarks/dataclasses/test_benchmark_saga_memory.py`
* `tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py`
* `tests/benchmarks/default/test_benchmark_saga_memory.py`
* `tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py`
* `tests/unit/test_saga/test_saga_storage_run.py`
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
examples/saga_sqlalchemy_storage.py (1)
**42-44:** ⚠️ Potential issue | 🟡 Minor

**Comment contradicts the actual default URL.**

Line 43 says "Using SQLite for this example" but the default `DB_URL` on line 44 is a MySQL connection string (`mysql+asyncmy://...`). The comment appears to be a leftover from an earlier version.

Suggested fix

```diff
 # Database Configuration
-# Using SQLite for this example, but can be swapped for PostgreSQL/MySQL
+# Using MySQL for this example, but can be swapped for PostgreSQL/SQLite
 DB_URL = os.getenv("DATABASE_URL", "mysql+asyncmy://cqrs:cqrs@localhost:3307/test_cqrs")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/saga_sqlalchemy_storage.py` around lines 42 - 44, The comment above DB_URL is incorrect—update the text or the default to match each other: either change the comment to indicate MySQL (or asyncmy) as the default connection, or set DB_URL's default to a SQLite URI (e.g., "sqlite+aiosqlite:///./test.db"); modify the comment near the DB_URL declaration (symbol: DB_URL) to accurately describe the default or change the DB_URL value to the intended SQLite URL so the comment and default are consistent.
🧹 Nitpick comments (8)
examples/saga_sqlalchemy_storage.py (1)
**142-149: Docstring placed after inline comments may confuse documentation tools.**

The docstring on lines 145–149 is placed after the `#` comments on lines 143–144. While Python still recognizes it as the function docstring (comments aren't statements), many documentation generators (Sphinx, IDE tooltips) expect the docstring to immediately follow the `def` line.

Move docstring before comments

```diff
 async def main() -> None:
+    """
+    Run a demonstration that executes an OrderSaga using an async SQLAlchemy engine and persistent SqlAlchemySagaStorage.
+
+    Initializes a pooled async SQLAlchemy engine and schema, creates a session factory and SqlAlchemySagaStorage, bootstraps a mediator with a DI container and saga mapper, runs an OrderSaga while streaming step results to stdout, and then reloads and prints the persisted saga state and step history before disposing the engine.
+    """
     # 1. Create SQLAlchemy Engine with Connection Pool
     # SQLAlchemy creates a pool by default (QueuePool for most dialects, SingletonThreadPool for SQLite)
-    """
-    Run a demonstration that executes an OrderSaga using an async SQLAlchemy engine and persistent SqlAlchemySagaStorage.
-
-    Initializes a pooled async SQLAlchemy engine and schema, creates a session factory and SqlAlchemySagaStorage, bootstraps a mediator with a DI container and saga mapper, runs an OrderSaga while streaming step results to stdout, and then reloads and prints the persisted saga state and step history before disposing the engine.
-    """
     engine = create_async_engine(
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/saga_sqlalchemy_storage.py` around lines 142 - 149, The function-level docstring for async def main() is placed after inline comments, which can confuse documentation tools; move the triple-quoted docstring so it immediately follows the def main() line (i.e., place the """Run a demonstration...""" string directly after the function signature), and then keep the existing inline comments (# 1. Create SQLAlchemy Engine...) after the docstring; ensure the string is a proper docstring (triple quotes) and retains the same content.

src/cqrs/saga/storage/sqlalchemy.py (2)
**191-218: Duplicate `.values(...)` in both branches of `update_context`.**

The `if current_version is not None` and `else` branches set identical `.values(context=context, version=SagaExecutionModel.version + 1)`. The only difference is the extra `.where(...)` clause. This can be simplified.

♻️ Simplify by applying the common values once

```diff
 stmt = sqlalchemy.update(SagaExecutionModel).where(
     SagaExecutionModel.id == saga_id,
 )
 if current_version is not None:
     stmt = stmt.where(SagaExecutionModel.version == current_version)
-    stmt = stmt.values(
-        context=context,
-        version=SagaExecutionModel.version + 1,
-    )
-else:
-    stmt = stmt.values(
-        context=context,
-        version=SagaExecutionModel.version + 1,
-    )
+stmt = stmt.values(
+    context=context,
+    version=SagaExecutionModel.version + 1,
+)
```

Note: The same duplication exists in `SqlAlchemySagaStorage.update_context` (lines 439–450).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/saga/storage/sqlalchemy.py` around lines 191 - 218, The update_context implementation on SqlAlchemySagaStorage duplicates the .values(context=context, version=SagaExecutionModel.version + 1) call in both branches; change the logic so the conditional only adds the extra .where(SagaExecutionModel.version == current_version) when current_version is not None and apply .values(...) once after that branch, keeping the initial stmt = sqlalchemy.update(SagaExecutionModel).where(SagaExecutionModel.id == saga_id) and then executing stmt via self._session.execute as before; also apply the same refactor to the other occurrence in SqlAlchemySagaStorage.update_context to remove the duplicated .values call.
**379-389: `create_run` does not commit on normal exit: silent data loss if the caller forgets `commit()`.**

By design, the protocol says the caller must call `commit()`. However, the context manager silently discards uncommitted work on clean exit (no exception, no rollback). For a database-backed storage, this can be surprising.

Consider logging a warning if the session has uncommitted changes on exit, or documenting this prominently.
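A minimal sketch of the suggested warning, using a hypothetical run that tracks whether any writes are still staged when the context exits (names are illustrative, not the library's API):

```python
import asyncio
import logging
from contextlib import asynccontextmanager

logger = logging.getLogger("saga.storage")


class TrackedRun:
    """Toy run that remembers whether staged writes were committed."""

    def __init__(self) -> None:
        self.has_uncommitted = False

    async def log_step(self, step: str, status: str) -> None:
        self.has_uncommitted = True  # write is staged, not persisted

    async def commit(self) -> None:
        self.has_uncommitted = False


@asynccontextmanager
async def create_run():
    run = TrackedRun()
    try:
        yield run
    finally:
        if run.has_uncommitted:
            # Surface the silent data loss instead of discarding quietly.
            logger.warning("SagaStorageRun exited with uncommitted changes; they were discarded")


async def forgetful_caller() -> TrackedRun:
    async with create_run() as run:
        await run.log_step("reserve_inventory", "COMPLETED")
        return run  # commit() never called: the warning fires on exit


run = asyncio.run(forgetful_caller())
print(run.has_uncommitted)  # True
```

For SQLAlchemy the equivalent check would inspect the session's pending state before closing it; the flag here just stands in for that.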
tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py (1)
**25-41: `SqlAlchemySagaStorageLegacy` is duplicated across benchmark modules.**

This identical class appears in both `tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py` and `tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py` (and a similar pattern in the memory benchmark). Consider extracting it to a shared `conftest.py` or test utility module.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py` around lines 25 - 41, Extract the duplicated SqlAlchemySagaStorageLegacy class into a single shared test helper (e.g., a test utilities module or conftest) and import it from both benchmark test modules instead of redefining it; specifically, move the class definition for SqlAlchemySagaStorageLegacy into the shared test helper, update the benchmark test modules to import SqlAlchemySagaStorageLegacy, and remove the duplicate class definitions (also apply the same extraction for the analogous memory-benchmark duplicate).

src/cqrs/saga/saga.py (2)
**191-223: Run-scoped components duplicate `__init__` wiring; consider extracting a factory.**

When `run is not None`, lines 192–217 re-create `SagaStateManager`, `SagaRecoveryManager`, `SagaStepExecutor`, `FallbackStepExecutor`, and `SagaCompensator` with the run as the storage argument. The `else` branch (lines 218–223) just aliases the `__init__`-created instances. This duplication will grow if more components are added.
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/saga/saga.py` around lines 191 - 223, The block that re-creates SagaStateManager, SagaRecoveryManager, SagaStepExecutor, FallbackStepExecutor, and SagaCompensator when run is not None duplicates __init__ wiring; extract that creation into a small factory/helper method (e.g., _build_run_scoped_components(run)) that returns (state_manager, recovery_manager, step_executor, fallback_executor, compensator), ensure the SagaCompensator is constructed with the same retry args and on_after_compensate_step=run.commit, and replace the duplicated block with a single call to that factory while preserving the else branch aliasing to self._state_manager, self._recovery_manager, self._step_executor, self._fallback_executor, and self._compensator.
**162-173: Fallback from `create_run()` via `NotImplementedError`: verify this is the intended contract.**

The code catches `NotImplementedError` synchronously from `self._storage.create_run()`. This works because the base `ISagaStorage.create_run()` raises `NotImplementedError` directly (not inside an async context). If a storage implementation were to wrap `create_run()` as an async method or defer the error to `__aenter__`, this pattern would break.

The current implementations (Memory, SQLAlchemy) are correct, but documenting this synchronous-raise contract in the protocol would help future implementers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/saga/saga.py` around lines 162 - 173, The current fallback assumes self._storage.create_run() raises NotImplementedError synchronously; update saga.py to also handle the case where create_run() returns an async context manager that only raises on __aenter__. Change the block around create_run()/async with so you first call run_cm = self._storage.create_run(), then if run_cm is None proceed as before, else wrap the async with run_cm as run in a try/except that catches NotImplementedError (and falls back to calling self._execute(None)), so both synchronous raises and raises from entering the context are handled; alternatively, add a clear protocol docstring on ISagaStorage.create_run to require synchronous NotImplementedError, but prefer the try/except-around-async-with approach to be robust.

tests/benchmarks/dataclasses/test_benchmark_saga_memory.py (2)
**259-289: Unused `memory_storage` fixture parameter inflates benchmark overhead.**

`test_benchmark_saga_memory_run_ten_transactions` requests `memory_storage: MemorySagaStorage` but never reads it; `run()` creates a fresh `MemorySagaStorage()` per iteration. The legacy counterpart `test_benchmark_saga_memory_legacy_ten_transactions` (lines 354-383) correctly omits its analogous fixture for exactly this reason. Align both by removing the unused parameter.

♻️ Proposed fix

```diff
 @pytest.mark.benchmark
 def test_benchmark_saga_memory_run_ten_transactions(
     benchmark,
     saga_with_memory_storage: Saga[OrderContext],
     saga_container: SagaContainer,
-    memory_storage: MemorySagaStorage,
 ):
     """Benchmark 10 saga transactions in sequence, scoped run (memory storage)."""
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/benchmarks/dataclasses/test_benchmark_saga_memory.py` around lines 259 - 289, The test function test_benchmark_saga_memory_run_ten_transactions currently declares an unused fixture parameter memory_storage: MemorySagaStorage which inflates benchmark overhead; remove the memory_storage parameter from the test signature so the function becomes def test_benchmark_saga_memory_run_ten_transactions(benchmark, saga_with_memory_storage: Saga[OrderContext], saga_container: SagaContainer): and keep the internal creation of MemorySagaStorage() inside run() as-is; ensure no other references to memory_storage remain in this function or its docstring.
**156-168: `MemorySagaStorageLegacy` is duplicated across benchmark modules.**

`tests/benchmarks/default/test_benchmark_saga_memory.py` (lines 153–172) defines an identical `MemorySagaStorageLegacy` class. Extract it to `tests/benchmarks/conftest.py` so both test files import from one place instead of duplicating the class.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/benchmarks/dataclasses/test_benchmark_saga_memory.py` around lines 156 - 168, Extract the duplicated MemorySagaStorageLegacy class into a shared conftest.py and update both benchmark test modules to import it from that single location: create MemorySagaStorageLegacy in conftest.py (preserving the docstring and the create_run raising NotImplementedError message) and remove the duplicate class definitions from the two test files, then add an import or rely on pytest autoloading so tests reference the single MemorySagaStorageLegacy symbol instead of having separate copies.
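One possible shape of the shared helper this extraction would produce. This is a sketch: the base class here is a stub standing in for the real `MemorySagaStorage` import, and only the legacy subclass's `create_run` behavior is taken from the review.

```python
# tests/benchmarks/conftest.py (sketch)


class MemorySagaStorage:
    """Stand-in stub; in the real conftest this would be imported from cqrs."""

    def create_run(self):
        return object()  # placeholder for the real SagaStorageRun context


class MemorySagaStorageLegacy(MemorySagaStorage):
    """Storage that pretends run support is absent, forcing the legacy path."""

    def create_run(self):
        raise NotImplementedError("MemorySagaStorageLegacy does not support scoped runs")
```

Both benchmark modules would then `from tests.benchmarks.conftest import MemorySagaStorageLegacy` (or rely on pytest's conftest autoloading for fixtures) instead of redefining the class.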
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cqrs/saga/saga.py`:
- Around line 209-217: The code currently reaches into SagaCompensator's private
fields (self._compensator._retry_count, _retry_delay, _retry_backoff) when
creating a run-scoped compensator in _execute; instead, capture the retry
parameters passed into SagaTransaction.__init__ by assigning them to instance
attributes (e.g., self._retry_count, self._retry_delay, self._retry_backoff)
during construction and then use those attributes when creating SagaCompensator
instances (both the original and the run-scoped one in _execute) so you no
longer reference self._compensator's private internals.
- Around line 347-359: The FAILED step log written via state_manager.log_step
inside the except block can be lost if compensator.compensate_steps raises
because the write is still staged in the run session; modify the except block so
that after calling state_manager.log_step(...) you ensure the write is persisted
by calling run.commit() (if run is not None) before invoking
compensator.compensate_steps(self._completed_steps), then proceed with setting
self._error/self._compensated and running compensation; keep the existing
rollback/raise behavior for subsequent failures.
In `@tests/benchmarks/dataclasses/test_benchmark_saga_memory.py`:
- Around line 182-202: The fixture saga_with_memory_storage declares unused
pytest dependencies saga_container (SagaContainer) and memory_storage
(MemorySagaStorage); remove these parameters from the fixture signature so
pytest won't instantiate them unnecessarily, and update the docstring to reflect
no external fixture dependencies; keep the inner OrderSaga class
(Saga[OrderContext]) and its steps list [ReserveInventoryStep,
ProcessPaymentStep, ShipOrderStep] unchanged and simply return OrderSaga().
---
Outside diff comments:
In `@examples/saga_sqlalchemy_storage.py`:
- Around line 42-44: The comment above DB_URL is incorrect—update the text or
the default to match each other: either change the comment to indicate MySQL (or
asyncmy) as the default connection, or set DB_URL's default to a SQLite URI
(e.g., "sqlite+aiosqlite:///./test.db"); modify the comment near the DB_URL
declaration (symbol: DB_URL) to accurately describe the default or change the
DB_URL value to the intended SQLite URL so the comment and default are
consistent.
---
Duplicate comments:
In `@tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py`:
- Around line 25-37: The SqlAlchemySagaStorageLegacy class is duplicated; remove
the redundant definition and ensure both benchmark variants reference a single
shared implementation (SqlAlchemySagaStorageLegacy) - e.g., keep the unique
class (SqlAlchemySagaStorageLegacy) and delete the duplicate, or extract it to a
common location used by both the `default` and other benchmark variants so only
one NotImplementedError-raising create_run implementation exists.
---
Nitpick comments:
In `@examples/saga_sqlalchemy_storage.py`:
- Around line 142-149: The function-level docstring for async def main() is
placed after inline comments, which can confuse documentation tools; move the
triple-quoted docstring so it immediately follows the def main() line (i.e.,
place the """Run a demonstration...""" string directly after the function
signature), and then keep the existing inline comments (# 1. Create SQLAlchemy
Engine...) after the docstring; ensure the string is a proper docstring (triple
quotes) and retains the same content.
In `@src/cqrs/saga/saga.py`:
- Around line 191-223: The block that re-creates SagaStateManager,
SagaRecoveryManager, SagaStepExecutor, FallbackStepExecutor, and SagaCompensator
when run is not None duplicates __init__ wiring; extract that creation into a
small factory/helper method (e.g., _build_run_scoped_components(run)) that
returns (state_manager, recovery_manager, step_executor, fallback_executor,
compensator), ensure the SagaCompensator is constructed with the same retry args
and on_after_compensate_step=run.commit, and replace the duplicated block with a
single call to that factory while preserving the else branch aliasing to
self._state_manager, self._recovery_manager, self._step_executor,
self._fallback_executor, and self._compensator.
- Around line 162-173: The current fallback assumes self._storage.create_run()
raises NotImplementedError synchronously; update saga.py to also handle the case
where create_run() returns an async context manager that only raises on
__aenter__. Change the block around create_run()/async with so you first call
run_cm = self._storage.create_run(), then if run_cm is None proceed as before,
else wrap the async with run_cm as run in a try/except that catches
NotImplementedError (and falls back to calling self._execute(None)), so both
synchronous raises and raises from entering the context are handled;
alternatively, add a clear protocol docstring on ISagaStorage.create_run to
require synchronous NotImplementedError, but prefer the
try/except-around-async-with approach to be robust.
In `@src/cqrs/saga/storage/sqlalchemy.py`:
- Around line 191-218: The update_context implementation on
SqlAlchemySagaStorage duplicates the .values(context=context,
version=SagaExecutionModel.version + 1) call in both branches; change the logic
so the conditional only adds the extra .where(SagaExecutionModel.version ==
current_version) when current_version is not None and apply .values(...) once
after that branch, keeping the initial stmt =
sqlalchemy.update(SagaExecutionModel).where(SagaExecutionModel.id == saga_id)
and then executing stmt via self._session.execute as before; also apply the same
refactor to the other occurrence in SqlAlchemySagaStorage.update_context to
remove the duplicated .values call.
In `@tests/benchmarks/dataclasses/test_benchmark_saga_memory.py`:
- Around line 259-289: The test function
test_benchmark_saga_memory_run_ten_transactions currently declares an unused
fixture parameter memory_storage: MemorySagaStorage which inflates benchmark
overhead; remove the memory_storage parameter from the test signature so the
function becomes def test_benchmark_saga_memory_run_ten_transactions(benchmark,
saga_with_memory_storage: Saga[OrderContext], saga_container: SagaContainer):
and keep the internal creation of MemorySagaStorage() inside run() as-is; ensure
no other references to memory_storage remain in this function or its docstring.
- Around line 156-168: Extract the duplicated MemorySagaStorageLegacy class into
a shared conftest.py and update both benchmark test modules to import it from
that single location: create MemorySagaStorageLegacy in conftest.py (preserving
the docstring and the create_run raising NotImplementedError message) and remove
the duplicate class definitions from the two test files, then add an import or
rely on pytest autoloading so tests reference the single MemorySagaStorageLegacy
symbol instead of having separate copies.
In `@tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py`:
- Around line 25-41: Extract the duplicated SqlAlchemySagaStorageLegacy class
into a single shared test helper (e.g., a test utilities module or conftest) and
import it from both benchmark test modules instead of redefining it;
specifically, move the class definition for SqlAlchemySagaStorageLegacy into the
shared test helper, update the benchmark test modules to import
SqlAlchemySagaStorageLegacy, and remove the duplicate class definitions (also
apply the same extraction for the analogous memory-benchmark duplicate).
Actionable comments posted: 6
🧹 Nitpick comments (9)
docker-compose-dev.yml (1)
**19-19: Pin the `postgres` image to a specific version.**

`postgres:latest` resolves to whatever the current latest release is at pull time. The adjacent MySQL service uses `mysql:8.3.0`. An unexpected major-version bump (e.g., 16 → 17) can silently break schema-init behavior or driver compatibility in CI. Docker generally recommends using a specific version of Postgres, since `:latest` automatically changes with each new release and it's hard to know whether newer versions will introduce breaking changes or vulnerabilities.

🔧 Proposed fix

```diff
-    image: postgres:latest
+    image: postgres:16
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docker-compose-dev.yml` at line 19, The docker-compose Postgres service currently uses the floating tag "postgres:latest"; change the image field in that service to a fixed, explicit Postgres version (e.g., "postgres:15.4" or whichever minor version your CI/schema was validated against) instead of :latest to avoid unexpected major upgrades; update the image line in the postgres service block to the chosen pinned tag and ensure any related CI/dockerfiles that assume a Postgres major version are kept consistent.

tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py (1)
**117-193: Significant code duplication between run-path and legacy-path benchmarks.**

The four benchmarks split cleanly into run vs legacy, but the session factory setup, context creation, and `run_transaction` body are copy-pasted verbatim. A thin parametrised or factory approach keeps benchmark names explicit while eliminating the duplication:

♻️ Sketch of a parametrised helper

```python
import pytest

STORAGE_VARIANTS = [
    pytest.param(SqlAlchemySagaStorage, id="run"),
    pytest.param(SqlAlchemySagaStorageLegacy, id="legacy"),
]


def _make_storage(engine, storage_cls):
    return storage_cls(
        async_sessionmaker(engine, expire_on_commit=False, autocommit=False, autoflush=False),
    )


@pytest.mark.benchmark
@pytest.mark.parametrize("storage_cls", STORAGE_VARIANTS)
def test_benchmark_saga_sqlalchemy_full_transaction(
    benchmark, saga_sqlalchemy, saga_container, saga_benchmark_loop_and_engine, storage_cls
):
    loop, engine = saga_benchmark_loop_and_engine
    storage = _make_storage(engine, storage_cls)
    context = OrderContext(order_id="ord_1", user_id="user_1", amount=100.0)

    async def run_transaction() -> None:
        async with saga_sqlalchemy.transaction(
            context=context,
            container=saga_container,
            storage=storage,
        ) as transaction:
            async for _ in transaction:
                pass

    benchmark(lambda: loop.run_until_complete(run_transaction()))
```

CodSpeed will surface the two variants as `test_benchmark_saga_sqlalchemy_full_transaction[run]` and `...[legacy]`, preserving the naming distinction.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py` around lines 117 - 193, The two legacy benchmarks duplicate session_factory, storage creation, context creation, and the run_transaction body; refactor by adding a small factory/helper (e.g., _make_storage(engine, storage_cls)) to build the storage using async_sessionmaker and parametrize the tests over storage classes (SqlAlchemySagaStorage and SqlAlchemySagaStorageLegacy) so you keep test names explicit via pytest.param ids; update test_benchmark_saga_sqlalchemy_legacy_full_transaction and test_benchmark_saga_sqlalchemy_legacy_single_step to accept a storage_cls parameter, call _make_storage(loop_engine[1], storage_cls) and reuse a single run_transaction implementation (or inline identical body) instead of duplicating setup and iteration logic.

docker-compose-test.yml (1)
**19-19: Pin the `postgres` image version for reproducible CI.**

Same issue as `docker-compose-dev.yml`: `postgres:latest` can silently change between pipeline runs, while `mysql_tests` is pinned at `8.3.0`.

🔧 Proposed fix

```diff
-    image: postgres:latest
+    image: postgres:16
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docker-compose-test.yml` at line 19, The docker-compose test service currently uses the floating image tag "postgres:latest" which can change between CI runs; replace that image value with a pinned, explicit Postgres tag (e.g., "postgres:15.4" or a specific alpine/patch tag your CI expects) so the test service image is reproducible—update the image field for the Postgres service (the line currently containing "image: postgres:latest") to the chosen fixed tag and ensure any CI/health-checks or related configs reference the same pinned version.

tests/integration/test_saga_mediator_sqlalchemy_postgres.py (1)
36-74: `_TestContainer` is duplicated verbatim between the MySQL and Postgres test modules.

This container class, along with the `container` and `saga_mediator` fixtures, is identical in both test_saga_mediator_sqlalchemy_mysql.py and test_saga_mediator_sqlalchemy_postgres.py. Consider extracting the shared container and fixture logic into a common conftest or helper module to avoid maintaining two copies.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/integration/test_saga_mediator_sqlalchemy_postgres.py` around lines 36 - 74, Duplicate _TestContainer, its fixtures (container, saga_mediator), and related helpers should be extracted into a shared test utility to avoid code duplication; create a common test module (e.g., tests/conftest or tests/helpers) that defines the _TestContainer class (including methods resolve, attach_external_container, and property external_container) and the container and saga_mediator fixtures, then update both test_saga_mediator_sqlalchemy_mysql.py and test_saga_mediator_sqlalchemy_postgres.py to import and use those shared symbols instead of defining them locally.

examples/saga_recovery_scheduler.py (2)
267-267: Inconsistent logging style: f-string vs %-style.

Line 267 switches to an f-string while the rest of the file (e.g., lines 205, 215, 232-237, 275-278) uses %-style placeholders for `logger` calls. Pick one style for consistency within the file.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/saga_recovery_scheduler.py` at line 267, The logger call using an f-string is inconsistent with the rest of the file; update the logger.info call that references shipment_id, order_id, and tracking_number to use the same %‑style placeholder formatting used elsewhere (i.e., call logger.info with a format string containing %s placeholders and pass shipment_id, order_id, tracking_number as separate arguments) so it matches lines like the other logger.* usages.
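To make the lazy-formatting tradeoff behind this style point concrete, here is a self-contained sketch; the logger name and the `Expensive` class are illustrative, not taken from the example file:

```python
import logging


class Expensive:
    """Counts how many times its string conversion actually runs."""

    def __init__(self) -> None:
        self.str_calls = 0

    def __str__(self) -> str:
        self.str_calls += 1
        return "saga-123"


logger = logging.getLogger("saga.recovery.demo")  # arbitrary demo logger
logger.addHandler(logging.NullHandler())
logger.setLevel(logging.WARNING)  # INFO records are suppressed

obj = Expensive()

# f-string: the message is fully built before logging checks the level.
logger.info(f"Recovering saga {obj}...")
# %-style: the merge is deferred, so a suppressed record never calls __str__.
logger.info("Recovering saga %s...", obj)

print(obj.str_calls)  # -> 1: only the f-string call paid for formatting
```

Note that the argument itself is evaluated either way; what %-style defers is the string merge, which is exactly the work skipped when the record's level is disabled.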
520-527: `logger.exception` already includes the traceback; `traceback.format_exc()` duplicates it.

`logger.exception(...)` appends the current exception's traceback automatically. Embedding `traceback.format_exc()` in the f-string causes the full stack trace to appear twice in each log record.

Proposed fix
```diff
-            logger.info(f"Saga {saga_id} recovery completed compensation: {traceback.format_exc()}")
+            logger.info(f"Saga {saga_id} recovery completed compensation.")
             processed += 1
         else:
-            logger.exception(f"Saga {saga_id} recovery failed: {traceback.format_exc()}")
+            logger.exception(f"Saga {saga_id} recovery failed.")
             processed += 1
-    except Exception:
-        logger.exception(f"Saga {saga_id} recovery failed: {traceback.format_exc()}")
+    except Exception:
+        logger.exception(f"Saga {saga_id} recovery failed.")
```

The same issue appears at line 565 in `recovery_loop`:

```diff
-            logger.exception(f"Recovery iteration failed: {traceback.format_exc()}")
+            logger.exception("Recovery iteration failed.")
```

If these are all addressed, the `import traceback` on line 82 can be removed as well.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/saga_recovery_scheduler.py` around lines 520 - 527, The log calls in recovery_loop that use logger.exception or logger.info currently embed traceback.format_exc(), causing duplicate stack traces; update the calls in the recovery_loop (references: saga_id variable and the recovery_loop function) to remove traceback.format_exc() — use logger.exception(f"Saga {saga_id} recovery failed") for exception cases and logger.info(f"Saga {saga_id} recovery completed compensation") for the successful case; after fixing all occurrences (including the one around line 565), remove the now-unused import traceback.

src/cqrs/saga/storage/sqlalchemy.py (1)
215-237: `update_status` silently succeeds when the saga doesn't exist.

Unlike `update_context` (which checks `rowcount == 0` when `current_version` is set), `update_status` in the run does not verify that any row was actually updated. If `saga_id` doesn't exist, the UPDATE silently affects zero rows. The parent class has the same gap, so this is consistent, but within a scoped run where the saga was just created in the same session, this is unlikely to be hit in practice.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/saga/storage/sqlalchemy.py` around lines 215 - 237, The update_status method on the SQLAlchemy saga storage silently succeeds when no row is updated; modify update_status (in src/cqrs/saga/storage/sqlalchemy.py) to inspect the result of await self._session.execute(...) (check result.rowcount) and raise an appropriate error when rowcount == 0 (same behavior as update_context when current_version is set). Reference the SagaExecutionModel and the update_status method when adding the check so the call will surface a missing-saga error instead of silently doing nothing.

tests/integration/test_saga_storage_sqlalchemy_postgres.py (1)
242-256: `asyncio.sleep(1.0)` for the ordering test may be fragile on slow CI.

The test relies on a 1-second wall-clock gap so that `updated_at` differs between sagas. If the DB server's clock resolution or CI latency causes issues, this could become flaky. Consider updating the `updated_at` column directly (similar to how `create_interrupted_saga` in the example manipulates `updated_at`) or using a longer sleep as a safety margin.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/integration/test_saga_storage_sqlalchemy_postgres.py` around lines 242 - 256, The test test_get_sagas_for_recovery_ordered_by_updated_at uses asyncio.sleep(1.0) to force an updated_at ordering which is flaky; instead, modify the test to avoid wall-clock sleeps by directly adjusting the updated_at timestamp for the target saga (like create_interrupted_saga does) after calling storage.create_saga/storage.update_status and before calling storage.get_sagas_for_recovery — e.g., use the storage's DB session/engine or an existing helper to set the updated_at of id2 to a later timestamp so the ordering is deterministic when asserting ids[-1] == id2.

src/cqrs/saga/compensation.py (1)
132-145: Both branches unconditionally mark the saga as `FAILED`. That is intentional, but the comment on line 144 is misleading.

Line 144 says "If all compensations succeeded (or were skipped), mark as failed", which is correct behavior (compensation means the forward saga failed), but the comment reads as if `FAILED` is surprising. A small rewording would improve clarity.

Suggested comment tweak
```diff
         else:
-            # If all compensations succeeded (or were skipped), mark as failed
+            # All compensations succeeded (or were skipped); mark saga as FAILED
+            # because compensation itself means the forward execution failed.
             await self._storage.update_status(self._saga_id, SagaStatus.FAILED)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/saga/compensation.py` around lines 132 - 145, The comment on the else branch is misleading: although both branches set the saga status to SagaStatus.FAILED, the else branch should clearly state that marking FAILED is intentional because all compensations completed (or were skipped) after the forward saga failed; update the comment near the else that precedes await self._storage.update_status(self._saga_id, SagaStatus.FAILED) to explicitly say something like "All compensations completed or were skipped — mark saga as FAILED because the original forward transaction failed", keeping the logic (compensation_errors, SagaStatus.FAILED, _storage.update_status, _saga_id) unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.github/workflows/tests.yml:
- Around line 136-139: Move the hardcoded test DSNs out of the Actions YAML by
replacing the literal values for the env keys DATABASE_DSN, DATABASE_DSN_MYSQL,
and DATABASE_DSN_POSTGRESQL with references to GitHub Actions secrets (e.g. use
the secrets store and reference them as secrets.DATABASE_DSN,
secrets.DATABASE_DSN_MYSQL, secrets.DATABASE_DSN_POSTGRESQL in the workflow),
then add those three secret entries in the repository's GitHub Actions Secrets
with the corresponding DSN values and update any other places that duplicate the
credentials (tests/pytest-config.ini and docker-compose-test.yml) to either read
from the same secrets or from a single source to avoid duplication.
In `@src/cqrs/saga/storage/sqlalchemy.py`:
- Around line 363-385: The except block in create_run's inner _run context
manager currently uses "except Exception:" which does not catch
asyncio.CancelledError (a BaseException subclass), so update the handler in _run
to catch BaseException instead of Exception; specifically, change the except to
"except BaseException:" so that _SqlAlchemySagaStorageRun.rollback() is always
awaited (await run.rollback()) before re-raising the cancellation or other
BaseException, preserving the session closing behavior.
In `@tests/benchmarks/dataclasses/test_benchmark_saga_memory.py`:
- Around line 1-5: Two tests violate the module docstring naming convention for
scoped run benchmarks: rename test_benchmark_saga_memory_full_transaction to
test_benchmark_saga_memory_run_full_transaction and
test_benchmark_saga_memory_single_step to
test_benchmark_saga_memory_run_single_step; update all references (test function
definitions, any pytest markers/decorators, and uses in fixtures or
parametrizations) so callers and markers match the new names and maintain
consistency with test_benchmark_saga_memory_run_ten_transactions.
In `@tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py`:
- Around line 1-5: Rename the two scoped-run test functions to include the
`_run_` infix so they follow the module naming convention: change
test_benchmark_saga_sqlalchemy_full_transaction to
test_benchmark_saga_sqlalchemy_run_full_transaction and
test_benchmark_saga_sqlalchemy_single_step to
test_benchmark_saga_sqlalchemy_run_single_step; update any references/usages
(pytest markers, imports, or fixtures) that call these functions and adjust the
names in the module docstring/examples if present to keep consistency. Ensure
the function definitions for the affected tests are the only changed symbols (no
behavior changes).
In `@tests/benchmarks/default/test_benchmark_saga_memory.py`:
- Around line 1-5: Rename the two benchmark test functions that use the
scoped-run path to include the "_run_" infix so names match the module
docstring: change test_benchmark_saga_memory_full_transaction to
test_benchmark_saga_memory_run_full_transaction and
test_benchmark_saga_memory_single_step to
test_benchmark_saga_memory_run_single_step; update any references to these
function names (pytest markers, imports, or string lookups) so test discovery
and CI continue to work, and keep
test_benchmark_saga_memory_run_ten_transactions unchanged.
In `@tests/integration/test_saga_storage_sqlalchemy_mysql.py`:
- Line 26: The docstring for SqlAlchemySagaStorage in
tests/integration/test_saga_storage_sqlalchemy_mysql.py is written in Russian;
update it to English for consistency (replace the Russian string
"""SqlAlchemySagaStorage для MySQL (фикстура init_saga_orm_mysql поднимает
схему).""" with an English equivalent such as """SqlAlchemySagaStorage for MySQL
(the init_saga_orm_mysql fixture sets up the schema).""" so readers and
contributors see a clear English description referencing the init_saga_orm_mysql
fixture).
---
Duplicate comments:
In `@README.md`:
- Around line 325-328: The code previously had a namespace typo (events_models
vs event_models); update any remaining references to the old identifier so both
mapper.bind calls in init_events use event_models consistently, search for other
uses of events_models and replace them with event_models, and ensure imports
(event_models, events, event_handlers) remain correct for functions/classes like
init_events, events.EventMap, mapper.bind, and
event_handlers.MeetingRoomClosedNotificationHandler; after changes run tests or
lint to confirm no unresolved names remain.
In `@src/cqrs/saga/saga.py`:
- Around line 382-394: The FAILED step log is only staged by
state_manager.log_step and can be lost if compensator.compensate_steps raises
before the run's first commit; after calling await state_manager.log_step(...)
in the except block, immediately persist the staged log by invoking await
run.commit() (guarding with if run is not None) before starting
compensator.compensate_steps(self._completed_steps), preserving the current
error handling and rollback semantics if commit itself fails.
In `@tests/benchmarks/default/test_benchmark_saga_memory.py`:
- Around line 164-179: The fixture saga_with_memory_storage declares unused
parameters saga_container and memory_storage causing unnecessary pytest setup;
remove these parameters from the fixture signature so it becomes def
saga_with_memory_storage() -> Saga[OrderContext]: and keep returning OrderSaga()
(which defines steps = [ReserveInventoryStep, ProcessPaymentStep,
ShipOrderStep]); alternatively, if those resources are required, reference
saga_container or memory_storage inside the fixture (e.g., pass into OrderSaga
or use to configure it) — choose one approach and update the
saga_with_memory_storage fixture accordingly.
---
Nitpick comments:
In `@docker-compose-dev.yml`:
- Line 19: The docker-compose Postgres service currently uses the floating tag
"postgres:latest"; change the image field in that service to a fixed, explicit
Postgres version (e.g., "postgres:15.4" or whichever minor version your
CI/schema was validated against) instead of :latest to avoid unexpected major
upgrades; update the image line in the postgres service block to the chosen
pinned tag and ensure any related CI/dockerfiles that assume a Postgres major
version are kept consistent.
In `@docker-compose-test.yml`:
- Line 19: The docker-compose test service currently uses the floating image tag
"postgres:latest" which can change between CI runs; replace that image value
with a pinned, explicit Postgres tag (e.g., "postgres:15.4" or a specific
alpine/patch tag your CI expects) so the test service image is
reproducible—update the image field for the Postgres service (the line currently
containing "image: postgres:latest") to the chosen fixed tag and ensure any
CI/health-checks or related configs reference the same pinned version.
In `@examples/saga_recovery_scheduler.py`:
- Line 267: The logger call using an f-string is inconsistent with the rest of
the file; update the logger.info call that references shipment_id, order_id, and
tracking_number to use the same %‑style placeholder formatting used elsewhere
(i.e., call logger.info with a format string containing %s placeholders and pass
shipment_id, order_id, tracking_number as separate arguments) so it matches
lines like the other logger.* usages.
- Around line 520-527: The log calls in recovery_loop that use logger.exception
or logger.info currently embed traceback.format_exc(), causing duplicate stack
traces; update the calls in the recovery_loop (references: saga_id variable and
the recovery_loop function) to remove traceback.format_exc() — use
logger.exception(f"Saga {saga_id} recovery failed") for exception cases and
logger.info(f"Saga {saga_id} recovery completed compensation") for the
successful case; after fixing all occurrences (including the one around line
565), remove the now-unused import traceback.
In `@src/cqrs/saga/compensation.py`:
- Around line 132-145: The comment on the else branch is misleading: although
both branches set the saga status to SagaStatus.FAILED, the else branch should
clearly state that marking FAILED is intentional because all compensations
completed (or were skipped) after the forward saga failed; update the comment
near the else that precedes await self._storage.update_status(self._saga_id,
SagaStatus.FAILED) to explicitly say something like "All compensations completed
or were skipped — mark saga as FAILED because the original forward transaction
failed", keeping the logic (compensation_errors, SagaStatus.FAILED,
_storage.update_status, _saga_id) unchanged.
In `@src/cqrs/saga/storage/sqlalchemy.py`:
- Around line 215-237: The update_status method on the SQLAlchemy saga storage
silently succeeds when no row is updated; modify update_status (in
src/cqrs/saga/storage/sqlalchemy.py) to inspect the result of await
self._session.execute(...) (check result.rowcount) and raise an appropriate
error when rowcount == 0 (same behavior as update_context when current_version
is set). Reference the SagaExecutionModel and the update_status method when
adding the check so the call will surface a missing-saga error instead of
silently doing nothing.
In `@tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py`:
- Around line 117-193: The two legacy benchmarks duplicate session_factory,
storage creation, context creation, and the run_transaction body; refactor by
adding a small factory/helper (e.g., _make_storage(engine, storage_cls)) to
build the storage using async_sessionmaker and parametrize the tests over
storage classes (SqlAlchemySagaStorage and SqlAlchemySagaStorageLegacy) so you
keep test names explicit via pytest.param ids; update
test_benchmark_saga_sqlalchemy_legacy_full_transaction and
test_benchmark_saga_sqlalchemy_legacy_single_step to accept a storage_cls
parameter, call _make_storage(loop_engine[1], storage_cls) and reuse a single
run_transaction implementation (or inline identical body) instead of duplicating
setup and iteration logic.
In `@tests/integration/test_saga_mediator_sqlalchemy_postgres.py`:
- Around line 36-74: Duplicate _TestContainer, its fixtures (container,
saga_mediator), and related helpers should be extracted into a shared test
utility to avoid code duplication; create a common test module (e.g.,
tests/conftest or tests/helpers) that defines the _TestContainer class
(including methods resolve, attach_external_container, and property
external_container) and the container and saga_mediator fixtures, then update
both test_saga_mediator_sqlalchemy_mysql.py and
test_saga_mediator_sqlalchemy_postgres.py to import and use those shared symbols
instead of defining them locally.
In `@tests/integration/test_saga_storage_sqlalchemy_postgres.py`:
- Around line 242-256: The test
test_get_sagas_for_recovery_ordered_by_updated_at uses asyncio.sleep(1.0) to
force an updated_at ordering which is flaky; instead, modify the test to avoid
wall-clock sleeps by directly adjusting the updated_at timestamp for the target
saga (like create_interrupted_saga does) after calling
storage.create_saga/storage.update_status and before calling
storage.get_sagas_for_recovery — e.g., use the storage's DB session/engine or an
existing helper to set the updated_at of id2 to a later timestamp so the
ordering is deterministic when asserting ids[-1] == id2.
🧹 Nitpick comments (5)
src/cqrs/outbox/mock.py (1)
33-35: Redundant inner `list()` on the `else` branch.

`self.session.values()` is directly iterable; wrapping it in `list(...)` before passing it to the outer `list(...)` allocates a throwaway intermediate list. The `else` branch can be simplified to just `self.session.values()`.

♻️ Proposed simplification
```diff
         return list(
-            filter(lambda e: topic == e.topic, self.session.values()) if topic else list(self.session.values()),
+            filter(lambda e: topic == e.topic, self.session.values()) if topic else self.session.values(),
         )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/mock.py` around lines 33 - 35, The return in the mock outbox method currently wraps self.session.values() in an unnecessary inner list() when topic is falsy; update the expression in the return (the lambda/filter branch in the function that returns events from self.session) to pass self.session.values() directly to the outer list() instead of list(self.session.values()), removing the redundant allocation while keeping the topic filtering logic (i.e., keep the filter(lambda e: topic == e.topic, self.session.values()) when topic is truthy).

tests/unit/test_deserializers.py (1)
82-95: Test name doesn't match what is actually being tested.

`test_json_deserializer_missing_required_fields_negative` implies that a required field is absent, but the JSON at line 89 supplies all required fields (`event_name`, `payload.foo`, `payload.bar`). It exercises a wrong-type input (`"not_an_int"` for an `int` field), not a missing-field scenario. `test_json_deserializer_invalid_structure_negative` (line 67) already covers the missing-field case (`event_name` omitted), so the name here is both inaccurate and potentially duplicates that test's intent.

✏️ Suggested rename + clarifying comment
```diff
-def test_json_deserializer_missing_required_fields_negative():
+def test_json_deserializer_wrong_field_type_negative():
     deserializer = json.JsonDeserializer(
         model=cqrs.NotificationEvent[DeserializedModelPayload],
     )
-    # JSON with payload that has wrong type for required field 'bar' (string instead of int)
-    # This should cause a validation error when Pydantic tries to validate the payload
+    # 'bar' is declared as int; passing a non-coercible string should trigger a Pydantic validation error
     incomplete_json = '{"event_name": "test", "payload": {"foo": "bar", "bar": "not_an_int"}}'
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/test_deserializers.py` around lines 82 - 95, The test named test_json_deserializer_missing_required_fields_negative is misnamed because it feeds a wrong-type value (incomplete_json with "bar": "not_an_int") rather than omitting a required field; rename the test to something like test_json_deserializer_invalid_field_type_negative (or test_json_deserializer_wrong_type_negative) and update its inline comment to state it verifies type-validation failure for payload.bar (using json.JsonDeserializer and the incomplete_json payload) to avoid duplicating the missing-field test (test_json_deserializer_invalid_structure_negative).

src/cqrs/requests/mermaid.py (1)
210-235: Consider extracting the repeated field-type extraction into a helper.

The dataclass / Pydantic-v2 / Pydantic-v1 branching block at lines 211-233 is duplicated verbatim at lines 242-264 (the only difference is the surrounding variable). All four lines touched by this PR live inside these twin blocks, meaning any future fix will need to be applied twice. A small helper avoids the drift:
♻️ Proposed refactor
```diff
+    @staticmethod
+    def _extract_fields(cls: type) -> list[tuple[str, str]]:
+        """Return [(field_name, field_type_str)] for dataclass, Pydantic v2, or Pydantic v1 models."""
+        if hasattr(cls, "__dataclass_fields__"):
+            return [
+                (n, fi.type.__name__ if hasattr(fi.type, "__name__") else str(fi.type))
+                for n, fi in cls.__dataclass_fields__.items()
+            ]
+        if hasattr(cls, "model_fields"):  # Pydantic v2
+            return [
+                (n, fi.annotation.__name__ if hasattr(fi.annotation, "__name__") else str(fi.annotation))
+                for n, fi in cls.model_fields.items()
+            ]
+        if hasattr(cls, "__fields__"):  # Pydantic v1
+            return [
+                (n, fi.type_.__name__ if hasattr(fi.type_, "__name__") else str(fi.type_))
+                for n, fi in cls.__fields__.items()
+            ]
+        return []
```

Then both the request-types and response-types loops become:
```diff
-        if hasattr(request_type, "__dataclass_fields__"):
-            ...
-        elif hasattr(request_type, "model_fields"):
-            ...
-        elif hasattr(request_type, "__fields__"):
-            ...
+        for field_name, field_type in self._extract_fields(request_type):
+            lines.append(f"    +{field_name}: {field_type}")
```

(Apply the same substitution for the `response_type` block.)

Also applies to: 241-265
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/requests/mermaid.py` around lines 210 - 235, The block that extracts field names and types for dataclasses, Pydantic v2 (model_fields) and Pydantic v1 (__fields__) is duplicated for request_type and response_type; create a small helper function (e.g., _extract_field_entries or format_type_fields) that accepts a type object (like request_type/response_type) and returns the list of formatted " +{name}: {type}" lines, handling the three branches (.__dataclass_fields__, .model_fields, .__fields__). Replace both duplicated loops with calls to this helper and append the returned lines before closing the "}" so that field extraction logic lives in one place and is reused for request_type and response_type.

src/cqrs/saga/storage/memory.py (1)
145-157: No-op `rollback()` means mutations are not undone on error.

Since all mutation methods (`create_saga`, `update_context`, `update_status`, `log_step`) delegate immediately to the underlying `MemorySagaStorage` and modify its in-memory dicts in place, calling `rollback()` after a partial failure leaves the storage in a dirty state. This differs from the SQLAlchemy implementation, where `rollback()` would actually revert uncommitted changes.

For production code this is fine (you'd use SQLAlchemy), but for tests that use `MemorySagaStorage` with `create_run()` and expect rollback to undo partial writes, this can cause subtle test pollution. Consider documenting this caveat on the class or the `rollback()` docstring so future test authors are aware.

Suggested docstring improvement
```diff
     async def rollback(self) -> None:
         """
-        Perform no action for rollback; provided to satisfy the SagaStorageRun interface.
+        No-op: provided to satisfy the SagaStorageRun interface.
+
+        Note:
+            Because mutations are applied immediately to the underlying in-memory
+            storage, rollback does NOT undo previously written state. Callers that
+            rely on true transactional rollback should use a persistent storage backend.
         """
         pass
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/saga/storage/memory.py` around lines 145 - 157, Update the docstring on MemorySagaStorage.run().rollback() (and optionally commit()) to clearly state that these are no-ops and do not revert in-memory mutations: explain that methods like MemorySagaStorage.create_saga, update_context, update_status, and log_step apply changes immediately to the in-memory dicts, so calling create_run().rollback() will not undo partial writes and test authors should reset or recreate the storage between tests; reference MemorySagaStorage and create_run() so readers can find the relevant behavior.

examples/saga_recovery.py (1)
617-618: Implicit f-string concatenation can be simplified.

Multiple locations in this file (lines 617, 688, 799, 867) use adjacent `f"..." f"..."` literals. These are valid Python (implicit string concatenation) but could be a single f-string for readability.

Example simplification
```diff
-        f" [{entry.timestamp.strftime('%H:%M:%S')}] " f"{entry.step_name}.{entry.action}: {entry.status.value}",
+        f" [{entry.timestamp.strftime('%H:%M:%S')}] {entry.step_name}.{entry.action}: {entry.status.value}",
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/saga_recovery.py` around lines 617 - 618, Several places use adjacent f-strings (implicit concatenation) e.g. the print/format call that builds " [{entry.timestamp.strftime('%H:%M:%S')}] " f"{entry.step_name}.{entry.action}: {entry.status.value}" — replace each pair of adjacent f-strings with a single f-string combining both parts (e.g. f" [{entry.timestamp.strftime('%H:%M:%S')}] {entry.step_name}.{entry.action}: {entry.status.value}") to improve readability; apply the same change for the other occurrences that reference entry.timestamp, entry.step_name, entry.action, and entry.status.value.
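For reference, the behavior that makes this refactor safe: Python merges adjacent string literals at compile time, so the split and joined spellings produce identical strings. A small sketch with hypothetical stand-ins for the entry fields:

```python
from datetime import datetime

# Hypothetical stand-ins for entry.timestamp, entry.step_name, etc.
timestamp = datetime(2024, 1, 1, 12, 30, 45)
step_name, action, status = "ship_order", "execute", "completed"

# Adjacent f-string literals are concatenated by the compiler...
split = f" [{timestamp.strftime('%H:%M:%S')}] " f"{step_name}.{action}: {status}"
# ...so a single f-string is equivalent and easier to read.
joined = f" [{timestamp.strftime('%H:%M:%S')}] {step_name}.{action}: {status}"

print(split == joined)  # -> True
print(joined)           # ->  [12:30:45] ship_order.execute: completed
```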
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@examples/saga_recovery.py`:
- Around line 617-618: Several places use adjacent f-strings (implicit
concatenation) e.g. the print/format call that builds "
[{entry.timestamp.strftime('%H:%M:%S')}] " f"{entry.step_name}.{entry.action}:
{entry.status.value}" — replace each pair of adjacent f-strings with a single
f-string combining both parts (e.g. f"
[{entry.timestamp.strftime('%H:%M:%S')}] {entry.step_name}.{entry.action}:
{entry.status.value}") to improve readability; apply the same change for the
other occurrences that reference entry.timestamp, entry.step_name, entry.action,
and entry.status.value.
In `@src/cqrs/outbox/mock.py`:
- Around line 33-35: The return in the mock outbox method currently wraps
self.session.values() in an unnecessary inner list() when topic is falsy; update
the expression in the return (the lambda/filter branch in the function that
returns events from self.session) to pass self.session.values() directly to the
outer list() instead of list(self.session.values()), removing the redundant
allocation while keeping the topic filtering logic (i.e., keep the filter(lambda
e: topic == e.topic, self.session.values()) when topic is truthy).
In `@src/cqrs/requests/mermaid.py`:
- Around line 210-235: The block that extracts field names and types for
dataclasses, Pydantic v2 (model_fields) and Pydantic v1 (__fields__) is
duplicated for request_type and response_type; create a small helper function
(e.g., _extract_field_entries or format_type_fields) that accepts a type object
(like request_type/response_type) and returns the list of formatted "
+{name}: {type}" lines, handling the three branches (.__dataclass_fields__,
.model_fields, .__fields__). Replace both duplicated loops with calls to this
helper and append the returned lines before closing the "}" so that field
extraction logic lives in one place and is reused for request_type and
response_type.
In `@src/cqrs/saga/storage/memory.py`:
- Around line 145-157: Update the docstring on
MemorySagaStorage.run().rollback() (and optionally commit()) to clearly state
that these are no-ops and do not revert in-memory mutations: explain that
methods like MemorySagaStorage.create_saga, update_context, update_status, and
log_step apply changes immediately to the in-memory dicts, so calling
create_run().rollback() will not undo partial writes and test authors should
reset or recreate the storage between tests; reference MemorySagaStorage and
create_run() so readers can find the relevant behavior.
In `@tests/unit/test_deserializers.py`:
- Around line 82-95: The test named
test_json_deserializer_missing_required_fields_negative is misnamed because it
feeds a wrong-type value (incomplete_json with "bar": "not_an_int") rather than
omitting a required field; rename the test to something like
test_json_deserializer_invalid_field_type_negative (or
test_json_deserializer_wrong_type_negative) and update its inline comment to
state it verifies type-validation failure for payload.bar (using
json.JsonDeserializer and the incomplete_json payload) to avoid duplicating the
missing-field test (test_json_deserializer_invalid_structure_negative).
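The helper suggested above for `mermaid.py` can be sketched generically. This is a standalone sketch, not the module's actual code: only the dataclass branch is exercised, and the Pydantic branches assume v2's `model_fields` / v1's `__fields__` attribute shapes.

```python
import dataclasses


def extract_fields(cls: type) -> list[tuple[str, str]]:
    """Return (field_name, type_name) pairs for dataclass or Pydantic models."""
    if hasattr(cls, "__dataclass_fields__"):
        return [
            (name, getattr(f.type, "__name__", str(f.type)))
            for name, f in cls.__dataclass_fields__.items()
        ]
    if hasattr(cls, "model_fields"):  # Pydantic v2
        return [
            (name, getattr(f.annotation, "__name__", str(f.annotation)))
            for name, f in cls.model_fields.items()
        ]
    if hasattr(cls, "__fields__"):  # Pydantic v1
        return [
            (name, getattr(f.type_, "__name__", str(f.type_)))
            for name, f in cls.__fields__.items()
        ]
    return []


@dataclasses.dataclass
class OrderContext:  # hypothetical saga context
    order_id: str
    amount: int


print(extract_fields(OrderContext))  # -> [('order_id', 'str'), ('amount', 'int')]
```

With one introspection function, both the request-type and response-type rendering loops reduce to a single call, so any future fix lands in one place.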
Actionable comments posted: 2
🧹 Nitpick comments (12)
examples/saga_recovery_scheduler.py (1)
518-530: Inconsistent logging format within `run_recovery_iteration`

Lines 518 and 520 use f-strings while lines 524, 527, and 530 (all in the same exception-handling block) retain `%s` lazy formatting. The change introduced this split. Beyond the style inconsistency, f-string arguments are always evaluated eagerly, before the logging framework checks the effective log level, whereas `%s` placeholders let the framework skip formatting when the level is suppressed. In example code this rarely matters, but unifying on `%s` would both remove the inconsistency and match the rest of the calls in this file.

♻️ Suggested unification to `%s` formatting
```diff
-            logger.info(f"Recovering saga {saga_id}...")
+            logger.info("Recovering saga %s...", saga_id)
             await recover_saga(saga, saga_id, context_builder, container, storage)
-            logger.info(f"Saga {saga_id} recovered successfully.")
+            logger.info("Saga %s recovered successfully.", saga_id)
```

Apply the same pattern to lines 554, 562, and 572:

```diff
-        logger.info(f"Recovery iteration {iteration}")
+        logger.info("Recovery iteration %d", iteration)
         ...
-        logger.info(f"Processed {processed} saga(s) this iteration.")
+        logger.info("Processed %d saga(s) this iteration.", processed)
         ...
-        logger.info(f"Reached max_iterations={max_iterations}, stopping.")
+        logger.info("Reached max_iterations=%d, stopping.", max_iterations)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/saga_recovery_scheduler.py` around lines 518 - 530, The logging calls in run_recovery_iteration are inconsistent: replace the eager f-strings used in logger.info(f"Recovering saga {saga_id}...") and logger.info(f"Saga {saga_id} recovered successfully.") with lazy %-style formatting (e.g., logger.info("Recovering saga %s...", saga_id) and logger.info("Saga %s recovered successfully.", saga_id)) so they match the existing logger.exception/printf-style calls and avoid eager evaluation; also apply the same %-style unification to the other similar log lines referenced (the calls around the later recovery blocks, currently at the locations corresponding to lines noted in the review) to keep formatting consistent across recover_saga, run_recovery_iteration and related recovery logging.

tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py (2)
23-31: `_make_storage` helper duplicates inline `session_factory` creation in run-path tests.

The run-path benchmarks (Lines 67–73) create `async_sessionmaker` inline with the same config as `_make_storage` (Lines 25–30). Consider using `_make_storage` in the run-path tests too, reducing duplication.

Also applies to: 57-85
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py` around lines 23 - 31, The run-path benchmarks duplicate creation of the async_sessionmaker with the same config as the helper _make_storage; update the run-path benchmark code to call _make_storage(engine, storage_cls) instead of inlining async_sessionmaker so the shared session_factory configuration (expire_on_commit=False, autocommit=False, autoflush=False) is centralized; ensure you pass the same storage class used inline into _make_storage and remove the duplicated session_factory construction to avoid drift.
131-158: Parameterized "legacy" benchmarks also run the scoped-run storage, causing redundancy and confusing naming.

`test_benchmark_saga_sqlalchemy_legacy_full_transaction` is parametrized with both `SqlAlchemySagaStorage` (run-path) and `SqlAlchemySagaStorageLegacy`, but the function name says "legacy". This creates two issues:

- Redundancy: `SqlAlchemySagaStorage` is already benchmarked in the dedicated `test_benchmark_saga_sqlalchemy_run_full_transaction`.
- Misleading test IDs: The generated test ID `…_legacy_full_transaction[storage]` suggests legacy behavior when it actually exercises the run path.

Consider either:

- Removing `SqlAlchemySagaStorage` from the parametrization (keep only `SqlAlchemySagaStorageLegacy`), or
- Renaming to a neutral name like `*_compare_*` if the intent is to compare both paths.

Same applies to `test_benchmark_saga_sqlalchemy_legacy_single_step`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/benchmarks/default/test_benchmark_saga_sqlalchemy.py` around lines 131 - 158, The test test_benchmark_saga_sqlalchemy_legacy_full_transaction is parametrized with both SqlAlchemySagaStorage and SqlAlchemySagaStorageLegacy which is misleading given the function name; either restrict the parametrization to only SqlAlchemySagaStorageLegacy (remove SqlAlchemySagaStorage from the list and its "storage" id) so the test truly targets the legacy path, or rename the test to a neutral name (e.g., test_benchmark_saga_sqlalchemy_compare_full_transaction) and update the ids to reflect both variants; apply the same change pattern to test_benchmark_saga_sqlalchemy_legacy_single_step to avoid redundant benchmarking and confusing IDs.

tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py (1)
120-201: Session factory creation is repeated inline across legacy tests.

Lines 134–139 and 178–183 duplicate the same `async_sessionmaker(engine, expire_on_commit=False, autocommit=False, autoflush=False)` pattern. The `default` variant uses a `_make_storage` helper for this. Consider extracting a similar helper here for consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/benchmarks/dataclasses/test_benchmark_saga_sqlalchemy.py` around lines 120 - 201, Extract the repeated async_sessionmaker+SqlAlchemySagaStorageLegacy setup into a helper (e.g. _make_legacy_storage) and call it from both test_benchmark_saga_sqlalchemy_legacy_full_transaction and test_benchmark_saga_sqlalchemy_legacy_single_step; specifically, move the async_sessionmaker(engine, expire_on_commit=False, autocommit=False, autoflush=False) + SqlAlchemySagaStorageLegacy(session_factory) creation into a single function that accepts the engine (and optionally config flags) and returns the configured storage, then replace the inline session_factory/storage logic in the two tests with calls to that helper to eliminate duplication and mirror the existing _make_storage pattern.

tests/integration/test_saga_mediator_sqlalchemy_postgres.py (1)
63-93: `test_saga_mediator_emits_events` duplicates `test_saga_mediator_processes_events_from_steps`.

Both tests create a context, stream the saga, then assert `len(handler.handled_events) >= 1` on the same three handlers. The only difference is the first test also collects `step_results`. Consider merging them or differentiating the assertions.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/integration/test_saga_mediator_sqlalchemy_postgres.py` around lines 63 - 93, The two tests test_saga_mediator_processes_events_from_steps and test_saga_mediator_emits_events are redundant: both stream a saga context and assert the same handlers (InventoryReservedEventHandler, PaymentProcessedEventHandler, OrderShippedEventHandler) handled events; the first also collects step_results. Consolidate them by keeping test_saga_mediator_processes_events_from_steps and incorporate the emission assertions from test_saga_mediator_emits_events (or vice versa): ensure you stream a unique OrderContext (e.g., order_id "789" if keeping the second) and either assert step_results length == 3 and that inventory_handler.handled_events, payment_handler.handled_events, and shipping_handler.handled_events each have length >= 1, then remove the duplicate test function to avoid duplicated coverage.

tests/integration/conftest.py (1)
73-83: Fixture directly accesses private attributes of `_TestContainer` for cleanup.

Lines 77–82 reach into `container._step_handlers` and `container._event_handlers`. Consider adding a `reset()` method on `_TestContainer` to encapsulate this cleanup logic.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/integration/conftest.py` around lines 73 - 83, The test fixture is directly accessing private attributes container._step_handlers and container._event_handlers to clear internal state; add a public reset() method on the _TestContainer class that encapsulates this cleanup (iterate its internal step handlers and event handlers and clear step_handler._events and event_handler.handled_events as needed) and then replace the manual loops in the container fixture with a single call to container.reset(); ensure reset() is named exactly reset so the fixture can call container.reset() without touching private members.

tests/integration/test_saga_storage_sqlalchemy_postgres.py (2)
242-260: Deterministic ordering test, a nice improvement over the MySQL `asyncio.sleep` approach.

Using direct `updated_at` manipulation via the session factory (lines 252–257) is faster and more deterministic than the MySQL test's `asyncio.sleep(1.0)`. However, this relies on the `session_factory` attribute being part of `SqlAlchemySagaStorage`'s public contract.

Consider extracting a helper or factoring this into a shared utility if the pattern recurs, to avoid coupling tests to `storage.session_factory` directly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/integration/test_saga_storage_sqlalchemy_postgres.py` around lines 242 - 260, The test test_get_sagas_for_recovery_ordered_by_updated_at directly manipulates SqlAlchemySagaStorage.session_factory and SagaExecutionModel.updated_at which couples the test to the storage internals; refactor by extracting a shared test helper (e.g., a utility function used by tests that accepts storage and saga_id and updates the underlying updated_at) or add a public helper method on SqlAlchemySagaStorage (e.g., update_saga_timestamp) to perform the session/execute/commit logic, then update this test to call that helper instead of using storage.session_factory directly so tests no longer depend on storage.session_factory internals while still setting updated_at for get_sagas_for_recovery ordering checks.
188-197: Cleanup fixture only runs before yield; leftover data persists after the last test.

`_clean_saga_tables` deletes data before yielding but does not clean up after the test. While this is a common "setup-only" pattern and works fine when every test method in the class triggers this fixture, data from the last test will remain in the database. If other test classes or modules share the same database and don't clean up, this could cause interference.

If cross-module isolation matters, consider adding post-yield cleanup:

```python
yield
async with saga_session_factory_postgres() as session:
    await session.execute(delete(SagaLogModel))
    await session.execute(delete(SagaExecutionModel))
    await session.commit()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/integration/test_saga_storage_sqlalchemy_postgres.py` around lines 188 - 197, The fixture _clean_saga_tables currently only deletes SagaLogModel and SagaExecutionModel before yielding, so leftover rows from the last test persist; modify the fixture to perform the same deletions again after the yield (using saga_session_factory_postgres to open a session and execute delete(SagaLogModel) and delete(SagaExecutionModel) followed by commit) so cleanup runs both before and after each test.

tests/integration/test_saga_storage_sqlalchemy_mysql.py (2)
1-393: MySQL test suite mirrors the Postgres suite; significant duplication.

This file is nearly identical to `test_saga_storage_sqlalchemy_postgres.py`, with the only differences being fixture names (`saga_session_factory_mysql` vs `saga_session_factory_postgres`), class names, and the ordering test approach (`asyncio.sleep` vs direct timestamp manipulation).

Consider extracting the shared test logic into a base class or parameterized test module, and having the MySQL/Postgres files only supply fixtures. This would reduce ~350 lines of duplication and ensure both backends are always tested with the same assertions.

```python
# Example: tests/integration/_saga_storage_tests.py (shared base)
class SagaStorageTestBase:
    async def test_full_saga_lifecycle(self, storage, saga_id, test_context): ...

# tests/integration/test_saga_storage_sqlalchemy_mysql.py
class TestIntegrationMysql(SagaStorageTestBase):
    ...  # only MySQL-specific fixtures and overrides
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/integration/test_saga_storage_sqlalchemy_mysql.py` around lines 1 - 393, The test file duplicates the Postgres suite; extract shared tests into a single reusable base (e.g., class SagaStorageTestBase in tests/integration/_saga_storage_tests.py) containing all test methods like test_full_saga_lifecycle, test_compensation_scenario, test_persistence_across_sessions, test_concurrent_updates, test_optimistic_locking, and the recovery tests, then update TestIntegrationMysql to inherit from SagaStorageTestBase and only provide MySQL-specific fixtures (saga_session_factory_mysql, storage fixture construction using SqlAlchemySagaStorage) and any small differences (e.g., ordering behavior) via overrides; do the same for the Postgres test file to supply saga_session_factory_postgres so both backends reuse the same assertions without duplication.
242-255: Ordering test relies on `asyncio.sleep(1.0)`; potentially flaky and slow.

The 1-second sleep introduces a real-time dependency. If the database's `updated_at` timestamp resolution is coarse or if the test environment is under load, the ordering might not be deterministic. The Postgres counterpart uses direct `updated_at` manipulation, which is faster and more reliable.

Consider using the same direct-timestamp approach as the Postgres test to make this test deterministic and faster:

```diff
- await asyncio.sleep(1.0)
- await storage.update_context(id2, {**test_context, "touched": True})
+ from datetime import datetime, timedelta, timezone
+ from sqlalchemy import update
+ later = datetime.now(timezone.utc) + timedelta(seconds=10)
+ async with storage.session_factory() as session:
+     await session.execute(
+         update(SagaExecutionModel).where(SagaExecutionModel.id == id2).values(updated_at=later),
+     )
+     await session.commit()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/integration/test_saga_storage_sqlalchemy_mysql.py` around lines 242 - 255, The test test_get_sagas_for_recovery_ordered_by_updated_at is flaky because it uses asyncio.sleep(1.0) to force an updated_at change; instead, set the updated_at timestamp deterministically for one saga so ordering is reliable and fast. Replace the sleep+update_context approach by directly updating the saga row's updated_at for id2 (using the storage's DB session/engine or a helper on SqlAlchemySagaStorage) to a timestamp greater than the others, then call get_sagas_for_recovery and assert id2 is last; use existing methods like storage.create_saga, storage.update_status and the underlying SQLAlchemy session/updated_at column to perform the direct timestamp update.

src/cqrs/saga/compensation.py (1)
73-77: `SagaStepStatus.COMPLETED` used for compensate-action log entries, not `SagaStepStatus.COMPENSATED`.

The compensator logs successful compensation steps with `SagaStepStatus.COMPLETED` (line 106) and checks for that status here (line 76). The `COMPENSATED` enum value is not used by the compensator. This is internally consistent but may confuse contributors who expect the `COMPENSATED` status to be used for compensated steps.

Consider using `SagaStepStatus.COMPENSATED` for compensation step logs to better convey the semantic meaning, or add a brief comment explaining the convention.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/saga/compensation.py` around lines 73 - 77, The code records compensated steps by filtering history entries with SagaStepStatus.COMPLETED but the compensator itself logs successful compensation using that same COMPLETED value, which is confusing; update the compensator to log successful compensation entries with SagaStepStatus.COMPENSATED (or, alternatively, change this filter to look for COMPENSATED) so the semantics match: modify the compensator's logging call where it records a successful "compensate" action to use SagaStepStatus.COMPENSATED, and update the set comprehension that builds compensated_steps (the one using self._storage.get_step_history and SagaStepStatus.COMPLETED) to check for SagaStepStatus.COMPENSATED instead; if you keep the existing behavior, add a brief inline comment near the compensated_steps computation explaining the convention.

src/cqrs/saga/saga.py (1)
260-327: Multiple sequential commits for saga creation; acceptable but worth noting.

For new sagas with a run (lines 261–269): `create_saga → commit → update_status(RUNNING) → commit`. If the process crashes between the two commits, the saga persists as PENDING but never transitions to RUNNING. Similarly in the `except ValueError` recovery path (lines 317–327).

This is acceptable given that recovery can detect PENDING sagas, but ensure the recovery logic handles this edge case (PENDING sagas with no steps).

You could batch `create_saga` + `update_status` into a single commit to reduce the crash window:

```diff
  await state_manager.create_saga(
      self._saga.__class__.__name__,
      self._context,
  )
- if run is not None:
-     await run.commit()
  await state_manager.update_status(SagaStatus.RUNNING)
  if run is not None:
      await run.commit()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/saga/saga.py` around lines 260 - 327, The code calls create_saga(...), commits, then update_status(RUNNING) and commits again (both in the initial-new-saga path and the except ValueError recovery path), which opens a crash window where a saga can remain PENDING; change the flow so that create_saga and update_status are done inside the same transaction/commit when a run is present (i.e., call state_manager.create_saga(...) then state_manager.update_status(SagaStatus.RUNNING) and call run.commit() only once after both), and for the no-run path either extend create_saga to accept an initial status or ensure update_status is applied atomically (combine into a single storage operation); update the blocks around state_manager.create_saga, state_manager.update_status, and run.commit to perform a single commit per saga creation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cqrs/saga/saga.py`:
- Around line 165-180: The inner NotImplementedError handler should be removed
because it hides user code errors and can cause duplicate-saga creation: when
create_run() returns a context manager (run_cm) you should await it and
propagate any exceptions from async with run_cm as run: and from async for
step_result in self._execute(run): instead of catching NotImplementedError and
falling back to self._execute(None); update the block that checks run_cm (the
code using run_cm, run and calling _execute) to omit the inner try/except so any
NotImplementedError from _execute or its steps bubbles up, leaving only the
outer try/except that wraps self._storage.create_run().
In `@tests/integration/test_saga_mediator_sqlalchemy_postgres.py`:
- Around line 14-28: The test file imports _TestContainer from
test_saga_mediator_memory which shadows the conftest-local _TestContainer used
by the container fixture; remove _TestContainer from the import list in this
test and either (a) import the correct _TestContainer defined in the test
suite's conftest module or (b) drop/replace the parameter type annotation (e.g.,
use typing.Any) on tests that accept container so the pytest-provided fixture's
type is not misreferenced; update references to _TestContainer in this file
(test parameter annotations) accordingly.
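The narrowing described in the first inline prompt above (wrap only `create_run()`, not the step execution) can be sketched with stand-in names. `create_run`, `_execute`, and `stream` here are simplified placeholders, not the library's actual code; the point is that a `NotImplementedError` raised inside user step code propagates instead of silently triggering the legacy fallback.

```python
import asyncio
import contextlib


@contextlib.asynccontextmanager
async def create_run():
    # Stands in for a storage that supports scoped runs.
    yield "run"


async def _execute(run):
    # Stands in for step execution; a NotImplementedError raised by user
    # step code must propagate, not be mistaken for "no run support".
    raise NotImplementedError("bug in a user-defined step")
    yield  # unreachable; makes this an async generator


async def stream(storage_create_run):
    results = []
    try:
        run_cm = storage_create_run()  # only THIS call may legitimately
    except NotImplementedError:       # signal "no run support"
        async for item in _execute(None):
            results.append(item)
        return results
    # No inner try/except here: errors from _execute bubble up unchanged.
    async with run_cm as run:
        async for item in _execute(run):
            results.append(item)
    return results


try:
    asyncio.run(stream(create_run))
except NotImplementedError as exc:
    print(f"propagated: {exc}")  # the step bug is visible, not swallowed
```

With an inner `except NotImplementedError` around the `async for`, the same bug would re-run the saga on the legacy path and could create the saga twice, which is exactly what the prompt warns against.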
Closes issue #60
Summary
The reduce-saga-storage-overhead branch reduces saga storage load by introducing checkpoint commits: one session per saga run with explicit commits only at key points (after each step, after each compensation step, etc.) instead of committing after every storage call. This cuts the number of commits, shortens lock hold time, and lowers the risk of deadlocks when using SQLAlchemy.
Main changes
1. Checkpoint commits and the `SagaStorageRun` protocol

- `SagaStorageRun` (`src/cqrs/saga/storage/protocol.py`): represents a scoped "session" for a single saga. Its methods only perform operations within that session and do not commit; the caller is responsible for calling `commit()` at the desired checkpoints.
- `ISagaStorage.create_run()`: returns a context manager that yields a `SagaStorageRun`. If a storage does not support `create_run()` (e.g. a custom implementation), it may raise `NotImplementedError`, and execution falls back to the previous behaviour (no single session, no checkpoint commits).
- `SagaTransaction` behaviour: when `create_run()` is available, the saga runs inside one session and commits only at checkpoints: after creating the saga and setting status to RUNNING, after each completed step (act), after each compensated step (compensate), and at the end of compensation or when marking COMPLETED/failed. Storages that do not implement `create_run()` continue to work as before.

2. Storage implementations
- Memory (`MemorySagaStorage`): implements `create_run()`, returning `_MemorySagaStorageRun`; `commit`/`rollback` are no-ops, but the protocol is aligned with SQLAlchemy.
- SQLAlchemy (`SqlAlchemySagaStorage`): implements `create_run()`, returning `_SqlAlchemySagaStorageRun` backed by a single `AsyncSession` per run; all mutations go through that session and are committed only when the saga calls `run.commit()`.

3. Deadlock mitigation (Fix deadlocks)
- `load_saga_state(..., read_for_update=True)`: when loading state for recovery or exclusive update, the implementation can lock the row (e.g. `SELECT ... FOR UPDATE` in SQLAlchemy) so the same saga is not updated concurrently. Together with checkpoint commits, this shortens lock duration and reduces deadlock risk.

4. Compensation
- `SagaCompensator` now accepts an optional `on_after_compensate_step` callback, invoked after each successfully compensated step. When using a run, the saga passes `run.commit` so each compensation step is persisted at a checkpoint.

5. Documentation and types
- Documentation was updated for `SagaStorageRun`, the memory and SQLAlchemy storages, `SagaTransaction`, the execution and recovery managers, and the compensator. The "Strict Backward Recovery" behaviour and the use of checkpoint commits when `create_run()` is available are described in the code.
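The checkpoint-commit contract above can be sketched roughly as follows. This is a simplified sketch: `SagaStorageRun`, `log_step`, and `commit` come from the summary, but the exact signatures and the `BufferingRun` class are illustrative stand-ins, not the library's actual API.

```python
import asyncio
import typing


class SagaStorageRun(typing.Protocol):
    """Scoped session for one saga: methods mutate, only commit() flushes."""

    async def log_step(self, saga_id: str, step: str, status: str) -> None: ...
    async def commit(self) -> None: ...


class BufferingRun:
    """Toy run that buffers writes until commit(), SQLAlchemy-style."""

    def __init__(self) -> None:
        self.flushed: list[tuple[str, str, str]] = []
        self._pending: list[tuple[str, str, str]] = []

    async def log_step(self, saga_id: str, step: str, status: str) -> None:
        self._pending.append((saga_id, step, status))  # no commit here

    async def commit(self) -> None:
        self.flushed.extend(self._pending)  # checkpoint: flush once
        self._pending.clear()


async def run_steps(run: SagaStorageRun, saga_id: str, steps: list[str]) -> None:
    # One commit per completed step, instead of one per storage call.
    for step in steps:
        await run.log_step(saga_id, step, "COMPLETED")
        await run.commit()


run = BufferingRun()
asyncio.run(run_steps(run, "saga-1", ["reserve", "pay", "ship"]))
print(len(run.flushed))  # 3 checkpoint-committed entries
```

The point of the design is visible in `run_steps`: the number of commits scales with the number of checkpoints (steps), not with the number of storage calls.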
Unit tests
- `tests/unit/test_saga/test_saga_storage_run.py`: tests the path with `create_run()` and checkpoint commits (memory storage), and the fallback when storage does not implement `create_run()`.

Integration tests and databases
- `test_saga_storage_sqlalchemy_postgres.py`/`test_saga_storage_sqlalchemy_mysql.py`, and `test_saga_mediator_sqlalchemy_postgres.py`/`test_saga_mediator_sqlalchemy_mysql.py`; shared conftest/fixtures were extended for PostgreSQL (`DATABASE_DSN_POSTGRESQL`).

CI
- CI provides `DATABASE_DSN_POSTGRESQL` and runs unit and integration tests with both MySQL and PostgreSQL.
- `codspeed.yml`: runs benchmarks via CodSpeed (MySQL, PostgreSQL, Redis; `pytest -c ./tests/pytest-config.ini tests/benchmarks/ --codspeed`) and adds `DATABASE_DSN_POSTGRESQL` to the test env.

Docker and dependencies
Benchmarks
- `tests/benchmarks/` were updated and extended (including conftest and dataclasses/default variants for memory and SQLAlchemy) to measure the impact of the new storage behaviour.

Examples and compatibility
- Examples (`saga.py`, `saga_recovery.py`, `saga_sqlalchemy_storage.py`, `saga_fallback.py`, `saga_recovery_scheduler.py`, etc.) were updated to the current storage API and, where applicable, to use the checkpoint path.
- A custom `ISagaStorage` without `create_run()` continues to work; the new path is used only when the storage implements `create_run()` (Memory and SQLAlchemy do so out of the box).

Miscellaneous
Summary of benefits
- Fewer commits and shorter lock hold times, with optional row locking via `read_for_update` on recovery.
- Custom storages can opt in by implementing `create_run()` following the Memory/SQLAlchemy pattern.
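The opt-in pattern for a custom storage can be sketched roughly like this. `LegacyStorage`, `RunCapableStorage`, and `start` are hypothetical illustrations of the contract described above, not library code; the key parts are `create_run()` raising `NotImplementedError` to keep the legacy path, or returning an async context manager to enable the run path.

```python
import asyncio
import contextlib


class LegacyStorage:
    """Custom ISagaStorage-style storage that predates create_run()."""

    def create_run(self):
        raise NotImplementedError  # caller falls back to the legacy path


class RunCapableStorage(LegacyStorage):
    """Opts in to the run path, following the memory-storage pattern."""

    def __init__(self) -> None:
        self.log: list[str] = []

    @contextlib.asynccontextmanager
    async def create_run(self):
        # For a memory-style storage the run can be the storage itself and
        # commit can be a no-op; a SQLAlchemy-style storage would open one
        # AsyncSession here and flush it on commit().
        yield self

    async def log_step(self, entry: str) -> None:
        self.log.append(entry)

    async def commit(self) -> None:
        pass  # no-op for memory; checkpoint flush for a real database


async def start(storage) -> str:
    try:
        run_cm = storage.create_run()
    except NotImplementedError:
        return "legacy path"  # previous behaviour, no scoped run
    async with run_cm as run:
        await run.log_step("step-1")
        await run.commit()  # checkpoint
    return "run path"


print(asyncio.run(start(LegacyStorage())))      # legacy path
print(asyncio.run(start(RunCapableStorage())))  # run path
```

Because the dispatch hinges only on whether `create_run()` raises `NotImplementedError`, existing storages keep working unchanged while new ones get checkpoint commits for free.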
New Features
Documentation
Tests / Benchmarks
Chores