Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the CodeRabbit settings.
📝 Walkthrough

Refactors saga orchestration to persist via repository load-modify-save with revision-based concurrency; replaces in-memory saga mutations with `repo.save_saga` calls and changes several method signatures; updates tests and test doubles; plus unrelated dependency and CI/frontend image bumps.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Orchestrator
    participant Repo
    participant StepHandler
    participant Producer
    Client->>Orchestrator: start/execute saga (saga_id)
    Orchestrator->>Repo: get_or_create_saga / get_saga(saga_id)
    Repo-->>Orchestrator: persisted saga (with revision)
    Orchestrator->>Repo: save_saga(saga_id, current_step=...)
    Repo-->>Orchestrator: updated saga
    Orchestrator->>StepHandler: execute step
    StepHandler-->>Orchestrator: success / failure
    alt success
        Orchestrator->>Repo: save_saga(saga_id, completed_steps, context)
        Repo-->>Orchestrator: updated saga
        Orchestrator->>Producer: publish step/completion events (if enabled)
    else failure with compensation
        Orchestrator->>Repo: save_saga(saga_id, state=COMPENSATING)
        Orchestrator->>Producer: publish failure/cancelled (if enabled)
        Orchestrator->>StepHandler: invoke compensation steps
        StepHandler-->>Orchestrator: compensation results
        Orchestrator->>Repo: save_saga(saga_id, compensated_steps / final_state)
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed

❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
🧹 Nitpick comments (3)
backend/app/db/repositories/saga_repository.py (1)
47-66: Clean load-modify-save with optimistic concurrency — consider guarding against invalid field names.

The pattern is sound. One subtle risk: `setattr(doc, field, value)` will silently accept typos or non-existent field names, creating regular Python attributes that `save_changes()` will ignore. The update would appear to succeed but the field wouldn't actually be persisted. Consider adding a guard:
🛡️ Optional field-name validation
```diff
+_saga_doc_fields = set(SagaDocument.model_fields)
+
 async def save_saga(self, saga_id: str, **updates: Any) -> Saga:
     ...
     doc = await SagaDocument.find_one(SagaDocument.saga_id == saga_id)
     if not doc:
         raise SagaNotFoundError(saga_id)
+    unknown = set(updates) - _saga_doc_fields
+    if unknown:
+        raise ValueError(f"Unknown SagaDocument fields: {unknown}")
     for field, value in updates.items():
         setattr(doc, field, value)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/db/repositories/saga_repository.py` around lines 47 - 66, the save_saga method currently uses setattr(doc, field, value), which will silently create non-persisted Python attributes for typos or invalid field names; update save_saga to validate the keys in updates against the SagaDocument's declared/persisted fields before applying them (e.g., check against SagaDocument model/field metadata or the doc.__class__ field list), and if any invalid field names are found raise a clear error (e.g., SagaFieldValidationError or ValueError) listing the invalid keys; only call setattr and save_changes() when all update keys are valid, keeping the existing SagaNotFoundError and RevisionIdWasChanged -> SagaConcurrencyError handling and the final return self._to_domain(doc).

backend/app/services/saga/saga_orchestrator.py (2)
209-249: `_compensate_saga` reloads the saga after `cancel_saga` already has the persisted copy — minor redundancy.

`cancel_saga` (line 301) gets back `saved` from `save_saga`, then calls `_compensate_saga(saga_id, context)` (line 323), which immediately reloads the saga from the repo (line 213). You could pass the already-loaded saga to avoid the extra DB read. That said, reloading ensures the freshest state, so this is a trade-off between correctness and efficiency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/saga/saga_orchestrator.py` around lines 209 - 249, The _compensate_saga function currently always reloads the saga via _repo.get_saga causing an extra DB read when cancel_saga already has the persisted copy; change _compensate_saga to accept an optional saga parameter (e.g., saga: Optional[Saga] = None) and use that passed-in saga if provided, falling back to _repo.get_saga(saga_id) only when saga is None; update cancel_saga to pass the saved saga returned from save_saga into _compensate_saga(saga_id, context, saga=saved) and adjust any type hints and callers accordingly to avoid the redundant read while preserving fallback for freshness.
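The optional-parameter fallback suggested above can be sketched as follows. This is a minimal stand-alone illustration, not the project's real code: `FakeRepo`, the dict-shaped saga, and the free function `compensate_saga` are hypothetical stand-ins for the repository and orchestrator method.

```python
import asyncio
from typing import Optional

class FakeRepo:
    """Hypothetical stand-in for the saga repository; counts DB reads."""
    def __init__(self):
        self.reads = 0

    async def get_saga(self, saga_id: str) -> dict:
        self.reads += 1
        return {"saga_id": saga_id, "state": "COMPENSATING"}

async def compensate_saga(repo: FakeRepo, saga_id: str,
                          saga: Optional[dict] = None) -> dict:
    # Use the caller-supplied copy when available; reload only as a fallback.
    saga = saga if saga is not None else await repo.get_saga(saga_id)
    return saga

repo = FakeRepo()
preloaded = {"saga_id": "s-1", "state": "CANCELLED"}
result = asyncio.run(compensate_saga(repo, "s-1", saga=preloaded))
```

When the caller passes its already-persisted copy, no extra read happens; omitting the argument preserves the current reload-for-freshness behavior.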
280-325: TOCTOU in `cancel_saga` between state check and state write — acceptable with revision guard, but consider atomic CAS.

Lines 287–292 read the saga state, and lines 301–304 write the CANCELLED state. A concurrent modification between these two points would raise `SagaConcurrencyError` (thanks to revision checking), which is correctly documented. However, the caller would need to retry — and there's no retry logic here. If cancellation failures due to concurrent modification are expected to be rare, this is fine. If they could be frequent (e.g., during active step execution), consider either:

- Adding a retry loop around the read-check-write in `cancel_saga`, or
- Documenting that callers should handle `SagaConcurrencyError` with a retry.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/saga/saga_orchestrator.py` around lines 280 - 325, The cancel_saga flow has a TOCTOU between reading state via get_saga_status and writing CANCELLED via _repo.save_saga; implement a small retry loop around the read-check-write to handle SagaConcurrencyError: in cancel_saga, loop up to N attempts (e.g., 3) where you re-fetch the saga with get_saga_status, re-check allowed states (SagaState.RUNNING/CREATED), attempt _repo.save_saga and on SagaConcurrencyError sleep/backoff briefly then retry; if retries exhausted re-raise the SagaConcurrencyError. Ensure logs (self.logger) record retry attempts and include execution_id/saga_id, and keep existing compensation and event publishing logic unchanged after a successful save.
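The retry loop proposed above can be sketched as a self-contained example. `SagaConcurrencyError`, `FlakyRepo`, and the state strings are hypothetical stand-ins for the project's real exception, repository, and `SagaState` enum; the loop shape (re-read, re-check, save, back off on conflict) is the point.

```python
import asyncio

class SagaConcurrencyError(Exception):
    """Stand-in for the revision-conflict error raised by the repo."""

class FlakyRepo:
    """Hypothetical repo that fails the first save with a conflict, then succeeds."""
    def __init__(self):
        self.saves = 0
        self.state = "RUNNING"

    async def get_saga(self, saga_id: str) -> dict:
        return {"saga_id": saga_id, "state": self.state}

    async def save_saga(self, saga_id: str, **updates):
        self.saves += 1
        if self.saves == 1:
            raise SagaConcurrencyError(saga_id)
        self.state = updates["state"]

async def cancel_with_retry(repo, saga_id: str, attempts: int = 3,
                            backoff: float = 0.01) -> bool:
    for attempt in range(1, attempts + 1):
        # Re-read and re-check on every attempt so a concurrent transition
        # (e.g., to COMPLETED) is respected.
        saga = await repo.get_saga(saga_id)
        if saga["state"] not in {"RUNNING", "CREATED"}:
            return False  # no longer cancellable
        try:
            await repo.save_saga(saga_id, state="CANCELLED")
            return True
        except SagaConcurrencyError:
            if attempt == attempts:
                raise  # retries exhausted; let the caller decide
            await asyncio.sleep(backoff * attempt)  # linear backoff

repo = FlakyRepo()
cancelled = asyncio.run(cancel_with_retry(repo, "s-1"))
```

The first save raises a conflict, the loop backs off and retries, and the second attempt persists the CANCELLED state.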
Actionable comments posted: 3
🧹 Nitpick comments (2)
backend/app/services/saga/saga_orchestrator.py (2)
176-182: Prefer `dataclasses.fields()` over `__dataclass_fields__` for field introspection.

`__dataclass_fields__` is an implementation detail. `dataclasses.fields(SagaContextData)` is the public API for the same purpose and more resilient to future Python changes.

Proposed fix
```diff
 await self._repo.save_saga(
     instance.saga_id,
     completed_steps=[*saved.completed_steps, step.name],
     context_data=SagaContextData(**{
-        k: v for k, v in context.data.items() if k in SagaContextData.__dataclass_fields__
+        k: v for k, v in context.data.items()
+        if k in {f.name for f in dataclasses.fields(SagaContextData)}
     }),
 )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/saga/saga_orchestrator.py` around lines 176 - 182, Replace the use of the implementation-detail attribute SagaContextData.__dataclass_fields__ with the public dataclasses API: import dataclasses.fields and compute the set of valid field names via {f.name for f in fields(SagaContextData)}, then use that set in the dict comprehension inside the await self._repo.save_saga(...) call (the block that currently builds context_data from context.data). This keeps the logic in the method/class unchanged but uses dataclasses.fields(SagaContextData) to filter context.data keys before constructing SagaContextData.
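The filtering pattern in question can be shown in isolation. The `SagaContextData` fields below are hypothetical placeholders for the project's real dataclass; only the `dataclasses.fields()`-based filtering is the technique being illustrated.

```python
import dataclasses

@dataclasses.dataclass
class SagaContextData:
    # Hypothetical fields; the real dataclass lives in the project.
    execution_id: str = ""
    user_id: str = ""

raw = {"execution_id": "e-1", "user_id": "u-1", "extra": 42}

# Public-API introspection instead of the __dataclass_fields__ internal.
valid = {f.name for f in dataclasses.fields(SagaContextData)}
filtered = SagaContextData(**{k: v for k, v in raw.items() if k in valid})
```

Unknown keys such as `extra` are dropped before construction, so the dataclass constructor never sees them.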
350-378: Minor: redundant `self._producer` guard at line 372.

This method is only called from `cancel_saga` (line 309), which already guards on `self._producer`. The inner check is harmless but redundant.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/saga/saga_orchestrator.py` around lines 350 - 378, Remove the redundant self._producer presence check inside _publish_saga_cancelled_event: since cancel_saga already guards that self._producer exists, eliminate the inner "if self._producer" branch and directly await self._producer.produce(...) in _publish_saga_cancelled_event (keeping the same event construction, metadata, logging, and exception handling). This simplifies the method (SagaCancelledEvent creation and self.logger calls remain unchanged) and relies on the caller cancel_saga to ensure _producer is set.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/services/saga/saga_orchestrator.py`:
- Around line 303-325: After persisting the CANCELLED state and publishing, wrap
the await self._compensate_saga(saga_id, context) call in a try/except that
catches SagaConcurrencyError and Exception; on exception do not re-raise (so
cancel_saga reports success) but log the failure with full exception details
(use the orchestrator logger, e.g. self._logger.error) and persist a
compensation-failure marker on the saga (call self._repo.save_saga with saga_id
and an error_message or compensation_status field) so compensations can be
retried or inspected; keep the existing flow for save_saga,
_publish_saga_cancelled_event and context preparation unchanged.
- Around line 260-271: The loop in check_timeouts currently lets a failing
_repo.save_saga abort processing remaining sagas; wrap the per-instance work
(the logger.warning and the await self._repo.save_saga(...) call) in a
try/except so one failure doesn't stop the batch—catch SagaConcurrencyError and
other exceptions, log an error via self.logger.error including saga_id and the
exception, and continue to the next instance; keep the same
state/error_message/completed_at values when calling save_saga and ensure
exceptions are handled per-instance so find_timed_out_sagas results are fully
processed.
- Around line 199-207: The current except block in _execute_saga calls
_compensate_saga or _fail_saga but does not guard against exceptions they may
raise, which can bubble up through _execute_saga → _start_saga →
handle_execution_requested; update the except Exception as e handler to wrap the
calls to _compensate_saga(instance.saga_id, context) and
_fail_saga(instance.saga_id, str(e)) in their own try/except blocks (or a single
try that separately logs failures) so any exception raised by those methods is
caught and logged (include saga_id and exc_info) instead of propagating; ensure
you still log the original error before attempting compensation/fail and use the
same logger used in _execute_saga for consistency.
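The per-instance error isolation described in the `check_timeouts` comment above can be sketched as follows. `SweepRepo` and the dict-shaped sagas are hypothetical stand-ins; the point is the try/except inside the loop so one failing save cannot abort the rest of the batch.

```python
import asyncio

class SweepRepo:
    """Hypothetical repo: one saga's save fails, the others succeed."""
    def __init__(self):
        self.failed = []

    async def find_timed_out_sagas(self):
        return [{"saga_id": "s-1"}, {"saga_id": "boom"}, {"saga_id": "s-3"}]

    async def save_saga(self, saga_id: str, **updates):
        if saga_id == "boom":
            raise RuntimeError("save failed")
        self.failed.append(saga_id)

async def check_timeouts(repo):
    processed, errors = [], []
    for instance in await repo.find_timed_out_sagas():
        try:
            await repo.save_saga(instance["saga_id"], state="FAILED",
                                 error_message="Saga timed out")
            processed.append(instance["saga_id"])
        except Exception as exc:  # one bad saga must not stop the sweep
            errors.append((instance["saga_id"], exc))
    return processed, errors

repo = SweepRepo()
processed, errors = asyncio.run(check_timeouts(repo))
```

In the real orchestrator the `errors` branch would log via `self.logger.error` with the saga_id and exception rather than collecting tuples.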



Summary by cubic
Updated backend and frontend dependencies, CI scanner, and base image to improve security and runtime compatibility. Fixed flaky saga state updates with Beanie revision tracking and atomic repository updates, and improved notification event timing and orchestrator logging.
Dependencies
Bug Fixes
Written for commit df1f55e. Summary will update on new commits.