Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions backend/app/services/notification_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
)
from app.domain.enums.user import UserRole
from app.domain.events.typed import (
EventMetadata,
ExecutionCompletedEvent,
ExecutionFailedEvent,
ExecutionTimeoutEvent,
NotificationCreatedEvent,
)
from app.domain.notification import (
DomainNotification,
Expand Down Expand Up @@ -187,13 +189,36 @@ async def create_notification(
# Save to database
notification = await self.repository.create_notification(create_data)

await self._publish_notification_created_event(notification)

# Deliver immediately if not scheduled; scheduled notifications are
# picked up by the NotificationScheduler worker.
if scheduled_for is None:
await self._deliver_notification(notification)

return notification

async def _publish_notification_created_event(self, notification: DomainNotification) -> None:
"""Publish NotificationCreatedEvent after the notification is persisted."""
try:
event = NotificationCreatedEvent(
notification_id=notification.notification_id,
user_id=notification.user_id,
subject=notification.subject,
body=notification.body,
severity=notification.severity,
tags=list(notification.tags or []),
channels=[notification.channel],
metadata=EventMetadata(
service_name=self.settings.SERVICE_NAME,
service_version=self.settings.SERVICE_VERSION,
user_id=notification.user_id,
),
)
await self.event_service.publish_domain_event(event, key=notification.user_id)
except Exception as e:
self.logger.error(f"Failed to publish notification created event: {e}")

async def create_system_notification(
self,
title: str,
Expand Down
26 changes: 26 additions & 0 deletions backend/app/services/saga/saga_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
ExecutionRequestedEvent,
ExecutionTimeoutEvent,
SagaCancelledEvent,
SagaStartedEvent,
)
from app.domain.saga.models import Saga, SagaConfig
from app.events.core import UnifiedProducer
Expand Down Expand Up @@ -110,6 +111,9 @@ async def _start_saga(self, trigger_event: ExecutionRequestedEvent) -> str:

self.logger.info(f"Started saga {_SAGA_NAME} (ID: {instance.saga_id}) for execution {execution_id}")

if self._producer and self.config.store_events:
await self._publish_saga_started_event(instance, trigger_event)

saga = self._create_saga_instance()
context = SagaContext(instance.saga_id, execution_id)
context.set("correlation_id", trigger_event.metadata.correlation_id)
Expand Down Expand Up @@ -309,6 +313,28 @@ async def cancel_saga(self, saga_id: str) -> bool:
)
return False

async def _publish_saga_started_event(
self, instance: Saga, trigger_event: ExecutionRequestedEvent,
) -> None:
"""Publish saga started event after the document is persisted."""
try:
event = SagaStartedEvent(
saga_id=instance.saga_id,
saga_name=instance.saga_name,
execution_id=instance.execution_id,
initial_event_id=trigger_event.event_id,
metadata=EventMetadata(
service_name="saga-orchestrator",
service_version="1.0.0",
user_id=trigger_event.metadata.user_id or "system",
correlation_id=trigger_event.metadata.correlation_id,
),
)
await self._producer.produce(event_to_produce=event, key=instance.execution_id)
self.logger.info(f"Published SagaStartedEvent for saga {instance.saga_id}")
except Exception as e:
self.logger.error(f"Failed to publish saga started event: {e}")

async def _publish_saga_cancelled_event(self, saga_instance: Saga) -> None:
"""Publish saga cancelled event."""
try:
Expand Down
53 changes: 37 additions & 16 deletions backend/tests/e2e/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,38 @@ async def wait_for_result(self, execution_id: str, timeout: float = 30.0) -> Dom
return await self.wait_for(
lambda e: (
e.event_type in RESULT_EVENT_TYPES
and getattr(e, "execution_id", None) == execution_id
and e.execution_id == execution_id # type: ignore[union-attr]
),
timeout=timeout,
)

async def wait_for_saga_command(self, execution_id: str, timeout: float = 15.0) -> DomainEvent:
"""Wait for CREATE_POD_COMMAND — saga is guaranteed in MongoDB after this."""
"""Wait for CREATE_POD_COMMAND for *execution_id*."""
return await self.wait_for(
lambda e: (
e.event_type == EventType.CREATE_POD_COMMAND
and getattr(e, "execution_id", None) == execution_id
and e.execution_id == execution_id
),
timeout=timeout,
)

async def wait_for_saga_started(self, execution_id: str, timeout: float = 15.0) -> DomainEvent:
"""Wait for SAGA_STARTED — saga document is guaranteed in MongoDB after this."""
return await self.wait_for(
lambda e: (
e.event_type == EventType.SAGA_STARTED
and e.execution_id == execution_id
),
timeout=timeout,
)

async def wait_for_notification_created(self, execution_id: str, timeout: float = 15.0) -> DomainEvent:
"""Wait for NOTIFICATION_CREATED — notification is guaranteed in MongoDB after this."""
exec_tag = f"exec:{execution_id}"
return await self.wait_for(
lambda e: (
e.event_type == EventType.NOTIFICATION_CREATED
and exec_tag in e.tags
),
timeout=timeout,
)
Expand All @@ -122,6 +143,7 @@ async def event_waiter(test_settings: Settings) -> AsyncGenerator[EventWaiter, N
f"{prefix}{KafkaTopic.EXECUTION_RESULTS}",
f"{prefix}{KafkaTopic.SAGA_EVENTS}",
f"{prefix}{KafkaTopic.SAGA_COMMANDS}",
f"{prefix}{KafkaTopic.NOTIFICATION_EVENTS}",
]
waiter = EventWaiter(test_settings.KAFKA_BOOTSTRAP_SERVERS, topics)
await waiter.start()
Expand Down Expand Up @@ -221,15 +243,14 @@ async def execution_with_saga(
event_waiter: EventWaiter,
created_execution: ExecutionResponse,
) -> tuple[ExecutionResponse, SagaStatusResponse]:
"""Execution with saga guaranteed in MongoDB (via CREATE_POD_COMMAND event).
"""Execution with saga guaranteed in MongoDB (via SAGA_STARTED event).

The saga orchestrator persists the saga document multiple times before
publishing CREATE_POD_COMMAND to Kafka. Once EventWaiter resolves the
command, the document is definitively in MongoDB. We query Beanie
directly (same DB, no HTTP round-trip) for a deterministic, sleep-free
lookup.
The saga orchestrator publishes SAGA_STARTED after persisting the saga
document to MongoDB. Once EventWaiter resolves the event, the document
is definitively in MongoDB. We query Beanie directly (same DB, no HTTP
round-trip) for a deterministic, sleep-free lookup.
"""
await event_waiter.wait_for_saga_command(created_execution.execution_id)
await event_waiter.wait_for_saga_started(created_execution.execution_id)

doc = await SagaDocument.find_one(SagaDocument.execution_id == created_execution.execution_id)
assert doc is not None, (
Expand All @@ -247,17 +268,17 @@ async def execution_with_notification(
event_waiter: EventWaiter,
created_execution: ExecutionResponse,
) -> tuple[ExecutionResponse, NotificationResponse]:
"""Execution with notification guaranteed in MongoDB.
"""Execution with notification guaranteed in MongoDB (via NOTIFICATION_CREATED event).

Notification handler runs in-process and finishes before the external
result processor produces RESULT_STORED — so waiting for RESULT_STORED
guarantees the notification exists.
The notification service publishes NOTIFICATION_CREATED after persisting
the notification to MongoDB. Once EventWaiter resolves the event, the
document is definitively in MongoDB.
"""
await event_waiter.wait_for_result(created_execution.execution_id)
await event_waiter.wait_for_notification_created(created_execution.execution_id)
resp = await test_user.get("/api/v1/notifications", params={"limit": 10})
assert resp.status_code == 200
result = NotificationListResponse.model_validate(resp.json())
assert result.notifications, "No notification despite RESULT_STORED received"
assert result.notifications, "No notification despite NOTIFICATION_CREATED received"
notification = result.notifications[0]
assert created_execution.execution_id in (notification.subject + " ".join(notification.tags))
return created_execution, notification
4 changes: 2 additions & 2 deletions backend/tests/e2e/test_saga_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ async def test_cancel_saga(
assert exec_response.status_code == 200

execution = ExecutionResponse.model_validate(exec_response.json())
await event_waiter.wait_for_saga_command(execution.execution_id)
await event_waiter.wait_for_saga_started(execution.execution_id)

saga_resp = await test_user.get(f"/api/v1/sagas/execution/{execution.execution_id}")
saga = SagaListResponse.model_validate(saga_resp.json()).sagas[0]
Expand Down Expand Up @@ -246,7 +246,7 @@ async def test_cancel_other_users_saga_forbidden(
assert exec_response.status_code == 200

execution = ExecutionResponse.model_validate(exec_response.json())
await event_waiter.wait_for_saga_command(execution.execution_id)
await event_waiter.wait_for_saga_started(execution.execution_id)

saga_resp = await test_user.get(f"/api/v1/sagas/execution/{execution.execution_id}")
saga = SagaListResponse.model_validate(saga_resp.json()).sagas[0]
Expand Down
Loading