Skip to content

fix: saga orch ublishes saga started event, fixing concurr problems in saga routes#148

Merged
HardMax71 merged 2 commits intomainfrom
fix/backend-flaky-tests
Feb 7, 2026
Merged

fix: saga orch ublishes saga started event, fixing concurr problems in saga routes#148
HardMax71 merged 2 commits intomainfrom
fix/backend-flaky-tests

Conversation

@HardMax71
Copy link
Owner

@HardMax71 HardMax71 commented Feb 7, 2026


Summary by cubic

Publish SagaStartedEvent and NotificationCreatedEvent right after DB persistence to remove race conditions in saga routes and notification flows. E2E tests now wait on these events for deterministic reads.

  • Bug Fixes
    • SagaOrchestrator publishes SagaStartedEvent after persisting (when producer and store_events are enabled).
    • EventWaiter adds wait_for_saga_started and wait_for_notification_created, subscribes to NOTIFICATION_EVENTS, and tightens filters to use e.execution_id.
    • Tests: saga routes wait on SAGA_STARTED; notification tests wait on NOTIFICATION_CREATED instead of RESULT_STORED.

Written for commit 948f71f. Summary will update on new commits.

Summary by CodeRabbit

  • New Features

    • Publish a "saga started" event when a saga begins.
    • Publish a "notification created" event after notifications are persisted.
  • Bug Fixes

    • Improved saga event handling to ensure reliable event publication during saga initialization.
  • Tests

    • Updated end-to-end tests to wait for saga startup and notification-created events for more reliable synchronization.

@coderabbitai
Copy link

coderabbitai bot commented Feb 7, 2026

📝 Walkthrough

Walkthrough

Adds SagaStartedEvent publishing in the saga orchestrator and a NotificationCreatedEvent publish step in the notification service; test helpers and tests updated to wait for saga-start and notification-created events.

Changes

Cohort / File(s) Summary
Saga Orchestrator
backend/app/services/saga/saga_orchestrator.py
Imported SagaStartedEvent; added async def _publish_saga_started_event(self, instance: Saga, trigger_event: ExecutionRequestedEvent) -> None and invoke it after persisting a saga when a producer and event storage are enabled. Includes publish error logging. Note: duplicate helper appears to be introduced in an additional location.
Notification Service
backend/app/services/notification_service.py
Added imports EventMetadata, NotificationCreatedEvent; added internal _publish_notification_created_event(self, notification: DomainNotification) -> None and publishes NotificationCreatedEvent after persisting notifications (logged on failure).
End-to-end Tests
backend/tests/e2e/conftest.py, backend/tests/e2e/test_saga_routes.py
Added EventWaiter.wait_for_saga_started(execution_id, ...) and wait_for_notification_created(...); updated predicates to compare execution_id directly; extended Kafka subscriptions; tests updated to wait for SAGA_STARTED / NOTIFICATION_CREATED instead of previous command/result events.

Sequence Diagram

sequenceDiagram
    participant Client
    participant SagaOrchestrator
    participant SagaStore as "Saga Store"
    participant Producer as "Event Producer"
    participant TestWaiter as "Test EventWaiter"

    Client->>SagaOrchestrator: ExecutionRequestedEvent
    SagaOrchestrator->>SagaStore: Persist saga instance
    SagaStore-->>SagaOrchestrator: Persisted confirmation
    SagaOrchestrator->>Producer: Publish SagaStartedEvent
    Producer-->>SagaOrchestrator: Acknowledgement (or error)
    Producer->>TestWaiter: SagaStartedEvent (consumed by tests)
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐇 I hopped awake to a saga's start,
I nudged an event to do its part,
Notifications chimed, the bus took flight,
Tests listened close into the night,
A rabbit's cheer for events done right 🥕✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Title check ⚠️ Warning The title contains a typo ('ublishes' instead of 'publishes') and uses abbreviated terms ('concurr' instead of 'concurrency'), making it unclear and unprofessional despite relating to the actual changes. Correct the typo and abbreviation: 'fix: saga orchestrator publishes saga started event, fixing concurrency problems in saga routes' or similar clear phrasing.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Docstring Coverage ✅ Passed Docstring coverage is 93.75% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/backend-flaky-tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/tests/e2e/conftest.py (1)

244-246: ⚠️ Potential issue | 🟡 Minor

Stale assertion message — still references CREATE_POD_COMMAND instead of SAGA_STARTED.

The fixture now waits on SAGA_STARTED (line 241), but the assertion failure message on line 245 still says "despite CREATE_POD_COMMAND received".

🐛 Proposed fix
     assert doc is not None, (
-        f"No saga document for {created_execution.execution_id} despite CREATE_POD_COMMAND received"
+        f"No saga document for {created_execution.execution_id} despite SAGA_STARTED received"
     )
🧹 Nitpick comments (2)
backend/app/services/saga/saga_orchestrator.py (1)

316-336: Error swallowed silently — consider whether callers need to know.

The except Exception on line 335 logs but does not re-raise, which is intentional (publish failure shouldn't block saga execution). This is consistent with _publish_saga_cancelled_event. However, the log level is error without exc_info=True, so the traceback is lost.

🔧 Optional: preserve traceback for debugging
         except Exception as e:
-            self.logger.error(f"Failed to publish saga started event: {e}")
+            self.logger.error(f"Failed to publish saga started event: {e}", exc_info=True)
backend/tests/e2e/conftest.py (1)

105-113: Missing type: ignore on e.execution_id — inconsistent with wait_for_result.

Line 100 uses # type: ignore[union-attr] for the same e.execution_id access pattern, but line 110 omits it. If mypy is enforced, this will produce a type error for the same reason.

🔧 Suggested fix
         return await self.wait_for(
             lambda e: (
                 e.event_type == EventType.CREATE_POD_COMMAND
-                and e.execution_id == execution_id
+                and e.execution_id == execution_id  # type: ignore[union-attr]
             ),
             timeout=timeout,
         )

Copy link

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No issues found across 3 files

@sonarqubecloud
Copy link

sonarqubecloud bot commented Feb 7, 2026

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/tests/e2e/conftest.py (1)

255-258: ⚠️ Potential issue | 🟡 Minor

Stale assertion message still references CREATE_POD_COMMAND.

Line 257 says "despite CREATE_POD_COMMAND received" but the fixture now waits on SAGA_STARTED. Update the message for consistency.

Proposed fix
     doc = await SagaDocument.find_one(SagaDocument.execution_id == created_execution.execution_id)
     assert doc is not None, (
-        f"No saga document for {created_execution.execution_id} despite CREATE_POD_COMMAND received"
+        f"No saga document for {created_execution.execution_id} despite SAGA_STARTED received"
     )
🧹 Nitpick comments (2)
backend/tests/e2e/conftest.py (2)

125-134: e.tags access may raise AttributeError if a non-notification event is evaluated.

DomainEvent is a union type; not all variants have a tags attribute. If a non-NOTIFICATION_CREATED event somehow passes the first check (unlikely due to short-circuit and), or if the union discriminator doesn't narrow the type, the bare e.tags access could fail. In practice the event_type guard protects this at runtime, but consider adding hasattr / getattr for safety and add a # type: ignore[union-attr] comment for consistency with line 100.


95-123: Consistent type-ignore comments across predicate lambdas.

wait_for_result (line 100) has # type: ignore[union-attr] on the e.execution_id access, but wait_for_saga_command (line 110) and wait_for_saga_started (line 120) access e.execution_id without it. Consider adding the annotation uniformly to suppress mypy warnings.

@HardMax71 HardMax71 merged commit 6c11f33 into main Feb 7, 2026
21 checks passed
@HardMax71 HardMax71 deleted the fix/backend-flaky-tests branch February 7, 2026 23:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant