diff --git a/backend/app/services/notification_service.py b/backend/app/services/notification_service.py
index 5fd6f97d..b431dd01 100644
--- a/backend/app/services/notification_service.py
+++ b/backend/app/services/notification_service.py
@@ -17,9 +17,11 @@
 )
 from app.domain.enums.user import UserRole
 from app.domain.events.typed import (
+    EventMetadata,
     ExecutionCompletedEvent,
     ExecutionFailedEvent,
     ExecutionTimeoutEvent,
+    NotificationCreatedEvent,
 )
 from app.domain.notification import (
     DomainNotification,
@@ -187,6 +189,8 @@ async def create_notification(
         # Save to database
         notification = await self.repository.create_notification(create_data)
 
+        await self._publish_notification_created_event(notification)
+
         # Deliver immediately if not scheduled; scheduled notifications are
         # picked up by the NotificationScheduler worker.
         if scheduled_for is None:
@@ -194,6 +198,28 @@ async def create_notification(
 
         return notification
 
+    async def _publish_notification_created_event(self, notification: DomainNotification) -> None:
+        """Publish NotificationCreatedEvent after the notification is persisted."""
+        try:
+            event = NotificationCreatedEvent(
+                notification_id=notification.notification_id,
+                user_id=notification.user_id,
+                subject=notification.subject,
+                body=notification.body,
+                severity=notification.severity,
+                tags=list(notification.tags or []),
+                channels=[notification.channel],
+                metadata=EventMetadata(
+                    service_name=self.settings.SERVICE_NAME,
+                    service_version=self.settings.SERVICE_VERSION,
+                    user_id=notification.user_id,
+                ),
+            )
+            await self.event_service.publish_domain_event(event, key=notification.user_id)
+        except Exception as e:
+            # Best-effort: a publish failure is logged (with traceback) and not propagated to the caller.
+            self.logger.error(f"Failed to publish notification created event: {e}", exc_info=True)
+
     async def create_system_notification(
         self,
         title: str,
diff --git a/backend/app/services/saga/saga_orchestrator.py b/backend/app/services/saga/saga_orchestrator.py
index 0f6aa061..5fead192 100644
--- a/backend/app/services/saga/saga_orchestrator.py
+++ b/backend/app/services/saga/saga_orchestrator.py
@@ -17,6 +17,7 @@
     ExecutionRequestedEvent,
     ExecutionTimeoutEvent,
     SagaCancelledEvent,
+    SagaStartedEvent,
 )
 from app.domain.saga.models import Saga, SagaConfig
 from app.events.core import UnifiedProducer
@@ -110,6 +111,9 @@ async def _start_saga(self, trigger_event: ExecutionRequestedEvent) -> str:
 
         self.logger.info(f"Started saga {_SAGA_NAME} (ID: {instance.saga_id}) for execution {execution_id}")
 
+        if self._producer and self.config.store_events:
+            await self._publish_saga_started_event(instance, trigger_event)
+
         saga = self._create_saga_instance()
         context = SagaContext(instance.saga_id, execution_id)
         context.set("correlation_id", trigger_event.metadata.correlation_id)
@@ -309,6 +313,29 @@ async def cancel_saga(self, saga_id: str) -> bool:
             )
             return False
 
+    async def _publish_saga_started_event(
+        self, instance: Saga, trigger_event: ExecutionRequestedEvent,
+    ) -> None:
+        """Publish saga started event after the document is persisted."""
+        try:
+            event = SagaStartedEvent(
+                saga_id=instance.saga_id,
+                saga_name=instance.saga_name,
+                execution_id=instance.execution_id,
+                initial_event_id=trigger_event.event_id,
+                metadata=EventMetadata(
+                    service_name="saga-orchestrator",
+                    service_version="1.0.0",
+                    user_id=trigger_event.metadata.user_id or "system",
+                    correlation_id=trigger_event.metadata.correlation_id,
+                ),
+            )
+            await self._producer.produce(event_to_produce=event, key=instance.execution_id)
+            self.logger.info(f"Published SagaStartedEvent for saga {instance.saga_id}")
+        except Exception as e:
+            # Best-effort: a publish failure is logged (with traceback) and not propagated to _start_saga.
+            self.logger.error(f"Failed to publish saga started event: {e}", exc_info=True)
+
     async def _publish_saga_cancelled_event(self, saga_instance: Saga) -> None:
         """Publish saga cancelled event."""
         try:
diff --git a/backend/tests/e2e/conftest.py b/backend/tests/e2e/conftest.py
index 0d76bac3..d0d767fb 100644
--- a/backend/tests/e2e/conftest.py
+++ b/backend/tests/e2e/conftest.py
@@ -97,17 +97,38 @@ async def wait_for_result(self, execution_id: str, timeout: float = 30.0) -> Dom
         return await self.wait_for(
             lambda e: (
                 e.event_type in RESULT_EVENT_TYPES
-                and getattr(e, "execution_id", None) == execution_id
+                and e.execution_id == execution_id  # type: ignore[union-attr]
             ),
             timeout=timeout,
         )
 
     async def wait_for_saga_command(self, execution_id: str, timeout: float = 15.0) -> DomainEvent:
-        """Wait for CREATE_POD_COMMAND — saga is guaranteed in MongoDB after this."""
+        """Wait for CREATE_POD_COMMAND for *execution_id*."""
         return await self.wait_for(
             lambda e: (
                 e.event_type == EventType.CREATE_POD_COMMAND
-                and getattr(e, "execution_id", None) == execution_id
+                and e.execution_id == execution_id
+            ),
+            timeout=timeout,
+        )
+
+    async def wait_for_saga_started(self, execution_id: str, timeout: float = 15.0) -> DomainEvent:
+        """Wait for SAGA_STARTED — saga document is guaranteed in MongoDB after this."""
+        return await self.wait_for(
+            lambda e: (
+                e.event_type == EventType.SAGA_STARTED
+                and e.execution_id == execution_id
+            ),
+            timeout=timeout,
+        )
+
+    async def wait_for_notification_created(self, execution_id: str, timeout: float = 15.0) -> DomainEvent:
+        """Wait for NOTIFICATION_CREATED — notification is guaranteed in MongoDB after this."""
+        exec_tag = f"exec:{execution_id}"
+        return await self.wait_for(
+            lambda e: (
+                e.event_type == EventType.NOTIFICATION_CREATED
+                and exec_tag in e.tags
             ),
             timeout=timeout,
         )
@@ -122,6 +143,7 @@ async def event_waiter(test_settings: Settings) -> AsyncGenerator[EventWaiter, N
         f"{prefix}{KafkaTopic.EXECUTION_RESULTS}",
         f"{prefix}{KafkaTopic.SAGA_EVENTS}",
         f"{prefix}{KafkaTopic.SAGA_COMMANDS}",
+        f"{prefix}{KafkaTopic.NOTIFICATION_EVENTS}",
     ]
     waiter = EventWaiter(test_settings.KAFKA_BOOTSTRAP_SERVERS, topics)
     await waiter.start()
@@ -221,15 +243,14 @@ async def execution_with_saga(
     event_waiter: EventWaiter,
     created_execution: ExecutionResponse,
 ) -> tuple[ExecutionResponse, SagaStatusResponse]:
-    """Execution with saga guaranteed in MongoDB (via CREATE_POD_COMMAND event).
+    """Execution with saga guaranteed in MongoDB (via SAGA_STARTED event).
 
-    The saga orchestrator persists the saga document multiple times before
-    publishing CREATE_POD_COMMAND to Kafka. Once EventWaiter resolves the
-    command, the document is definitively in MongoDB. We query Beanie
-    directly (same DB, no HTTP round-trip) for a deterministic, sleep-free
-    lookup.
+    The saga orchestrator publishes SAGA_STARTED after persisting the saga
+    document to MongoDB. Once EventWaiter resolves the event, the document
+    is definitively in MongoDB. We query Beanie directly (same DB, no HTTP
+    round-trip) for a deterministic, sleep-free lookup.
     """
-    await event_waiter.wait_for_saga_command(created_execution.execution_id)
+    await event_waiter.wait_for_saga_started(created_execution.execution_id)
 
     doc = await SagaDocument.find_one(SagaDocument.execution_id == created_execution.execution_id)
     assert doc is not None, (
@@ -247,17 +268,17 @@ async def execution_with_notification(
     event_waiter: EventWaiter,
     created_execution: ExecutionResponse,
 ) -> tuple[ExecutionResponse, NotificationResponse]:
-    """Execution with notification guaranteed in MongoDB.
+    """Execution with notification guaranteed in MongoDB (via NOTIFICATION_CREATED event).
 
-    Notification handler runs in-process and finishes before the external
-    result processor produces RESULT_STORED — so waiting for RESULT_STORED
-    guarantees the notification exists.
+    The notification service publishes NOTIFICATION_CREATED after persisting
+    the notification to MongoDB. Once EventWaiter resolves the event, the
+    document is definitively in MongoDB.
     """
-    await event_waiter.wait_for_result(created_execution.execution_id)
+    await event_waiter.wait_for_notification_created(created_execution.execution_id)
     resp = await test_user.get("/api/v1/notifications", params={"limit": 10})
     assert resp.status_code == 200
     result = NotificationListResponse.model_validate(resp.json())
-    assert result.notifications, "No notification despite RESULT_STORED received"
+    assert result.notifications, "No notification despite NOTIFICATION_CREATED received"
     notification = result.notifications[0]
     assert created_execution.execution_id in (notification.subject + " ".join(notification.tags))
     return created_execution, notification
diff --git a/backend/tests/e2e/test_saga_routes.py b/backend/tests/e2e/test_saga_routes.py
index c0767f6b..980477ed 100644
--- a/backend/tests/e2e/test_saga_routes.py
+++ b/backend/tests/e2e/test_saga_routes.py
@@ -201,7 +201,7 @@ async def test_cancel_saga(
         assert exec_response.status_code == 200
 
         execution = ExecutionResponse.model_validate(exec_response.json())
-        await event_waiter.wait_for_saga_command(execution.execution_id)
+        await event_waiter.wait_for_saga_started(execution.execution_id)
 
         saga_resp = await test_user.get(f"/api/v1/sagas/execution/{execution.execution_id}")
         saga = SagaListResponse.model_validate(saga_resp.json()).sagas[0]
@@ -246,7 +246,7 @@ async def test_cancel_other_users_saga_forbidden(
         assert exec_response.status_code == 200
 
         execution = ExecutionResponse.model_validate(exec_response.json())
-        await event_waiter.wait_for_saga_command(execution.execution_id)
+        await event_waiter.wait_for_saga_started(execution.execution_id)
 
         saga_resp = await test_user.get(f"/api/v1/sagas/execution/{execution.execution_id}")
         saga = SagaListResponse.model_validate(saga_resp.json()).sagas[0]