From 5049a927bae8a8538a46eabddf273c62efe57ad6 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Fri, 6 Feb 2026 23:16:32 +0100 Subject: [PATCH 1/3] fix: removed duplication, better tests, also added sse tests --- backend/tests/e2e/conftest.py | 163 +++++++- backend/tests/e2e/dlq/test_dlq_manager.py | 2 +- backend/tests/e2e/test_admin_events_routes.py | 14 +- backend/tests/e2e/test_admin_routes.py | 364 ------------------ .../tests/e2e/test_admin_settings_routes.py | 9 - backend/tests/e2e/test_admin_users_routes.py | 29 +- backend/tests/e2e/test_alertmanager.py | 41 -- backend/tests/e2e/test_auth_routes.py | 12 +- backend/tests/e2e/test_dlq_routes.py | 165 +++----- backend/tests/e2e/test_events_routes.py | 148 +++---- backend/tests/e2e/test_execution_routes.py | 242 +++++------- .../tests/e2e/test_grafana_alerts_routes.py | 8 +- backend/tests/e2e/test_health_routes.py | 3 +- .../tests/e2e/test_notifications_routes.py | 118 +++--- backend/tests/e2e/test_replay_routes.py | 7 +- backend/tests/e2e/test_saga_routes.py | 133 +++---- backend/tests/e2e/test_sse_routes.py | 131 ++++++- .../tests/e2e/test_user_settings_routes.py | 53 +-- 18 files changed, 642 insertions(+), 1000 deletions(-) delete mode 100644 backend/tests/e2e/test_admin_routes.py delete mode 100644 backend/tests/e2e/test_alertmanager.py diff --git a/backend/tests/e2e/conftest.py b/backend/tests/e2e/conftest.py index 29b7a9e9..b4d60cdb 100644 --- a/backend/tests/e2e/conftest.py +++ b/backend/tests/e2e/conftest.py @@ -1,14 +1,132 @@ +import asyncio +import json +import logging import uuid +from collections.abc import AsyncGenerator, Callable +from contextlib import suppress import pytest import pytest_asyncio +from aiokafka import AIOKafkaConsumer +from app.domain.enums.events import EventType +from app.domain.enums.kafka import KafkaTopic from app.domain.enums.user import UserRole +from app.domain.events.typed import DomainEvent, DomainEventAdapter from app.schemas_pydantic.execution import ExecutionRequest, ExecutionResponse +from app.schemas_pydantic.notification import NotificationListResponse, NotificationResponse +from app.schemas_pydantic.saga import SagaListResponse, SagaStatusResponse from app.schemas_pydantic.saved_script import SavedScriptCreateRequest from app.schemas_pydantic.user import UserCreate +from app.settings import Settings from httpx import AsyncClient -# --- Request fixtures --- +_logger = logging.getLogger("test.event_waiter") + +# Event types that indicate execution result is stored in MongoDB +RESULT_EVENT_TYPES = frozenset({EventType.RESULT_STORED, EventType.RESULT_FAILED}) + + +class EventWaiter: + """Async Kafka consumer that resolves futures when matching events arrive. + + Session-scoped: one consumer shared by all tests. Events are buffered so + a predicate registered after an event was consumed still matches it. + """ + + def __init__(self, bootstrap_servers: str, topics: list[str]) -> None: + self._waiters: list[tuple[Callable[[DomainEvent], bool], asyncio.Future[DomainEvent]]] = [] + self._buffer: list[DomainEvent] = [] + self._consumer = AIOKafkaConsumer( + *topics, + bootstrap_servers=bootstrap_servers, + group_id=f"test-event-waiter-{uuid.uuid4().hex[:6]}", + auto_offset_reset="latest", + enable_auto_commit=True, + ) + self._task: asyncio.Task[None] | None = None + + async def start(self) -> None: + await self._consumer.start() + # Wait for partition assignment so no events are missed + while not self._consumer.assignment(): + await asyncio.sleep(0.05) + self._task = asyncio.create_task(self._consume_loop()) + + async def stop(self) -> None: + if self._task: + self._task.cancel() + with suppress(asyncio.CancelledError): + await self._task + await self._consumer.stop() + + async def _consume_loop(self) -> None: + async for msg in self._consumer: + try: + payload = json.loads(msg.value.decode()) + event = DomainEventAdapter.validate_python(payload) + except Exception: + continue + self._buffer.append(event) + for predicate, future in list(self._waiters): + if not future.done() and predicate(event): + future.set_result(event) + + async def wait_for( + self, + predicate: Callable[[DomainEvent], bool], + timeout: float = 15.0, + ) -> DomainEvent: + """Wait for a Kafka event matching predicate. No polling — pure async.""" + # Check buffer first (event may have arrived before this call) + for event in self._buffer: + if predicate(event): + return event + # Not in buffer — register waiter and await + future: asyncio.Future[DomainEvent] = asyncio.get_running_loop().create_future() + entry = (predicate, future) + self._waiters.append(entry) + try: + return await asyncio.wait_for(future, timeout=timeout) + finally: + if entry in self._waiters: + self._waiters.remove(entry) + + async def wait_for_result(self, execution_id: str, timeout: float = 30.0) -> DomainEvent: + """Wait for RESULT_STORED or RESULT_FAILED for *execution_id*.""" + return await self.wait_for( + lambda e: ( + e.event_type in RESULT_EVENT_TYPES + and getattr(e, "execution_id", None) == execution_id + ), + timeout=timeout, + ) + + async def wait_for_saga_command(self, execution_id: str, timeout: float = 15.0) -> DomainEvent: + """Wait for CREATE_POD_COMMAND — saga is guaranteed in MongoDB after this.""" + return await self.wait_for( + lambda e: ( + e.event_type == EventType.CREATE_POD_COMMAND + and getattr(e, "execution_id", None) == execution_id + ), + timeout=timeout, + ) + + +@pytest_asyncio.fixture(scope="session") +async def event_waiter(test_settings: Settings) -> AsyncGenerator[EventWaiter, None]: + """Session-scoped Kafka event waiter. Starts before any test produces events.""" + prefix = test_settings.KAFKA_TOPIC_PREFIX + topics = [ + f"{prefix}{KafkaTopic.EXECUTION_EVENTS}", + f"{prefix}{KafkaTopic.EXECUTION_RESULTS}", + f"{prefix}{KafkaTopic.SAGA_EVENTS}", + f"{prefix}{KafkaTopic.SAGA_COMMANDS}", + ] + waiter = EventWaiter(test_settings.KAFKA_BOOTSTRAP_SERVERS, topics) + await waiter.start() + _logger.info("EventWaiter started on %s", topics) + yield waiter + await waiter.stop() @pytest.fixture @@ -73,14 +191,11 @@ def new_script_request() -> SavedScriptCreateRequest: ) -# --- Created resource fixtures --- - - @pytest_asyncio.fixture async def created_execution( test_user: AsyncClient, simple_execution_request: ExecutionRequest ) -> ExecutionResponse: - """Execution created by test_user.""" + """Execution created by test_user (does NOT wait for completion).""" resp = await test_user.post( "/api/v1/execute", json=simple_execution_request.model_dump() ) @@ -98,3 +213,41 @@ async def created_execution_admin( ) assert resp.status_code == 200 return ExecutionResponse.model_validate(resp.json()) + + +@pytest_asyncio.fixture +async def execution_with_saga( + test_user: AsyncClient, + event_waiter: EventWaiter, + created_execution: ExecutionResponse, +) -> tuple[ExecutionResponse, SagaStatusResponse]: + """Execution with saga guaranteed in MongoDB (via CREATE_POD_COMMAND event).""" + await event_waiter.wait_for_saga_command(created_execution.execution_id) + resp = await test_user.get(f"/api/v1/sagas/execution/{created_execution.execution_id}") + assert resp.status_code == 200 + result = SagaListResponse.model_validate(resp.json()) + assert result.sagas, f"No saga for {created_execution.execution_id} despite CREATE_POD_COMMAND received" + assert result.sagas[0].execution_id == created_execution.execution_id + return created_execution, result.sagas[0] + + +@pytest_asyncio.fixture +async def execution_with_notification( + test_user: AsyncClient, + event_waiter: EventWaiter, + created_execution: ExecutionResponse, +) -> tuple[ExecutionResponse, NotificationResponse]: + """Execution with notification guaranteed in MongoDB. + + Notification handler runs in-process and finishes before the external + result processor produces RESULT_STORED — so waiting for RESULT_STORED + guarantees the notification exists. + """ + await event_waiter.wait_for_result(created_execution.execution_id) + resp = await test_user.get("/api/v1/notifications", params={"limit": 10}) + assert resp.status_code == 200 + result = NotificationListResponse.model_validate(resp.json()) + assert result.notifications, "No notification despite RESULT_STORED received" + notification = result.notifications[0] + assert created_execution.execution_id in (notification.subject + " ".join(notification.tags)) + return created_execution, notification diff --git a/backend/tests/e2e/dlq/test_dlq_manager.py b/backend/tests/e2e/dlq/test_dlq_manager.py index 631faaa7..9bc5ab35 100644 --- a/backend/tests/e2e/dlq/test_dlq_manager.py +++ b/backend/tests/e2e/dlq/test_dlq_manager.py @@ -97,7 +97,7 @@ async def consume_dlq_events() -> None: await manager.handle_message(dlq_msg) # Await the DLQMessageReceivedEvent — true async, no polling - received = await asyncio.wait_for(received_future, timeout=15.0) + received = await asyncio.wait_for(received_future, timeout=5.0) assert received.dlq_event_id == ev.event_id assert received.event_type == EventType.DLQ_MESSAGE_RECEIVED assert received.original_event_type == str(EventType.EXECUTION_REQUESTED) diff --git a/backend/tests/e2e/test_admin_events_routes.py b/backend/tests/e2e/test_admin_events_routes.py index 90642b33..3413d032 100644 --- a/backend/tests/e2e/test_admin_events_routes.py +++ b/backend/tests/e2e/test_admin_events_routes.py @@ -57,7 +57,6 @@ async def test_browse_events(self, test_admin: AsyncClient) -> None: assert result.total >= 0 assert result.skip == 0 assert result.limit == 50 - assert isinstance(result.events, list) @pytest.mark.asyncio async def test_browse_events_with_event_type_filter( @@ -78,7 +77,6 @@ async def test_browse_events_with_event_type_filter( assert response.status_code == 200 result = EventBrowseResponse.model_validate(response.json()) - assert isinstance(result.events, list) assert result.total >= 1 @pytest.mark.asyncio @@ -132,8 +130,7 @@ async def test_browse_events_with_search_text( ) assert response.status_code == 200 - result = EventBrowseResponse.model_validate(response.json()) - assert isinstance(result.events, list) + EventBrowseResponse.model_validate(response.json()) @pytest.mark.asyncio async def test_browse_events_forbidden_for_regular_user( @@ -172,9 +169,6 @@ async def test_get_event_stats(self, test_admin: AsyncClient) -> None: stats = EventStatsResponse.model_validate(response.json()) assert stats.total_events >= 0 - assert isinstance(stats.events_by_type, list) - assert isinstance(stats.events_by_hour, list) - assert isinstance(stats.top_users, list) assert stats.error_rate >= 0.0 assert stats.avg_processing_time >= 0.0 @@ -203,8 +197,7 @@ async def test_get_event_stats_max_hours( ) assert response.status_code == 200 - stats = EventStatsResponse.model_validate(response.json()) - assert isinstance(stats.events_by_hour, list) + EventStatsResponse.model_validate(response.json()) @pytest.mark.asyncio async def test_get_event_stats_forbidden_for_regular_user( @@ -278,7 +271,6 @@ async def test_export_events_json(self, test_admin: AsyncClient) -> None: data = response.json() assert "export_metadata" in data assert "events" in data - assert isinstance(data["events"], list) assert "exported_at" in data["export_metadata"] @pytest.mark.asyncio @@ -320,8 +312,6 @@ async def test_get_event_detail( detail = EventDetailResponse.model_validate(response.json()) assert detail.event is not None - assert isinstance(detail.related_events, list) - assert isinstance(detail.timeline, list) @pytest.mark.asyncio async def test_get_event_detail_not_found( diff --git a/backend/tests/e2e/test_admin_routes.py b/backend/tests/e2e/test_admin_routes.py deleted file mode 100644 index c671232e..00000000 --- a/backend/tests/e2e/test_admin_routes.py +++ /dev/null @@ -1,364 +0,0 @@ -from uuid import uuid4 - -import pytest -from app.domain.enums.events import EventType -from app.schemas_pydantic.admin_settings import ( - ExecutionLimitsSchema, - MonitoringSettingsSchema, - SecuritySettingsSchema, - SystemSettings, -) -from app.schemas_pydantic.admin_user_overview import AdminUserOverview -from httpx import AsyncClient - - -@pytest.mark.e2e -class TestAdminSettings: - """Test admin settings endpoints against real backend.""" - - @pytest.mark.asyncio - async def test_get_settings_requires_auth(self, client: AsyncClient) -> None: - """Test that admin settings require authentication.""" - response = await client.get("/api/v1/admin/settings/") - assert response.status_code == 401 - - error = response.json() - assert "detail" in error - assert "not authenticated" in error["detail"].lower() or "unauthorized" in error["detail"].lower() - - @pytest.mark.asyncio - async def test_get_settings_with_admin_auth(self, test_admin: AsyncClient) -> None: - """Test getting system settings with admin authentication.""" - # Get settings with auth cookie (logged in via test_admin fixture) - response = await test_admin.get("/api/v1/admin/settings/") - assert response.status_code == 200 - - # Validate response structure - data = response.json() - settings = SystemSettings(**data) - - # Verify all nested structures - assert settings.execution_limits is not None - assert isinstance(settings.execution_limits, ExecutionLimitsSchema) - assert settings.execution_limits.max_timeout_seconds == 300 # Default value - assert settings.execution_limits.max_memory_mb == 512 - assert settings.execution_limits.max_cpu_cores == 2 - assert settings.execution_limits.max_concurrent_executions == 10 - - assert settings.security_settings is not None - assert isinstance(settings.security_settings, SecuritySettingsSchema) - assert settings.security_settings.password_min_length == 8 - assert settings.security_settings.session_timeout_minutes == 60 - assert settings.security_settings.max_login_attempts == 5 - assert settings.security_settings.lockout_duration_minutes == 15 - - assert settings.monitoring_settings is not None - assert isinstance(settings.monitoring_settings, MonitoringSettingsSchema) - assert settings.monitoring_settings.metrics_retention_days == 30 - assert settings.monitoring_settings.log_level == "INFO" - assert settings.monitoring_settings.enable_tracing is True - assert settings.monitoring_settings.sampling_rate == 0.1 - - @pytest.mark.asyncio - async def test_update_and_reset_settings(self, test_admin: AsyncClient) -> None: - """Test updating and resetting system settings.""" - # Get original settings - original_response = await test_admin.get("/api/v1/admin/settings/") - assert original_response.status_code == 200 - - # Update settings - updated_settings = { - "execution_limits": { - "max_timeout_seconds": 600, - "max_memory_mb": 1024, - "max_cpu_cores": 4, - "max_concurrent_executions": 20 - }, - "security_settings": { - "password_min_length": 10, - "session_timeout_minutes": 120, - "max_login_attempts": 3, - "lockout_duration_minutes": 30 - }, - "monitoring_settings": { - "metrics_retention_days": 60, - "log_level": "WARNING", - "enable_tracing": False, - "sampling_rate": 0.5 - } - } - - update_response = await test_admin.put( - "/api/v1/admin/settings/", json=updated_settings - ) - assert update_response.status_code == 200 - - # Verify updates were applied - returned_settings = SystemSettings(**update_response.json()) - assert returned_settings.execution_limits.max_timeout_seconds == 600 - assert returned_settings.security_settings.password_min_length == 10 - assert returned_settings.monitoring_settings.log_level == "WARNING" - - # Reset settings - reset_response = await test_admin.post("/api/v1/admin/settings/reset") - assert reset_response.status_code == 200 - - # Verify reset to defaults - reset_settings = SystemSettings(**reset_response.json()) - assert reset_settings.execution_limits.max_timeout_seconds == 300 # Back to default - assert reset_settings.security_settings.password_min_length == 8 - assert reset_settings.monitoring_settings.log_level == "INFO" - - @pytest.mark.asyncio - async def test_regular_user_cannot_access_settings(self, test_user: AsyncClient) -> None: - """Test that regular users cannot access admin settings.""" - # Try to access admin settings (logged in as regular user via test_user fixture) - response = await test_user.get("/api/v1/admin/settings/") - assert response.status_code == 403 - - error = response.json() - assert "detail" in error - assert "admin" in error["detail"].lower() or "forbidden" in error["detail"].lower() - - -@pytest.mark.e2e -class TestAdminUsers: - """Test admin user management endpoints against real backend.""" - - @pytest.mark.asyncio - async def test_list_users_with_pagination(self, test_admin: AsyncClient) -> None: - """Test listing users with pagination.""" - # List users - response = await test_admin.get("/api/v1/admin/users/?limit=10&offset=0") - assert response.status_code == 200 - - data = response.json() - assert "users" in data - assert "total" in data - # API returns limit/offset, not page/page_size - assert "limit" in data - assert "offset" in data - - # Verify pagination logic - assert data["limit"] == 10 - assert data["offset"] == 0 - assert isinstance(data["users"], list) - assert data["total"] >= 1 # At least the admin user exists - - # Check user structure - if data["users"]: - user = data["users"][0] - assert "user_id" in user - assert "username" in user - assert "email" in user - assert "role" in user - assert "is_active" in user - assert "created_at" in user - assert "updated_at" in user - - @pytest.mark.asyncio - async def test_create_and_manage_user(self, test_admin: AsyncClient) -> None: - """Test full user CRUD operations.""" - # Create a new user - unique_id = str(uuid4())[:8] - new_user_data = { - "username": f"test_managed_user_{unique_id}", - "email": f"managed_{unique_id}@example.com", - "password": "SecureP@ssw0rd123" - } - - create_response = await test_admin.post("/api/v1/admin/users/", json=new_user_data) - assert create_response.status_code in [200, 201] - - created_user = create_response.json() - assert created_user["username"] == new_user_data["username"] - assert created_user["email"] == new_user_data["email"] - assert "password" not in created_user - assert "hashed_password" not in created_user - - user_id = created_user["user_id"] - - # Get user details - get_response = await test_admin.get(f"/api/v1/admin/users/{user_id}") - assert get_response.status_code == 200 - - # Get user overview - overview_response = await test_admin.get(f"/api/v1/admin/users/{user_id}/overview") - assert overview_response.status_code == 200 - - overview_data = overview_response.json() - overview = AdminUserOverview(**overview_data) - assert overview.user.user_id == user_id - assert overview.user.username == new_user_data["username"] - - # Update user - update_data = { - "username": f"updated_{unique_id}", - "email": f"updated_{unique_id}@example.com" - } - - update_response = await test_admin.put( - f"/api/v1/admin/users/{user_id}", json=update_data - ) - assert update_response.status_code == 200 - - updated_user = update_response.json() - assert updated_user["username"] == update_data["username"] - assert updated_user["email"] == update_data["email"] - - # Delete user - delete_response = await test_admin.delete(f"/api/v1/admin/users/{user_id}") - assert delete_response.status_code in [200, 204] - - # Verify deletion - get_deleted_response = await test_admin.get(f"/api/v1/admin/users/{user_id}") - assert get_deleted_response.status_code == 404 - - -@pytest.mark.e2e -class TestAdminEvents: - """Test admin event management endpoints against real backend.""" - - @pytest.mark.asyncio - async def test_browse_events(self, test_admin: AsyncClient) -> None: - """Test browsing events with filters.""" - # Browse events - browse_payload = { - "filters": { - "event_types": [EventType.USER_REGISTERED, EventType.USER_LOGGED_IN] - }, - "skip": 0, - "limit": 20, - "sort_by": "timestamp", - "sort_order": -1 - } - - response = await test_admin.post("/api/v1/admin/events/browse", json=browse_payload) - assert response.status_code == 200 - - data = response.json() - assert "events" in data - assert "total" in data - # has_more is optional or not returned by this endpoint - - # Events should exist from our test user registrations - assert isinstance(data["events"], list) - assert data["total"] >= 0 - - @pytest.mark.asyncio - async def test_event_statistics(self, test_admin: AsyncClient) -> None: - """Test getting event statistics.""" - # Get event statistics - response = await test_admin.get("/api/v1/admin/events/stats?hours=24") - assert response.status_code == 200 - - data = response.json() - # Note: Real API might return different fields than EventStatistics model expects - # Just validate the essential fields - assert "total_events" in data - assert data["total_events"] >= 0 - - # Verify structure of what's actually returned - if "events_by_type" in data: - assert isinstance(data["events_by_type"], list) - if "events_by_hour" in data: - assert isinstance(data["events_by_hour"], list) - if "top_users" in data: - assert isinstance(data["top_users"], list) - if "error_rate" in data: - # Implementation may return percentage points or ratio; just ensure non-negative float - assert isinstance(data["error_rate"], (int, float)) - assert data["error_rate"] >= 0.0 - - @pytest.mark.asyncio - async def test_admin_events_export_csv_and_json(self, test_admin: AsyncClient) -> None: - """Export admin events as CSV and JSON and validate basic structure.""" - # CSV export - r_csv = await test_admin.get("/api/v1/admin/events/export/csv?limit=10") - assert r_csv.status_code == 200, f"CSV export failed: {r_csv.status_code} - {r_csv.text[:200]}" - ct_csv = r_csv.headers.get("content-type", "") - assert "text/csv" in ct_csv - body_csv = r_csv.text - # Header line should be present even if empty dataset - assert "Event ID" in body_csv and "Timestamp" in body_csv - - # JSON export - r_json = await test_admin.get("/api/v1/admin/events/export/json?limit=10") - assert r_json.status_code == 200, f"JSON export failed: {r_json.status_code} - {r_json.text[:200]}" - ct_json = r_json.headers.get("content-type", "") - assert "application/json" in ct_json - data = r_json.json() - assert "export_metadata" in data and "events" in data - assert isinstance(data["events"], list) - assert "exported_at" in data["export_metadata"] - - @pytest.mark.asyncio - async def test_admin_user_rate_limits_and_password_reset(self, test_admin: AsyncClient) -> None: - """Create a user, manage rate limits, and reset password via admin endpoints.""" - # Create a new user to operate on - unique_id = str(uuid4())[:8] - new_user = { - "username": f"rate_limit_user_{unique_id}", - "email": f"rl_{unique_id}@example.com", - "password": "TempP@ss1234" - } - create_response = await test_admin.post("/api/v1/admin/users/", json=new_user) - assert create_response.status_code in [200, 201] - target_user_id = create_response.json()["user_id"] - - # Get current rate limits (may be None for fresh user) - rl_get = await test_admin.get(f"/api/v1/admin/users/{target_user_id}/rate-limits") - assert rl_get.status_code == 200 - rl_body = rl_get.json() - assert rl_body.get("user_id") == target_user_id - assert "current_usage" in rl_body - - # Update rate limits for user - update_payload = { - "user_id": target_user_id, - "bypass_rate_limit": False, - "global_multiplier": 1.0, - "rules": [ - { - "endpoint_pattern": r"^/api/v1/execute", - "group": "execution", - "requests": 5, - "window_seconds": 60, - "burst_multiplier": 1.0, - "algorithm": "sliding_window", - "priority": 10, - "enabled": True - } - ] - } - rl_put = await test_admin.put( - f"/api/v1/admin/users/{target_user_id}/rate-limits", - json=update_payload - ) - assert rl_put.status_code == 200 - put_body = rl_put.json() - assert put_body.get("updated") is True - assert put_body.get("config", {}).get("user_id") == target_user_id - - # Reset rate limits - rl_reset = await test_admin.post( - f"/api/v1/admin/users/{target_user_id}/rate-limits/reset" - ) - assert rl_reset.status_code == 200 - - # Reset password for the user - new_password = "NewPassw0rd!" - pw_reset = await test_admin.post( - f"/api/v1/admin/users/{target_user_id}/reset-password", - json={"new_password": new_password} - ) - assert pw_reset.status_code == 200 - - # Verify user can login with the new password - logout_resp = await test_admin.post("/api/v1/auth/logout") - assert logout_resp.status_code in [200, 204] - login_new = await test_admin.post( - "/api/v1/auth/login", - data={"username": new_user["username"], "password": new_password} - ) - assert login_new.status_code == 200 diff --git a/backend/tests/e2e/test_admin_settings_routes.py b/backend/tests/e2e/test_admin_settings_routes.py index 7a3dffd3..1cb71e4a 100644 --- a/backend/tests/e2e/test_admin_settings_routes.py +++ b/backend/tests/e2e/test_admin_settings_routes.py @@ -24,31 +24,22 @@ async def test_get_system_settings(self, test_admin: AsyncClient) -> None: settings = SystemSettings.model_validate(response.json()) # Validate execution limits - assert settings.execution_limits is not None - assert isinstance(settings.execution_limits, ExecutionLimitsSchema) assert settings.execution_limits.max_timeout_seconds >= 10 assert settings.execution_limits.max_memory_mb >= 128 assert settings.execution_limits.max_cpu_cores >= 1 assert settings.execution_limits.max_concurrent_executions >= 1 # Validate security settings - assert settings.security_settings is not None - assert isinstance(settings.security_settings, SecuritySettingsSchema) assert settings.security_settings.password_min_length >= 6 assert settings.security_settings.session_timeout_minutes >= 5 assert settings.security_settings.max_login_attempts >= 3 assert settings.security_settings.lockout_duration_minutes >= 5 # Validate monitoring settings - assert settings.monitoring_settings is not None - assert isinstance( - settings.monitoring_settings, MonitoringSettingsSchema - ) assert settings.monitoring_settings.metrics_retention_days >= 7 assert settings.monitoring_settings.log_level in [ "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL" ] - assert isinstance(settings.monitoring_settings.enable_tracing, bool) assert 0.0 <= settings.monitoring_settings.sampling_rate <= 1.0 @pytest.mark.asyncio diff --git a/backend/tests/e2e/test_admin_users_routes.py b/backend/tests/e2e/test_admin_users_routes.py index d82f50a6..db4edead 100644 --- a/backend/tests/e2e/test_admin_users_routes.py +++ b/backend/tests/e2e/test_admin_users_routes.py @@ -2,11 +2,7 @@ import pytest from app.domain.enums.user import UserRole -from app.schemas_pydantic.admin_user_overview import ( - AdminUserOverview, - DerivedCounts, - RateLimitSummary, -) +from app.schemas_pydantic.admin_user_overview import AdminUserOverview from app.schemas_pydantic.user import ( DeleteUserResponse, MessageResponse, @@ -46,10 +42,9 @@ async def test_list_users(self, test_admin: AsyncClient) -> None: assert response.status_code == 200 result = UserListResponse.model_validate(response.json()) - assert result.total >= 0 + assert result.total >= 1 # at least admin + test users assert result.offset == 0 assert result.limit == 100 # default - assert isinstance(result.users, list) @pytest.mark.asyncio async def test_list_users_with_pagination( @@ -77,8 +72,7 @@ async def test_list_users_with_search( ) assert response.status_code == 200 - result = UserListResponse.model_validate(response.json()) - assert isinstance(result.users, list) + UserListResponse.model_validate(response.json()) @pytest.mark.asyncio async def test_list_users_with_role_filter( @@ -128,13 +122,11 @@ async def test_create_user(self, test_admin: AsyncClient) -> None: raw_data = response.json() user = UserResponse.model_validate(raw_data) - assert user.user_id is not None + assert user.user_id assert user.username == request.username assert user.email == request.email assert user.role == UserRole.USER assert user.is_active is True - assert user.created_at is not None - assert user.updated_at is not None # Security: password must not be exposed in response assert "password" not in raw_data @@ -294,19 +286,6 @@ async def test_get_user_overview(self, test_admin: AsyncClient) -> None: overview = AdminUserOverview.model_validate(response.json()) assert overview.user.user_id == user_id - assert overview.stats is not None - assert overview.stats.total_events >= 0 - - # Validate derived counts - assert isinstance(overview.derived_counts, DerivedCounts) - assert overview.derived_counts.succeeded >= 0 - assert overview.derived_counts.failed >= 0 - assert overview.derived_counts.timeout >= 0 - assert overview.derived_counts.cancelled >= 0 - - # Validate rate limit summary - assert isinstance(overview.rate_limit_summary, RateLimitSummary) - assert isinstance(overview.recent_events, list) @pytest.mark.asyncio async def test_get_user_overview_not_found( diff --git a/backend/tests/e2e/test_alertmanager.py b/backend/tests/e2e/test_alertmanager.py deleted file mode 100644 index 62068d43..00000000 --- a/backend/tests/e2e/test_alertmanager.py +++ /dev/null @@ -1,41 +0,0 @@ -from datetime import datetime, timezone - -import httpx -import pytest - -pytestmark = pytest.mark.e2e - - -@pytest.mark.asyncio -async def test_grafana_alert_endpoints(client: httpx.AsyncClient) -> None: - # Test endpoint - r_test = await client.get("/api/v1/alerts/grafana/test") - assert r_test.status_code == 200 - assert "webhook_url" in r_test.json() - - # Webhook minimal payload - payload = { - "receiver": "it", - "status": "firing", - "alerts": [ - { - "status": "firing", - "labels": {"alertname": "IT Test", "severity": "warning"}, - "annotations": {"summary": "Sum", "description": "Desc"}, - "startsAt": datetime.now(timezone.utc).isoformat(), - "endsAt": datetime.now(timezone.utc).isoformat(), - "generatorURL": "http://example", - "fingerprint": "abc123", - } - ], - "groupLabels": {}, - "commonLabels": {}, - "commonAnnotations": {}, - "externalURL": "http://am", - "version": "4", - "groupKey": "{}:{}", - } - r_webhook = await client.post("/api/v1/alerts/grafana", json=payload) - assert r_webhook.status_code == 200 - body = r_webhook.json() - assert body.get("alerts_received") == 1 diff --git a/backend/tests/e2e/test_auth_routes.py b/backend/tests/e2e/test_auth_routes.py index 478329fd..aa6f825e 100644 --- a/backend/tests/e2e/test_auth_routes.py +++ b/backend/tests/e2e/test_auth_routes.py @@ -95,8 +95,6 @@ async def test_register_success( assert result.email == new_user_request.email assert result.role == UserRole.USER assert result.is_superuser is False - assert result.created_at is not None - assert result.updated_at is not None @pytest.mark.asyncio async def test_register_duplicate_username( @@ -169,9 +167,9 @@ async def test_get_profile_authenticated(self, test_user: AsyncClient) -> None: assert response.status_code == 200 result = UserResponse.model_validate(response.json()) - assert result.user_id is not None - assert result.username is not None - assert result.email is not None + assert result.user_id + assert result.username + assert result.email assert result.role in [UserRole.USER, UserRole.ADMIN] assert response.headers.get("Cache-Control") == "no-store" @@ -195,9 +193,9 @@ async def test_verify_valid_token(self, test_user: AsyncClient) -> None: result = TokenValidationResponse.model_validate(response.json()) assert result.valid is True - assert result.username is not None + assert result.username assert result.role in [UserRole.USER, UserRole.ADMIN] - assert result.csrf_token is not None + assert result.csrf_token @pytest.mark.asyncio async def test_verify_invalid_token(self, client: AsyncClient) -> None: diff --git a/backend/tests/e2e/test_dlq_routes.py b/backend/tests/e2e/test_dlq_routes.py index c48857d5..42f8aa89 100644 --- a/backend/tests/e2e/test_dlq_routes.py +++ b/backend/tests/e2e/test_dlq_routes.py @@ -25,9 +25,6 @@ async def test_get_dlq_stats(self, test_user: AsyncClient) -> None: assert response.status_code == 200 stats = DLQStats.model_validate(response.json()) - assert isinstance(stats.by_status, dict) - assert isinstance(stats.by_topic, list) - assert isinstance(stats.by_event_type, list) assert stats.age_stats is not None assert stats.timestamp is not None @@ -51,10 +48,8 @@ async def test_get_dlq_messages(self, test_user: AsyncClient) -> None: assert response.status_code == 200 result = DLQMessagesResponse.model_validate(response.json()) - assert result.total >= 0 assert result.offset == 0 assert result.limit == 50 # default - assert isinstance(result.messages, list) @pytest.mark.asyncio async def test_get_dlq_messages_with_pagination( @@ -98,8 +93,7 @@ async def test_get_dlq_messages_by_topic( ) assert response.status_code == 200 - result = DLQMessagesResponse.model_validate(response.json()) - assert isinstance(result.messages, list) + DLQMessagesResponse.model_validate(response.json()) @pytest.mark.asyncio async def test_get_dlq_messages_by_event_type( @@ -112,8 +106,7 @@ async def test_get_dlq_messages_by_event_type( ) assert response.status_code == 200 - result = DLQMessagesResponse.model_validate(response.json()) - assert isinstance(result.messages, list) + DLQMessagesResponse.model_validate(response.json()) class TestGetDLQMessage: @@ -142,21 +135,20 @@ async def test_get_dlq_message_detail( assert list_response.status_code == 200 result = DLQMessagesResponse.model_validate(list_response.json()) - if result.messages: - event_id = result.messages[0].event.event_id + if not result.messages: + pytest.skip("No DLQ messages available to test detail endpoint") + + event_id = result.messages[0].event.event_id - # Get detail - response = await test_user.get( - f"/api/v1/dlq/messages/{event_id}" - ) - assert response.status_code == 200 - detail = DLQMessageDetail.model_validate(response.json()) - assert detail.event is not None - assert detail.original_topic is not None - assert detail.error is not None - assert detail.retry_count >= 0 - assert detail.failed_at is not None - assert detail.status is not None + response = await test_user.get( + f"/api/v1/dlq/messages/{event_id}" + ) + assert response.status_code == 200 + detail = DLQMessageDetail.model_validate(response.json()) + assert detail.event is not None + assert detail.original_topic + assert detail.error + assert detail.retry_count >= 0 class TestRetryDLQMessages: @@ -173,23 +165,22 @@ async def test_retry_dlq_messages(self, test_user: AsyncClient) -> None: assert list_response.status_code == 200 result = DLQMessagesResponse.model_validate(list_response.json()) - if result.messages: - event_ids = [msg.event.event_id for msg in result.messages[:2]] + if not result.messages: + pytest.skip("No DLQ messages available to test retry") - # Retry - response = await test_user.post( - "/api/v1/dlq/retry", - json={"event_ids": event_ids}, - ) - assert response.status_code == 200 - retry_result = DLQBatchRetryResponse.model_validate( - response.json() - ) + event_ids = [msg.event.event_id for msg in result.messages[:2]] + + response = await test_user.post( + "/api/v1/dlq/retry", + json={"event_ids": event_ids}, + ) + assert response.status_code == 200 + retry_result = DLQBatchRetryResponse.model_validate( + response.json() + ) - assert retry_result.total >= 0 - assert retry_result.successful >= 0 - assert retry_result.failed >= 0 - assert isinstance(retry_result.details, list) + assert retry_result.total == len(event_ids) + assert retry_result.successful + retry_result.failed == retry_result.total @pytest.mark.asyncio async def test_retry_dlq_messages_empty_list( @@ -218,20 +209,32 @@ async def test_retry_dlq_messages_nonexistent( # May succeed with failures reported in details assert response.status_code == 200 result = DLQBatchRetryResponse.model_validate(response.json()) - assert isinstance(result.details, list) + assert result.total == 2 + assert result.failed == 2 class TestSetRetryPolicy: """Tests for POST /api/v1/dlq/retry-policy.""" @pytest.mark.asyncio - async def test_set_retry_policy(self, test_user: AsyncClient) -> None: - """Set retry policy for a topic.""" + @pytest.mark.parametrize( + ("strategy", "topic"), + [ + (RetryStrategy.EXPONENTIAL_BACKOFF, "execution-events"), + (RetryStrategy.FIXED_INTERVAL, "test-topic"), + (RetryStrategy.SCHEDULED, "notifications-topic"), + ], + ids=lambda v: v if isinstance(v, str) else v.value, + ) + async def test_set_retry_policy( + self, test_user: AsyncClient, strategy: RetryStrategy, topic: str + ) -> None: + """Set retry policy for each strategy type.""" response = await test_user.post( "/api/v1/dlq/retry-policy", json={ - "topic": "execution-events", - "strategy": RetryStrategy.EXPONENTIAL_BACKOFF, + "topic": topic, + "strategy": strategy, "max_retries": 5, "base_delay_seconds": 60.0, "max_delay_seconds": 3600.0, @@ -241,49 +244,7 @@ async def test_set_retry_policy(self, test_user: AsyncClient) -> None: assert response.status_code == 200 result = MessageResponse.model_validate(response.json()) - assert "execution-events" in result.message - - @pytest.mark.asyncio - async def test_set_retry_policy_fixed_strategy( - self, test_user: AsyncClient - ) -> None: - """Set retry policy with fixed strategy.""" - response = await test_user.post( - "/api/v1/dlq/retry-policy", - json={ - "topic": "test-topic", - "strategy": RetryStrategy.FIXED_INTERVAL, - "max_retries": 3, - "base_delay_seconds": 30.0, - "max_delay_seconds": 300.0, - "retry_multiplier": 1.0, - }, - ) - - assert response.status_code == 200 - result = MessageResponse.model_validate(response.json()) - assert "test-topic" in result.message - - @pytest.mark.asyncio - async def test_set_retry_policy_scheduled_strategy( - self, test_user: AsyncClient - ) -> None: - """Set retry policy with scheduled strategy.""" - response = await test_user.post( - "/api/v1/dlq/retry-policy", - json={ - "topic": "notifications-topic", - "strategy": RetryStrategy.SCHEDULED, - "max_retries": 10, - "base_delay_seconds": 120.0, - "max_delay_seconds": 7200.0, - "retry_multiplier": 1.5, - }, - ) - - assert response.status_code == 200 - result = MessageResponse.model_validate(response.json()) - assert "notifications-topic" in result.message + assert topic in result.message class TestDiscardDLQMessage: @@ -311,20 +272,21 @@ async def test_discard_dlq_message(self, test_user: AsyncClient) -> None: assert list_response.status_code == 200 result = DLQMessagesResponse.model_validate(list_response.json()) - if result.messages: - event_id = result.messages[0].event.event_id - - # Discard - response = await test_user.delete( - f"/api/v1/dlq/messages/{event_id}", - params={"reason": "Test discard for E2E testing"}, - ) - assert response.status_code == 200 - msg_result = MessageResponse.model_validate( - response.json() - ) - assert event_id in msg_result.message - assert "discarded" in msg_result.message.lower() + if not result.messages: + pytest.skip("No DLQ messages available to test discard") + + event_id = result.messages[0].event.event_id + + response = await test_user.delete( + f"/api/v1/dlq/messages/{event_id}", + params={"reason": "Test discard for E2E testing"}, + ) + assert response.status_code == 200 + msg_result = MessageResponse.model_validate( + response.json() + ) + assert event_id in msg_result.message + assert "discarded" in msg_result.message.lower() @pytest.mark.asyncio async def test_discard_dlq_message_requires_reason( @@ -352,9 +314,8 @@ async def test_get_dlq_topics(self, test_user: AsyncClient) -> None: ] for topic in topics: - assert topic.topic is not None + assert topic.topic assert topic.total_messages >= 0 - assert isinstance(topic.status_breakdown, dict) assert topic.avg_retry_count >= 0 assert topic.max_retry_count >= 0 diff --git a/backend/tests/e2e/test_events_routes.py b/backend/tests/e2e/test_events_routes.py index b73fa895..e851a705 100644 --- a/backend/tests/e2e/test_events_routes.py +++ b/backend/tests/e2e/test_events_routes.py @@ -1,5 +1,3 @@ -import asyncio - import pytest from app.domain.enums.events import EventType from app.domain.events.typed import DomainEvent @@ -19,78 +17,9 @@ pytestmark = [pytest.mark.e2e, pytest.mark.kafka] - -async def wait_for_user_events( - client: AsyncClient, - timeout: float = 30.0, - poll_interval: float = 0.5, -) -> EventListResponse: - """Poll until at least one event exists for the user. - - Args: - client: Authenticated HTTP client - timeout: Maximum time to wait in seconds - poll_interval: Time between polls in seconds - - Returns: - EventListResponse with at least one event - - Raises: - TimeoutError: If no events appear within timeout - AssertionError: If API returns unexpected status code - """ - deadline = asyncio.get_event_loop().time() + timeout - - while asyncio.get_event_loop().time() < deadline: - response = await client.get("/api/v1/events/user", params={"limit": 10}) - assert response.status_code == 200, f"Unexpected: {response.status_code} - {response.text}" - - result = EventListResponse.model_validate(response.json()) - if result.events: - return result - - await asyncio.sleep(poll_interval) - - raise TimeoutError(f"No events appeared for user within {timeout}s") - - -async def wait_for_aggregate_events( - client: AsyncClient, - aggregate_id: str, - timeout: float = 30.0, - poll_interval: float = 0.5, -) -> EventListResponse: - """Poll until at least one event exists for the aggregate. - - Args: - client: Authenticated HTTP client - aggregate_id: Aggregate ID (execution_id) to check - timeout: Maximum time to wait in seconds - poll_interval: Time between polls in seconds - - Returns: - EventListResponse with at least one event - - Raises: - TimeoutError: If no events appear within timeout - AssertionError: If API returns unexpected status code - """ - deadline = asyncio.get_event_loop().time() + timeout - - while asyncio.get_event_loop().time() < deadline: - response = await client.get( - f"/api/v1/events/executions/{aggregate_id}/events", - params={"limit": 10}, - ) - assert response.status_code == 200, f"Unexpected: {response.status_code} - {response.text}" - - result = EventListResponse.model_validate(response.json()) - if result.events: - return result - - await asyncio.sleep(poll_interval) - - raise TimeoutError(f"No events appeared for aggregate {aggregate_id} within {timeout}s") +# Events are stored in MongoDB by the producer BEFORE publishing to Kafka. +# By the time the created_execution fixture returns, EXECUTION_REQUESTED is +# already in the event store. No waiting needed for any of these tests. class TestExecutionEvents: @@ -101,21 +30,26 @@ async def test_get_execution_events( self, test_user: AsyncClient, created_execution: ExecutionResponse ) -> None: """Get events for a specific execution.""" - result = await wait_for_aggregate_events(test_user, created_execution.execution_id) + response = await test_user.get( + f"/api/v1/events/executions/{created_execution.execution_id}/events", + params={"limit": 10}, + ) - assert result.total >= 1 + assert response.status_code == 200 + result = EventListResponse.model_validate(response.json()) + assert result.total == 1 assert result.limit == 10 assert result.skip == 0 assert isinstance(result.has_more, bool) - assert len(result.events) >= 1 + assert len(result.events) == 1 + assert result.events[0].event_type == EventType.EXECUTION_REQUESTED + assert result.events[0].execution_id == created_execution.execution_id @pytest.mark.asyncio async def test_get_execution_events_pagination( self, test_user: AsyncClient, created_execution: ExecutionResponse ) -> None: """Pagination works for execution events.""" - await wait_for_aggregate_events(test_user, created_execution.execution_id) - response = await test_user.get( f"/api/v1/events/executions/{created_execution.execution_id}/events", params={"limit": 5, "skip": 0}, @@ -125,6 +59,8 @@ async def test_get_execution_events_pagination( result = EventListResponse.model_validate(response.json()) assert result.limit == 5 assert result.skip == 0 + assert len(result.events) <= 5 + assert result.total >= 1 @pytest.mark.asyncio async def test_get_execution_events_access_denied( @@ -147,18 +83,19 @@ async def test_get_user_events( self, test_user: AsyncClient, created_execution: ExecutionResponse ) -> None: """Get events for current user.""" - result = await wait_for_user_events(test_user) + response = await test_user.get("/api/v1/events/user", params={"limit": 10}) + assert response.status_code == 200 + result = EventListResponse.model_validate(response.json()) assert result.total >= 1 assert len(result.events) >= 1 + assert len(result.events) == min(result.total, 10) @pytest.mark.asyncio async def test_get_user_events_with_filters( self, test_user: AsyncClient, created_execution: ExecutionResponse ) -> None: """Filter user events by event types.""" - await wait_for_user_events(test_user) - response = await test_user.get( "/api/v1/events/user", params={ @@ -170,6 +107,8 @@ async def test_get_user_events_with_filters( assert response.status_code == 200 result = EventListResponse.model_validate(response.json()) assert result.limit == 10 + for e in result.events: + assert e.event_type == EventType.EXECUTION_REQUESTED @pytest.mark.asyncio async def test_get_user_events_unauthenticated( @@ -188,8 +127,6 @@ async def test_query_events( self, test_user: AsyncClient, created_execution: ExecutionResponse ) -> None: """Query events with filters.""" - await wait_for_user_events(test_user) - response = await test_user.post( "/api/v1/events/query", json={ @@ -202,6 +139,9 @@ async def test_query_events( assert response.status_code == 200 result = EventListResponse.model_validate(response.json()) assert result.limit == 50 + assert len(result.events) >= 1 + for e in result.events: + assert e.event_type == EventType.EXECUTION_REQUESTED @pytest.mark.asyncio async def test_query_events_with_correlation_id( @@ -263,24 +203,21 @@ async def test_get_event_statistics( self, test_user: AsyncClient, created_execution: ExecutionResponse ) -> None: """Get event statistics for current user.""" - await wait_for_user_events(test_user) - response = await test_user.get("/api/v1/events/statistics") assert response.status_code == 200 stats = EventStatistics.model_validate(response.json()) assert stats.total_events >= 1 - assert stats.events_by_type is not None - assert stats.events_by_service is not None + assert len(stats.events_by_type) >= 1 + assert len(stats.events_by_service) >= 1 + assert any(e.event_type == EventType.EXECUTION_REQUESTED for e in stats.events_by_type) @pytest.mark.asyncio async def test_get_event_statistics_with_time_range( self, test_user: AsyncClient, created_execution: ExecutionResponse ) -> None: """Get event statistics with time range.""" - await wait_for_user_events(test_user) - response = await test_user.get( "/api/v1/events/statistics", params={ @@ -292,6 +229,7 @@ async def test_get_event_statistics_with_time_range( assert response.status_code == 200 stats = EventStatistics.model_validate(response.json()) assert stats.total_events >= 1 + assert len(stats.events_by_type) >= 1 class TestSingleEvent: @@ -309,7 +247,10 @@ async def test_get_event_by_id( self, test_user: AsyncClient, created_execution: ExecutionResponse ) -> None: """Get single event by ID.""" - events_result = await wait_for_user_events(test_user) + events_resp = await test_user.get("/api/v1/events/user", params={"limit": 10}) + assert events_resp.status_code == 200 + events_result = EventListResponse.model_validate(events_resp.json()) + assert events_result.events event_id = events_result.events[0].event_id response = await test_user.get(f"/api/v1/events/{event_id}") @@ -374,8 +315,6 @@ async def test_aggregate_events( self, test_user: AsyncClient, created_execution: ExecutionResponse ) -> None: """Aggregate events with MongoDB pipeline.""" - await wait_for_user_events(test_user) - response = await test_user.post( "/api/v1/events/aggregate", json={ @@ -390,6 +329,8 @@ async def test_aggregate_events( result = response.json() assert isinstance(result, list) assert len(result) >= 1 + for item in result: + assert "_id" in item and "count" in item and item["count"] > 0 class TestListEventTypes: @@ -417,6 +358,7 @@ async def test_list_event_types(self, test_admin: AsyncClient) -> None: result = response.json() assert isinstance(result, list) assert len(result) > 0 + assert EventType.SCRIPT_SAVED in result class TestDeleteEvent: @@ -452,6 +394,10 @@ async def test_delete_event_admin_only( assert result.event_id == event_id assert "deleted" in result.message.lower() + # Verify event is actually gone + get_resp = await test_admin.get(f"/api/v1/events/{event_id}") + assert get_resp.status_code == 404 + @pytest.mark.asyncio async def test_delete_event_forbidden_for_user( self, test_user: AsyncClient @@ -470,8 +416,6 @@ async def test_replay_events_dry_run( self, test_admin: AsyncClient, created_execution_admin: ExecutionResponse ) -> None: """Replay events in dry run mode.""" - await wait_for_aggregate_events(test_admin, created_execution_admin.execution_id) - response = await test_admin.post( f"/api/v1/events/replay/{created_execution_admin.execution_id}", params={"dry_run": True}, @@ -481,6 +425,8 @@ async def test_replay_events_dry_run( result = ReplayAggregateResponse.model_validate(response.json()) assert result.dry_run is True assert result.aggregate_id == created_execution_admin.execution_id + assert result.event_count is not None and result.event_count >= 1 + assert result.event_types is not None and len(result.event_types) >= 1 @pytest.mark.asyncio async def test_replay_events_not_found( @@ -518,8 +464,6 @@ async def test_user_cannot_access_other_users_events( created_execution: ExecutionResponse, ) -> None: """User cannot access another user's execution events.""" - await wait_for_aggregate_events(test_user, created_execution.execution_id) - response = await another_user.get( f"/api/v1/events/executions/{created_execution.execution_id}/events" ) @@ -534,13 +478,13 @@ async def test_user_events_only_shows_own_events( created_execution: ExecutionResponse, ) -> None: """User events endpoint only returns user's own events.""" - events_result = await wait_for_user_events(test_user) - user_event_ids = {e.event_id for e in events_result.events} + events_resp = await test_user.get("/api/v1/events/user") + assert events_resp.status_code == 200 + user_event_ids = {e.event_id for e in EventListResponse.model_validate(events_resp.json()).events} another_response = await another_user.get("/api/v1/events/user") assert another_response.status_code == 200 + another_event_ids = {e.event_id for e in EventListResponse.model_validate(another_response.json()).events} - another_result = EventListResponse.model_validate(another_response.json()) - another_event_ids = {e.event_id for e in another_result.events} - + assert len(user_event_ids) >= 1, "test_user should have events from created_execution" assert user_event_ids.isdisjoint(another_event_ids) diff --git a/backend/tests/e2e/test_execution_routes.py b/backend/tests/e2e/test_execution_routes.py index f40630d6..e2d38e68 100644 --- a/backend/tests/e2e/test_execution_routes.py +++ b/backend/tests/e2e/test_execution_routes.py @@ -26,6 +26,8 @@ from httpx import AsyncClient from pydantic import TypeAdapter +from tests.e2e.conftest import EventWaiter + pytestmark = [pytest.mark.e2e, pytest.mark.k8s] # TypeAdapter for parsing list of execution events from API response @@ -48,46 +50,21 @@ } -async def wait_for_terminal_state( +async def submit_and_wait( client: AsyncClient, - execution_id: str, - timeout: float = 90.0, - poll_interval: float = 1.0, -) -> ExecutionResult: - """Poll execution result until it reaches a terminal state. - - Args: - client: Authenticated HTTP client - execution_id: ID of execution to wait for - timeout: Maximum time to wait in seconds - poll_interval: Time between polls in seconds - - Returns: - ExecutionResult with terminal status - - Raises: - TimeoutError: If execution doesn't reach terminal state within timeout - AssertionError: If API returns unexpected status code - """ - deadline = asyncio.get_event_loop().time() + timeout - - while asyncio.get_event_loop().time() < deadline: - response = await client.get(f"/api/v1/executions/{execution_id}/result") - - if response.status_code == 404: - # Result not ready yet, keep polling - await asyncio.sleep(poll_interval) - continue - - assert response.status_code == 200, f"Unexpected status {response.status_code}: {response.text}" - - result = ExecutionResult.model_validate(response.json()) - if result.status in TERMINAL_STATES: - return result - - await asyncio.sleep(poll_interval) - - raise TimeoutError(f"Execution {execution_id} did not reach terminal state within {timeout}s") + waiter: EventWaiter, + request: ExecutionRequest, + *, + timeout: float = 30.0, +) -> tuple[ExecutionResponse, ExecutionResult]: + """Submit script and wait for result via Kafka event — no polling.""" + resp = await client.post("/api/v1/execute", json=request.model_dump()) + assert resp.status_code == 200 + execution = ExecutionResponse.model_validate(resp.json()) + await waiter.wait_for_result(execution.execution_id, timeout=timeout) + result_resp = await client.get(f"/api/v1/executions/{execution.execution_id}/result") + assert result_resp.status_code == 200 + return execution, ExecutionResult.model_validate(result_resp.json()) class TestExecutionAuthentication: @@ -107,29 +84,22 @@ class TestExecutionHappyPath: @pytest.mark.asyncio async def test_execute_simple_script_completes( - self, test_user: AsyncClient, simple_execution_request: ExecutionRequest + self, test_user: AsyncClient, event_waiter: EventWaiter, simple_execution_request: ExecutionRequest ) -> None: """Simple script executes and completes successfully.""" - response = await test_user.post("/api/v1/execute", json=simple_execution_request.model_dump()) - assert response.status_code == 200 + exec_response, result = await submit_and_wait(test_user, event_waiter, simple_execution_request) - exec_response = ExecutionResponse.model_validate(response.json()) assert exec_response.execution_id - assert exec_response.status in [ExecutionStatus.QUEUED, ExecutionStatus.SCHEDULED, ExecutionStatus.RUNNING] - - # Wait for completion - result = await wait_for_terminal_state(test_user, exec_response.execution_id) - assert result.status == ExecutionStatus.COMPLETED assert result.execution_id == exec_response.execution_id assert result.lang == "python" assert result.lang_version == "3.11" assert result.stdout is not None - assert "test" in result.stdout + assert result.stdout.strip() == "test" assert result.exit_code == 0 @pytest.mark.asyncio - async def test_execute_multiline_output(self, test_user: AsyncClient) -> None: + async def test_execute_multiline_output(self, test_user: AsyncClient, event_waiter: EventWaiter) -> None: """Script with multiple print statements produces correct output.""" request = ExecutionRequest( script="print('Line 1')\nprint('Line 2')\nprint('Line 3')", @@ -137,20 +107,14 @@ async def test_execute_multiline_output(self, test_user: AsyncClient) -> None: lang_version="3.11", ) - response = await test_user.post("/api/v1/execute", json=request.model_dump()) - assert response.status_code == 200 - - exec_response = ExecutionResponse.model_validate(response.json()) - result = await wait_for_terminal_state(test_user, exec_response.execution_id) + _, result = await submit_and_wait(test_user, event_waiter, request) assert result.status == ExecutionStatus.COMPLETED assert result.stdout is not None - assert "Line 1" in result.stdout - assert "Line 2" in result.stdout - assert "Line 3" in result.stdout + assert result.stdout.strip() == "Line 1\nLine 2\nLine 3" @pytest.mark.asyncio - async def test_execute_tracks_resource_usage(self, test_user: AsyncClient) -> None: + async def test_execute_tracks_resource_usage(self, test_user: AsyncClient, event_waiter: EventWaiter) -> None: """Execution tracks resource usage metrics.""" request = ExecutionRequest( script="import time; data = list(range(10000)); time.sleep(0.1); print('done')", @@ -158,19 +122,18 @@ async def test_execute_tracks_resource_usage(self, test_user: AsyncClient) -> No lang_version="3.11", ) - response = await test_user.post("/api/v1/execute", json=request.model_dump()) - assert response.status_code == 200 - - exec_response = ExecutionResponse.model_validate(response.json()) - result = await wait_for_terminal_state(test_user, exec_response.execution_id) + _, result = await submit_and_wait(test_user, event_waiter, request) assert result.status == ExecutionStatus.COMPLETED assert result.resource_usage is not None assert result.resource_usage.execution_time_wall_seconds >= 0.1 assert result.resource_usage.peak_memory_kb > 0 + assert result.resource_usage.cpu_time_jiffies >= 0 + assert result.resource_usage.clk_tck_hertz > 0 + assert result.exit_code == 0 @pytest.mark.asyncio - async def test_execute_large_output(self, test_user: AsyncClient) -> None: + async def test_execute_large_output(self, test_user: AsyncClient, event_waiter: EventWaiter) -> None: """Script with large output completes successfully.""" request = ExecutionRequest( script="for i in range(500): print(f'Line {i}: ' + 'x' * 50)\nprint('END')", @@ -178,23 +141,21 @@ async def test_execute_large_output(self, test_user: AsyncClient) -> None: lang_version="3.11", ) - response = await test_user.post("/api/v1/execute", json=request.model_dump()) - assert response.status_code == 200 - - exec_response = ExecutionResponse.model_validate(response.json()) - result = await wait_for_terminal_state(test_user, exec_response.execution_id, timeout=120) + _, result = await submit_and_wait(test_user, event_waiter, request, timeout=120) assert result.status == ExecutionStatus.COMPLETED assert result.stdout is not None assert "END" in result.stdout assert len(result.stdout) > 10000 + assert result.exit_code == 0 + assert "Line 0:" in result.stdout class TestExecutionErrors: """Tests for execution error handling.""" @pytest.mark.asyncio - async def test_execute_syntax_error(self, test_user: AsyncClient) -> None: + async def test_execute_syntax_error(self, test_user: AsyncClient, event_waiter: EventWaiter) -> None: """Script with syntax error fails with proper error info.""" request = ExecutionRequest( script="def broken(\n pass", # Missing closing paren @@ -202,11 +163,7 @@ async def test_execute_syntax_error(self, test_user: AsyncClient) -> None: lang_version="3.11", ) - response = await test_user.post("/api/v1/execute", json=request.model_dump()) - assert response.status_code == 200 - - exec_response = ExecutionResponse.model_validate(response.json()) - result = await wait_for_terminal_state(test_user, exec_response.execution_id) + _, result = await submit_and_wait(test_user, event_waiter, request) # Script errors result in COMPLETED status with non-zero exit code # FAILED is reserved for infrastructure/timeout failures @@ -216,7 +173,7 @@ async def test_execute_syntax_error(self, test_user: AsyncClient) -> None: assert result.exit_code != 0 @pytest.mark.asyncio - async def test_execute_runtime_error(self, test_user: AsyncClient) -> None: + async def test_execute_runtime_error(self, test_user: AsyncClient, event_waiter: EventWaiter) -> None: """Script with runtime error fails with traceback.""" request = ExecutionRequest( script="print('before')\nraise ValueError('test error')\nprint('after')", @@ -224,18 +181,14 @@ async def test_execute_runtime_error(self, test_user: AsyncClient) -> None: lang_version="3.11", ) - response = await test_user.post("/api/v1/execute", json=request.model_dump()) - assert response.status_code == 200 - - exec_response = ExecutionResponse.model_validate(response.json()) - result = await wait_for_terminal_state(test_user, exec_response.execution_id) + _, result = await submit_and_wait(test_user, event_waiter, request) # Script errors result in COMPLETED status with non-zero exit code # FAILED is reserved for infrastructure/timeout failures assert result.status == ExecutionStatus.COMPLETED assert result.stdout is not None assert "before" in result.stdout - assert "after" not in (result.stdout or "") + assert "after" not in result.stdout assert result.stderr is not None assert "ValueError" in result.stderr assert "test error" in result.stderr @@ -246,7 +199,7 @@ class TestExecutionCancel: @pytest.mark.asyncio async def test_cancel_running_execution( - self, test_user: AsyncClient, long_running_execution_request: ExecutionRequest + self, test_user: AsyncClient, event_waiter: EventWaiter, long_running_execution_request: ExecutionRequest ) -> None: """Running execution can be cancelled.""" response = await test_user.post("/api/v1/execute", json=long_running_execution_request.model_dump()) @@ -254,8 +207,8 @@ async def test_cancel_running_execution( exec_response = ExecutionResponse.model_validate(response.json()) - # Give it a moment to start - await asyncio.sleep(1) + # Wait for saga to start (pod creation command sent) instead of blind sleep + await event_waiter.wait_for_saga_command(exec_response.execution_id) cancel_req = CancelExecutionRequest(reason="Test cancellation") cancel_response = await test_user.post( @@ -266,20 +219,16 @@ async def test_cancel_running_execution( cancel_result = CancelResponse.model_validate(cancel_response.json()) assert cancel_result.execution_id == exec_response.execution_id - assert cancel_result.status in ["cancellation_requested", "already_cancelled"] + assert cancel_result.status == "cancellation_requested" + assert cancel_result.message @pytest.mark.asyncio - async def test_cancel_completed_execution_fails(self, test_user: AsyncClient) -> None: + async def test_cancel_completed_execution_fails( + self, test_user: AsyncClient, event_waiter: EventWaiter + ) -> None: """Cannot cancel already completed execution.""" request = ExecutionRequest(script="print('quick')", lang="python", lang_version="3.11") - - response = await test_user.post("/api/v1/execute", json=request.model_dump()) - assert response.status_code == 200 - - exec_response = ExecutionResponse.model_validate(response.json()) - - # Wait for completion - await wait_for_terminal_state(test_user, exec_response.execution_id) + exec_response, _ = await submit_and_wait(test_user, event_waiter, request) cancel_req = CancelExecutionRequest(reason="Too late") cancel_response = await test_user.post( @@ -295,15 +244,12 @@ class TestExecutionRetry: """Tests for execution retry.""" @pytest.mark.asyncio - async def test_retry_completed_execution(self, test_user: AsyncClient) -> None: + async def test_retry_completed_execution( + self, test_user: AsyncClient, event_waiter: EventWaiter + ) -> None: """Completed execution can be retried.""" request = ExecutionRequest(script="print('original')", lang="python", lang_version="3.11") - - response = await test_user.post("/api/v1/execute", json=request.model_dump()) - assert response.status_code == 200 - - original = ExecutionResponse.model_validate(response.json()) - await wait_for_terminal_state(test_user, original.execution_id) + original, _ = await submit_and_wait(test_user, event_waiter, request) retry_req = RetryExecutionRequest() retry_response = await test_user.post( @@ -316,10 +262,13 @@ async def test_retry_completed_execution(self, test_user: AsyncClient) -> None: assert retried.execution_id != original.execution_id # Wait for retried execution to complete - result = await wait_for_terminal_state(test_user, retried.execution_id) + await event_waiter.wait_for_result(retried.execution_id) + result_resp = await test_user.get(f"/api/v1/executions/{retried.execution_id}/result") + assert result_resp.status_code == 200 + result = ExecutionResult.model_validate(result_resp.json()) assert result.status == ExecutionStatus.COMPLETED assert result.stdout is not None - assert "original" in result.stdout + assert result.stdout.strip() == "original" @pytest.mark.asyncio async def test_retry_running_execution_fails( @@ -338,23 +287,19 @@ async def test_retry_running_execution_fails( ) assert retry_response.status_code == 400 + assert "detail" in retry_response.json() @pytest.mark.asyncio async def test_retry_other_users_execution_forbidden( - self, test_user: AsyncClient, another_user: AsyncClient + self, test_user: AsyncClient, another_user: AsyncClient, event_waiter: EventWaiter ) -> None: """Cannot retry another user's execution.""" request = ExecutionRequest(script="print('owned')", lang="python", lang_version="3.11") - - response = await test_user.post("/api/v1/execute", json=request.model_dump()) - assert response.status_code == 200 - - exec_response = ExecutionResponse.model_validate(response.json()) - await wait_for_terminal_state(test_user, exec_response.execution_id) + original, _ = await submit_and_wait(test_user, event_waiter, request) retry_req = RetryExecutionRequest() retry_response = await another_user.post( - f"/api/v1/executions/{exec_response.execution_id}/retry", + f"/api/v1/executions/{original.execution_id}/retry", json=retry_req.model_dump(), ) @@ -365,37 +310,24 @@ class TestExecutionEvents: """Tests for execution events.""" @pytest.mark.asyncio - async def test_get_execution_events(self, test_user: AsyncClient) -> None: + async def test_get_execution_events(self, test_user: AsyncClient, event_waiter: EventWaiter) -> None: """Get events for completed execution.""" request = ExecutionRequest(script="print('events test')", lang="python", lang_version="3.11") - - response = await test_user.post("/api/v1/execute", json=request.model_dump()) - assert response.status_code == 200 - - exec_response = ExecutionResponse.model_validate(response.json()) - await wait_for_terminal_state(test_user, exec_response.execution_id) + exec_response, _ = await submit_and_wait(test_user, event_waiter, request) events_response = await test_user.get(f"/api/v1/executions/{exec_response.execution_id}/events") assert events_response.status_code == 200 events = ExecutionEventsAdapter.validate_python(events_response.json()) - # Event store is eventually consistent (batch flush). Verify API works and - # at least one execution event is present (COMPLETED is always stored by - # the time we query since wait_for_terminal_state ensures execution finished). - assert len(events) > 0 + assert len(events) >= 2 event_types = {e.event_type for e in events} - assert event_types & {EventType.EXECUTION_REQUESTED, EventType.EXECUTION_COMPLETED} + assert {EventType.EXECUTION_REQUESTED, EventType.EXECUTION_COMPLETED} <= event_types @pytest.mark.asyncio - async def test_get_events_filtered_by_type(self, test_user: AsyncClient) -> None: + async def test_get_events_filtered_by_type(self, test_user: AsyncClient, event_waiter: EventWaiter) -> None: """Filter events by event type.""" request = ExecutionRequest(script="print('filter test')", lang="python", lang_version="3.11") - - response = await test_user.post("/api/v1/execute", json=request.model_dump()) - assert response.status_code == 200 - - exec_response = ExecutionResponse.model_validate(response.json()) - await wait_for_terminal_state(test_user, exec_response.execution_id) + exec_response, _ = await submit_and_wait(test_user, event_waiter, request) events_response = await test_user.get( f"/api/v1/executions/{exec_response.execution_id}/events", @@ -426,15 +358,12 @@ class TestExecutionDelete: @pytest.mark.asyncio @pytest.mark.admin - async def test_admin_delete_execution(self, test_user: AsyncClient, test_admin: AsyncClient) -> None: + async def test_admin_delete_execution( + self, test_user: AsyncClient, test_admin: AsyncClient, event_waiter: EventWaiter + ) -> None: """Admin can delete an execution.""" request = ExecutionRequest(script="print('to delete')", lang="python", lang_version="3.11") - - response = await test_user.post("/api/v1/execute", json=request.model_dump()) - assert response.status_code == 200 - - exec_response = ExecutionResponse.model_validate(response.json()) - await wait_for_terminal_state(test_user, exec_response.execution_id) + exec_response, _ = await submit_and_wait(test_user, event_waiter, request) delete_response = await test_admin.delete(f"/api/v1/executions/{exec_response.execution_id}") assert delete_response.status_code == 200 @@ -472,17 +401,11 @@ class TestExecutionList: """Tests for execution listing.""" @pytest.mark.asyncio - async def test_get_user_executions(self, test_user: AsyncClient) -> None: + async def test_get_user_executions(self, test_user: AsyncClient, event_waiter: EventWaiter) -> None: """User can list their executions.""" - # Create an execution request = ExecutionRequest(script="print('list test')", lang="python", lang_version="3.11") - response = await test_user.post("/api/v1/execute", json=request.model_dump()) - assert response.status_code == 200 + exec_response, _ = await submit_and_wait(test_user, event_waiter, request) - exec_response = ExecutionResponse.model_validate(response.json()) - await wait_for_terminal_state(test_user, exec_response.execution_id) - - # List executions list_response = await test_user.get("/api/v1/user/executions", params={"limit": 10, "skip": 0}) assert list_response.status_code == 200 @@ -491,6 +414,8 @@ async def test_get_user_executions(self, test_user: AsyncClient) -> None: assert result.skip == 0 assert result.total >= 1 assert len(result.executions) >= 1 + exec_ids = {e.execution_id for e in result.executions} + assert exec_response.execution_id in exec_ids @pytest.mark.asyncio async def test_list_executions_pagination(self, test_user: AsyncClient) -> None: @@ -516,6 +441,11 @@ async def test_list_executions_pagination(self, test_user: AsyncClient) -> None: page2 = ExecutionListResponse.model_validate(page2_response.json()) assert page2.skip == 2 + assert len(page2.executions) >= 1 + + ids1 = {e.execution_id for e in page1.executions} + ids2 = {e.execution_id for e in page2.executions} + assert ids1.isdisjoint(ids2) @pytest.mark.asyncio async def test_list_executions_filter_by_language(self, test_user: AsyncClient) -> None: @@ -528,6 +458,7 @@ async def test_list_executions_filter_by_language(self, test_user: AsyncClient) assert list_response.status_code == 200 result = ExecutionListResponse.model_validate(list_response.json()) + assert len(result.executions) >= 1 for execution in result.executions: assert execution.lang == "python" @@ -558,7 +489,7 @@ class TestExecutionConcurrency: @pytest.mark.asyncio @pytest.mark.xdist_group("execution_concurrency") @pytest.mark.xfail(reason="Flaky: K8s pod scheduling may timeout under resource pressure", strict=False) - async def test_concurrent_executions(self, test_user: AsyncClient) -> None: + async def test_concurrent_executions(self, test_user: AsyncClient, event_waiter: EventWaiter) -> None: """Multiple concurrent executions work correctly.""" tasks = [] for i in range(3): @@ -576,10 +507,15 @@ async def test_concurrent_executions(self, test_user: AsyncClient) -> None: # All IDs should be unique assert len(execution_ids) == 3 - # Wait for all to complete + # Wait for all to complete — parallel futures, not sequential polling + await asyncio.gather(*(event_waiter.wait_for_result(eid) for eid in execution_ids)) for exec_id in execution_ids: - result = await wait_for_terminal_state(test_user, exec_id) + result_resp = await test_user.get(f"/api/v1/executions/{exec_id}/result") + assert result_resp.status_code == 200 + result = ExecutionResult.model_validate(result_resp.json()) assert result.status == ExecutionStatus.COMPLETED + assert result.exit_code == 0 + assert result.stdout is not None class TestPublicEndpoints: @@ -592,8 +528,9 @@ async def test_get_example_scripts(self, client: AsyncClient) -> None: assert response.status_code == 200 result = ExampleScripts.model_validate(response.json()) - assert isinstance(result.scripts, dict) + assert len(result.scripts) >= 1 assert "python" in result.scripts + assert len(result.scripts["python"]) > 0 @pytest.mark.asyncio async def test_get_k8s_resource_limits(self, client: AsyncClient) -> None: @@ -604,5 +541,8 @@ async def test_get_k8s_resource_limits(self, client: AsyncClient) -> None: result = ResourceLimits.model_validate(response.json()) assert result.cpu_limit assert result.memory_limit + assert result.cpu_request + assert result.memory_request assert result.execution_timeout > 0 assert "python" in result.supported_runtimes + assert len(result.supported_runtimes["python"].versions) >= 1 diff --git a/backend/tests/e2e/test_grafana_alerts_routes.py b/backend/tests/e2e/test_grafana_alerts_routes.py index 17d95ac4..86adbfb2 100644 --- a/backend/tests/e2e/test_grafana_alerts_routes.py +++ b/backend/tests/e2e/test_grafana_alerts_routes.py @@ -46,10 +46,10 @@ async def test_receive_grafana_alert(self, client: AsyncClient) -> None: assert response.status_code == 200 result = AlertResponse.model_validate(response.json()) - assert result.message is not None + assert result.message assert result.alerts_received == 1 - assert result.alerts_processed >= 0 - assert isinstance(result.errors, list) + assert result.alerts_processed == 1 + assert len(result.errors) == 0 @pytest.mark.asyncio async def test_receive_multiple_grafana_alerts( @@ -88,7 +88,7 @@ async def test_receive_multiple_grafana_alerts( result = AlertResponse.model_validate(response.json()) assert result.alerts_received == 3 - assert result.alerts_processed >= 0 + assert result.alerts_processed == 3 @pytest.mark.asyncio async def test_receive_grafana_alert_resolved( diff --git a/backend/tests/e2e/test_health_routes.py b/backend/tests/e2e/test_health_routes.py index be2f5de4..f90c6166 100644 --- a/backend/tests/e2e/test_health_routes.py +++ b/backend/tests/e2e/test_health_routes.py @@ -21,7 +21,6 @@ async def test_liveness_probe(self, client: AsyncClient) -> None: assert result.status == "ok" assert result.uptime_seconds >= 0 - assert result.timestamp is not None @pytest.mark.asyncio async def test_readiness_probe(self, client: AsyncClient) -> None: @@ -65,7 +64,7 @@ async def test_liveness_is_fast(self, client: AsyncClient) -> None: start = time.time() r = await client.get("/api/v1/health/live") assert r.status_code == 200 - assert time.time() - start < 1.0 + assert time.time() - start < 5.0 @pytest.mark.asyncio async def test_concurrent_liveness_fetch(self, client: AsyncClient) -> None: diff --git a/backend/tests/e2e/test_notifications_routes.py b/backend/tests/e2e/test_notifications_routes.py index 11c8268e..39c41f97 100644 --- a/backend/tests/e2e/test_notifications_routes.py +++ b/backend/tests/e2e/test_notifications_routes.py @@ -1,5 +1,3 @@ -import asyncio - import pytest from app.domain.enums.notification import NotificationChannel, NotificationSeverity, NotificationStatus from app.schemas_pydantic.execution import ExecutionResponse @@ -16,40 +14,6 @@ pytestmark = [pytest.mark.e2e, pytest.mark.kafka] -async def wait_for_notification( - client: AsyncClient, - timeout: float = 30.0, - poll_interval: float = 0.5, -) -> NotificationResponse: - """Poll until at least one notification exists for the user. - - Args: - client: Authenticated HTTP client - timeout: Maximum time to wait in seconds - poll_interval: Time between polls in seconds - - Returns: - First notification found - - Raises: - TimeoutError: If no notification appears within timeout - AssertionError: If API returns unexpected status code - """ - deadline = asyncio.get_event_loop().time() + timeout - - while asyncio.get_event_loop().time() < deadline: - response = await client.get("/api/v1/notifications", params={"limit": 10}) - assert response.status_code == 200, f"Unexpected: {response.status_code} - {response.text}" - - result = NotificationListResponse.model_validate(response.json()) - if result.notifications: - return result.notifications[0] - - await asyncio.sleep(poll_interval) - - raise TimeoutError(f"No notification appeared within {timeout}s") - - class TestGetNotifications: """Tests for GET /api/v1/notifications.""" @@ -61,8 +25,8 @@ async def test_get_notifications_empty(self, test_user: AsyncClient) -> None: assert response.status_code == 200 result = NotificationListResponse.model_validate(response.json()) - assert result.total >= 0 - assert result.unread_count >= 0 + assert result.total == 0 + assert result.unread_count == 0 assert isinstance(result.notifications, list) @pytest.mark.asyncio @@ -92,6 +56,8 @@ async def test_get_notifications_with_status_filter( assert response.status_code == 200 result = NotificationListResponse.model_validate(response.json()) assert isinstance(result.notifications, list) + for n in result.notifications: + assert n.status == NotificationStatus.DELIVERED @pytest.mark.asyncio async def test_get_notifications_with_tag_filters( @@ -109,6 +75,8 @@ async def test_get_notifications_with_tag_filters( assert response.status_code == 200 result = NotificationListResponse.model_validate(response.json()) assert isinstance(result.notifications, list) + for n in result.notifications: + assert any(t.startswith("exec") for t in n.tags) @pytest.mark.asyncio async def test_get_notifications_unauthenticated( @@ -136,10 +104,10 @@ async def test_mark_nonexistent_notification_read( async def test_mark_notification_read( self, test_user: AsyncClient, - created_execution: ExecutionResponse, + execution_with_notification: tuple[ExecutionResponse, NotificationResponse], ) -> None: """Mark existing notification as read.""" - notification = await wait_for_notification(test_user) + _, notification = execution_with_notification response = await test_user.put( f"/api/v1/notifications/{notification.notification_id}/read" @@ -147,17 +115,35 @@ async def test_mark_notification_read( assert response.status_code == 204 + # Verify state actually changed + get_resp = await test_user.get("/api/v1/notifications") + assert get_resp.status_code == 200 + result = NotificationListResponse.model_validate(get_resp.json()) + marked = [n for n in result.notifications if n.notification_id == notification.notification_id] + assert len(marked) == 1 + assert marked[0].read_at is not None + class TestMarkAllRead: """Tests for POST /api/v1/notifications/mark-all-read.""" @pytest.mark.asyncio - async def test_mark_all_read(self, test_user: AsyncClient) -> None: + async def test_mark_all_read( + self, + test_user: AsyncClient, + execution_with_notification: tuple[ExecutionResponse, NotificationResponse], + ) -> None: """Mark all notifications as read returns 204.""" response = await test_user.post("/api/v1/notifications/mark-all-read") assert response.status_code == 204 + # Verify unread count is now zero + count_resp = await test_user.get("/api/v1/notifications/unread-count") + assert count_resp.status_code == 200 + count_result = UnreadCountResponse.model_validate(count_resp.json()) + assert count_result.unread_count == 0 + @pytest.mark.asyncio async def test_mark_all_read_idempotent( self, test_user: AsyncClient @@ -183,7 +169,7 @@ async def test_get_subscriptions(self, test_user: AsyncClient) -> None: assert isinstance(result.subscriptions, list) for sub in result.subscriptions: - assert sub.channel is not None + assert sub.channel in list(NotificationChannel) @pytest.mark.asyncio async def test_update_subscription(self, test_user: AsyncClient) -> None: @@ -201,6 +187,8 @@ async def test_update_subscription(self, test_user: AsyncClient) -> None: assert result.enabled is True assert result.channel == NotificationChannel.IN_APP + expected_severities = {NotificationSeverity.LOW, NotificationSeverity.MEDIUM, NotificationSeverity.HIGH} + assert set(result.severities) == expected_severities @pytest.mark.asyncio async def test_update_subscription_disable( @@ -233,21 +221,26 @@ async def test_update_subscription_with_tags( assert response.status_code == 200 result = NotificationSubscription.model_validate(response.json()) assert result.enabled is True + assert result.include_tags == ["execution", "system"] + assert result.exclude_tags == ["debug"] class TestUnreadCount: """Tests for GET /api/v1/notifications/unread-count.""" @pytest.mark.asyncio - async def test_get_unread_count(self, test_user: AsyncClient) -> None: + async def test_get_unread_count( + self, + test_user: AsyncClient, + execution_with_notification: tuple[ExecutionResponse, NotificationResponse], + ) -> None: """Get unread notification count.""" response = await test_user.get("/api/v1/notifications/unread-count") assert response.status_code == 200 result = UnreadCountResponse.model_validate(response.json()) - assert result.unread_count >= 0 - assert isinstance(result.unread_count, int) + assert result.unread_count >= 1 class TestDeleteNotification: @@ -268,10 +261,10 @@ async def test_delete_nonexistent_notification( async def test_delete_notification( self, test_user: AsyncClient, - created_execution: ExecutionResponse, + execution_with_notification: tuple[ExecutionResponse, NotificationResponse], ) -> None: """Delete existing notification returns success.""" - notification = await wait_for_notification(test_user) + _, notification = execution_with_notification response = await test_user.delete( f"/api/v1/notifications/{notification.notification_id}" @@ -281,6 +274,13 @@ async def test_delete_notification( result = DeleteNotificationResponse.model_validate(response.json()) assert "deleted" in result.message.lower() + # Verify notification is actually gone + get_resp = await test_user.get("/api/v1/notifications") + assert get_resp.status_code == 200 + remaining = NotificationListResponse.model_validate(get_resp.json()) + remaining_ids = [n.notification_id for n in remaining.notifications] + assert notification.notification_id not in remaining_ids + class TestNotificationIsolation: """Tests for notification access isolation between users.""" @@ -290,10 +290,10 @@ async def test_user_cannot_see_other_users_notifications( self, test_user: AsyncClient, another_user: AsyncClient, - created_execution: ExecutionResponse, + execution_with_notification: tuple[ExecutionResponse, NotificationResponse], ) -> None: """User's notification list does not include other users' notifications.""" - notification = await wait_for_notification(test_user) + _, notification = execution_with_notification response = await another_user.get("/api/v1/notifications") assert response.status_code == 200 @@ -308,10 +308,10 @@ async def test_cannot_mark_other_users_notification_read( self, test_user: AsyncClient, another_user: AsyncClient, - created_execution: ExecutionResponse, + execution_with_notification: tuple[ExecutionResponse, NotificationResponse], ) -> None: """Cannot mark another user's notification as read.""" - notification = await wait_for_notification(test_user) + _, notification = execution_with_notification response = await another_user.put( f"/api/v1/notifications/{notification.notification_id}/read" @@ -319,18 +319,30 @@ async def test_cannot_mark_other_users_notification_read( assert response.status_code == 404 + # Verify owner CAN mark it + owner_resp = await test_user.put( + f"/api/v1/notifications/{notification.notification_id}/read" + ) + assert owner_resp.status_code == 204 + @pytest.mark.asyncio async def test_cannot_delete_other_users_notification( self, test_user: AsyncClient, another_user: AsyncClient, - created_execution: ExecutionResponse, + execution_with_notification: tuple[ExecutionResponse, NotificationResponse], ) -> None: """Cannot delete another user's notification.""" - notification = await wait_for_notification(test_user) + _, notification = execution_with_notification response = await another_user.delete( f"/api/v1/notifications/{notification.notification_id}" ) assert response.status_code == 404 + + # Verify owner CAN delete it + owner_resp = await test_user.delete( + f"/api/v1/notifications/{notification.notification_id}" + ) + assert owner_resp.status_code == 200 diff --git a/backend/tests/e2e/test_replay_routes.py b/backend/tests/e2e/test_replay_routes.py index a6bbb2ff..e19be9df 100644 --- a/backend/tests/e2e/test_replay_routes.py +++ b/backend/tests/e2e/test_replay_routes.py @@ -39,10 +39,9 @@ async def test_create_replay_session( assert response.status_code == 200 result = ReplayResponse.model_validate(response.json()) - assert result.session_id is not None + assert result.session_id # Newly created session has CREATED status assert result.status == ReplayStatus.CREATED - assert result.message is not None @pytest.mark.asyncio async def test_create_replay_session_with_filter( @@ -62,7 +61,7 @@ async def test_create_replay_session_with_filter( assert response.status_code == 200 result = ReplayResponse.model_validate(response.json()) - assert result.session_id is not None + assert result.session_id @pytest.mark.asyncio async def test_create_replay_session_file_target( @@ -81,7 +80,7 @@ async def test_create_replay_session_file_target( assert response.status_code == 200 result = ReplayResponse.model_validate(response.json()) - assert result.session_id is not None + assert result.session_id @pytest.mark.asyncio async def test_create_replay_session_forbidden_for_regular_user( diff --git a/backend/tests/e2e/test_saga_routes.py b/backend/tests/e2e/test_saga_routes.py index e4cd6378..c0767f6b 100644 --- a/backend/tests/e2e/test_saga_routes.py +++ b/backend/tests/e2e/test_saga_routes.py @@ -1,5 +1,3 @@ -import asyncio - import pytest from app.domain.enums.saga import SagaState from app.schemas_pydantic.execution import ExecutionRequest, ExecutionResponse @@ -10,43 +8,9 @@ ) from httpx import AsyncClient -pytestmark = [pytest.mark.e2e, pytest.mark.kafka] - - -async def wait_for_saga( - client: AsyncClient, - execution_id: str, - timeout: float = 30.0, - poll_interval: float = 0.5, -) -> SagaStatusResponse: - """Poll until at least one saga exists for the execution. - - Args: - client: Authenticated HTTP client - execution_id: ID of execution to get saga for - timeout: Maximum time to wait in seconds - poll_interval: Time between polls in seconds - - Returns: - First saga for the execution +from tests.e2e.conftest import EventWaiter - Raises: - TimeoutError: If no saga appears within timeout - AssertionError: If API returns unexpected status code - """ - deadline = asyncio.get_event_loop().time() + timeout - - while asyncio.get_event_loop().time() < deadline: - response = await client.get(f"/api/v1/sagas/execution/{execution_id}") - assert response.status_code == 200, f"Unexpected: {response.status_code} - {response.text}" - - result = SagaListResponse.model_validate(response.json()) - if result.sagas: - return result.sagas[0] - - await asyncio.sleep(poll_interval) - - raise TimeoutError(f"No saga appeared for execution {execution_id} within {timeout}s") +pytestmark = [pytest.mark.e2e, pytest.mark.kafka] class TestGetSagaStatus: @@ -54,22 +18,22 @@ class TestGetSagaStatus: @pytest.mark.asyncio async def test_get_saga_status( - self, test_user: AsyncClient, created_execution: ExecutionResponse + self, test_user: AsyncClient, execution_with_saga: tuple[ExecutionResponse, SagaStatusResponse] ) -> None: """Get saga status by ID returns valid response.""" - saga = await wait_for_saga(test_user, created_execution.execution_id) + execution, saga = execution_with_saga response = await test_user.get(f"/api/v1/sagas/{saga.saga_id}") assert response.status_code == 200 result = SagaStatusResponse.model_validate(response.json()) assert result.saga_id == saga.saga_id - assert result.execution_id == created_execution.execution_id - assert result.state in list(SagaState) - assert result.saga_name is not None - assert result.created_at is not None - assert result.updated_at is not None + assert result.execution_id == execution.execution_id + assert result.state in {SagaState.CREATED, SagaState.RUNNING} + assert result.saga_name assert result.retry_count >= 0 + assert result.error_message is None + assert result.completed_steps is not None @pytest.mark.asyncio async def test_get_saga_not_found(self, test_user: AsyncClient) -> None: @@ -83,10 +47,10 @@ async def test_get_saga_access_denied( self, test_user: AsyncClient, another_user: AsyncClient, - created_execution: ExecutionResponse, + execution_with_saga: tuple[ExecutionResponse, SagaStatusResponse], ) -> None: """Cannot access another user's saga.""" - saga = await wait_for_saga(test_user, created_execution.execution_id) + _, saga = execution_with_saga response = await another_user.get(f"/api/v1/sagas/{saga.saga_id}") @@ -98,20 +62,20 @@ class TestGetExecutionSagas: @pytest.mark.asyncio async def test_get_execution_sagas( - self, test_user: AsyncClient, created_execution: ExecutionResponse + self, test_user: AsyncClient, execution_with_saga: tuple[ExecutionResponse, SagaStatusResponse] ) -> None: """Get sagas for a specific execution.""" - saga = await wait_for_saga(test_user, created_execution.execution_id) + execution, saga = execution_with_saga response = await test_user.get( - f"/api/v1/sagas/execution/{created_execution.execution_id}" + f"/api/v1/sagas/execution/{execution.execution_id}" ) assert response.status_code == 200 result = SagaListResponse.model_validate(response.json()) - assert result.total >= 1 - assert len(result.sagas) >= 1 + assert result.total == 1 + assert len(result.sagas) == 1 assert isinstance(result.has_more, bool) saga_ids = [s.saga_id for s in result.sagas] @@ -119,13 +83,13 @@ async def test_get_execution_sagas( @pytest.mark.asyncio async def test_get_execution_sagas_with_pagination( - self, test_user: AsyncClient, created_execution: ExecutionResponse + self, test_user: AsyncClient, execution_with_saga: tuple[ExecutionResponse, SagaStatusResponse] ) -> None: """Pagination works for execution sagas.""" - await wait_for_saga(test_user, created_execution.execution_id) + execution, _ = execution_with_saga response = await test_user.get( - f"/api/v1/sagas/execution/{created_execution.execution_id}", + f"/api/v1/sagas/execution/{execution.execution_id}", params={"limit": 5, "skip": 0}, ) @@ -133,22 +97,23 @@ async def test_get_execution_sagas_with_pagination( result = SagaListResponse.model_validate(response.json()) assert result.limit == 5 assert result.skip == 0 + assert len(result.sagas) <= 5 @pytest.mark.asyncio async def test_get_execution_sagas_with_state_filter( - self, test_user: AsyncClient, created_execution: ExecutionResponse + self, test_user: AsyncClient, execution_with_saga: tuple[ExecutionResponse, SagaStatusResponse] ) -> None: """Filter sagas by state.""" - saga = await wait_for_saga(test_user, created_execution.execution_id) + execution, saga = execution_with_saga response = await test_user.get( - f"/api/v1/sagas/execution/{created_execution.execution_id}", + f"/api/v1/sagas/execution/{execution.execution_id}", params={"state": saga.state.value}, ) assert response.status_code == 200 result = SagaListResponse.model_validate(response.json()) - assert len(result.sagas) >= 1 + assert len(result.sagas) == 1 for s in result.sagas: assert s.state == saga.state @@ -158,10 +123,10 @@ class TestListSagas: @pytest.mark.asyncio async def test_list_sagas( - self, test_user: AsyncClient, created_execution: ExecutionResponse + self, test_user: AsyncClient, execution_with_saga: tuple[ExecutionResponse, SagaStatusResponse] ) -> None: """List sagas for current user.""" - saga = await wait_for_saga(test_user, created_execution.execution_id) + _, saga = execution_with_saga response = await test_user.get("/api/v1/sagas/") @@ -170,16 +135,17 @@ async def test_list_sagas( assert result.total >= 1 assert len(result.sagas) >= 1 + assert len(result.sagas) <= result.limit saga_ids = [s.saga_id for s in result.sagas] assert saga.saga_id in saga_ids @pytest.mark.asyncio async def test_list_sagas_with_state_filter( - self, test_user: AsyncClient, created_execution: ExecutionResponse + self, test_user: AsyncClient, execution_with_saga: tuple[ExecutionResponse, SagaStatusResponse] ) -> None: """Filter sagas by state.""" - saga = await wait_for_saga(test_user, created_execution.execution_id) + _, saga = execution_with_saga response = await test_user.get( "/api/v1/sagas/", @@ -189,16 +155,15 @@ async def test_list_sagas_with_state_filter( assert response.status_code == 200 result = SagaListResponse.model_validate(response.json()) + assert len(result.sagas) >= 1 for s in result.sagas: assert s.state == saga.state @pytest.mark.asyncio async def test_list_sagas_pagination( - self, test_user: AsyncClient, created_execution: ExecutionResponse + self, test_user: AsyncClient, execution_with_saga: tuple[ExecutionResponse, SagaStatusResponse] ) -> None: """Pagination works for saga list.""" - await wait_for_saga(test_user, created_execution.execution_id) - response = await test_user.get( "/api/v1/sagas/", params={"limit": 10, "skip": 0}, @@ -226,6 +191,7 @@ class TestCancelSaga: async def test_cancel_saga( self, test_user: AsyncClient, + event_waiter: EventWaiter, long_running_execution_request: ExecutionRequest, ) -> None: """Cancel a running saga.""" @@ -235,16 +201,25 @@ async def test_cancel_saga( assert exec_response.status_code == 200 execution = ExecutionResponse.model_validate(exec_response.json()) - saga = await wait_for_saga(test_user, execution.execution_id) + await event_waiter.wait_for_saga_command(execution.execution_id) + + saga_resp = await test_user.get(f"/api/v1/sagas/execution/{execution.execution_id}") + saga = SagaListResponse.model_validate(saga_resp.json()).sagas[0] response = await test_user.post(f"/api/v1/sagas/{saga.saga_id}/cancel") assert response.status_code == 200 result = SagaCancellationResponse.model_validate(response.json()) assert result.saga_id == saga.saga_id - assert isinstance(result.success, bool) + assert result.success is True assert result.message is not None + # Verify saga state actually changed + status_resp = await test_user.get(f"/api/v1/sagas/{saga.saga_id}") + assert status_resp.status_code == 200 + updated_saga = SagaStatusResponse.model_validate(status_resp.json()) + assert updated_saga.state in {SagaState.CANCELLED, SagaState.COMPENSATING} + @pytest.mark.asyncio async def test_cancel_nonexistent_saga( self, test_user: AsyncClient @@ -261,6 +236,7 @@ async def test_cancel_other_users_saga_forbidden( self, test_user: AsyncClient, another_user: AsyncClient, + event_waiter: EventWaiter, long_running_execution_request: ExecutionRequest, ) -> None: """Cannot cancel another user's saga.""" @@ -270,7 +246,10 @@ async def test_cancel_other_users_saga_forbidden( assert exec_response.status_code == 200 execution = ExecutionResponse.model_validate(exec_response.json()) - saga = await wait_for_saga(test_user, execution.execution_id) + await event_waiter.wait_for_saga_command(execution.execution_id) + + saga_resp = await test_user.get(f"/api/v1/sagas/execution/{execution.execution_id}") + saga = SagaListResponse.model_validate(saga_resp.json()).sagas[0] response = await another_user.post(f"/api/v1/sagas/{saga.saga_id}/cancel") @@ -285,15 +264,21 @@ async def test_user_cannot_see_other_users_sagas( self, test_user: AsyncClient, another_user: AsyncClient, - created_execution: ExecutionResponse, + execution_with_saga: tuple[ExecutionResponse, SagaStatusResponse], ) -> None: """User's saga list does not include other users' sagas.""" - saga = await wait_for_saga(test_user, created_execution.execution_id) + _, saga = execution_with_saga + assert saga.saga_id + + # Positive proof: owner CAN see the saga + owner_resp = await test_user.get("/api/v1/sagas/") + assert owner_resp.status_code == 200 + owner_result = SagaListResponse.model_validate(owner_resp.json()) + assert saga.saga_id in [s.saga_id for s in owner_result.sagas] + # Negative proof: another user CANNOT see it response = await another_user.get("/api/v1/sagas/") assert response.status_code == 200 result = SagaListResponse.model_validate(response.json()) - saga_ids = [s.saga_id for s in result.sagas] - - assert saga.saga_id not in saga_ids + assert saga.saga_id not in [s.saga_id for s in result.sagas] diff --git a/backend/tests/e2e/test_sse_routes.py b/backend/tests/e2e/test_sse_routes.py index 12fe8136..3c1e4e13 100644 --- a/backend/tests/e2e/test_sse_routes.py +++ b/backend/tests/e2e/test_sse_routes.py @@ -1,7 +1,12 @@ +import asyncio +import contextlib +import json +from dataclasses import dataclass from typing import Any import pytest import pytest_asyncio +from app.domain.enums.sse import SSEControlEvent from app.schemas_pydantic.execution import ExecutionResponse from async_asgi_testclient import TestClient as SSETestClient from fastapi import FastAPI @@ -10,6 +15,77 @@ pytestmark = [pytest.mark.e2e] + +@dataclass +class SSEEvent: + """Parsed SSE event.""" + + event: str = "message" + data: str = "" + id: str = "" + retry: int | None = None + + def json(self) -> Any: + return json.loads(self.data) + + +def _parse_sse_event(raw: str) -> SSEEvent | None: + """Parse a single SSE event block (text between double-newlines).""" + ev = SSEEvent() + data_lines: list[str] = [] + has_data = False + + for line in raw.split("\n"): + line = line.rstrip("\r") + if not line or line.startswith(":"): + continue + name, _, value = line.partition(":") + value = value.lstrip(" ") # SSE spec: strip one leading space + if name == "event": + ev.event = value + elif name == "data": + has_data = True + data_lines.append(value) + elif name == "id": + ev.id = value + elif name == "retry": + with contextlib.suppress(ValueError): + ev.retry = int(value) + + if not has_data: + return None + ev.data = "\n".join(data_lines) + return ev + + +async def collect_sse_events( + response: Any, + *, + max_events: int = 10, + timeout: float = 10.0, +) -> list[SSEEvent]: + """Read SSE events from an async-asgi-testclient streaming response. + + Stops after *max_events* data-bearing events or *timeout* seconds. + """ + events: list[SSEEvent] = [] + buf = "" + + async with asyncio.timeout(timeout): + async for chunk in response.iter_content(512): + buf += chunk.decode("utf-8") + while "\n\n" in buf: + block, buf = buf.split("\n\n", 1) + ev = _parse_sse_event(block) + if ev is not None: + events.append(ev) + if len(events) >= max_events: + break + + return events + + + class _NoLifespan: """ASGI wrapper that completes lifespan events immediately. @@ -32,6 +108,7 @@ async def __call__(self, scope: Any, receive: Any, send: Any) -> None: await self.app(scope, receive, send) + @pytest_asyncio.fixture async def sse_client(app: FastAPI, test_user: AsyncClient) -> SSETestClient: """SSE-capable test client with auth cookies from test_user. @@ -64,6 +141,7 @@ async def sse_client_another(app: FastAPI, another_user: AsyncClient) -> SSETest return client + class TestNotificationStream: """Tests for GET /api/v1/events/notifications/stream.""" @@ -71,7 +149,7 @@ class TestNotificationStream: async def test_notification_stream_returns_event_stream( self, sse_client: SSETestClient ) -> None: - """Notification stream returns SSE content type and streams data.""" + """Notification stream returns SSE content type.""" async with sse_client: response = await sse_client.get( "/api/v1/events/notifications/stream", stream=True @@ -96,7 +174,7 @@ class TestExecutionStream: async def test_execution_stream_returns_event_stream( self, sse_client: SSETestClient, created_execution: ExecutionResponse ) -> None: - """Execution events stream returns SSE content type.""" + """Execution stream returns SSE content type.""" async with sse_client: response = await sse_client.get( f"/api/v1/events/executions/{created_execution.execution_id}", @@ -106,6 +184,42 @@ async def test_execution_stream_returns_event_stream( assert response.status_code == 200 assert "text/event-stream" in response.headers.get("content-type", "") + @pytest.mark.asyncio + async def test_execution_stream_yields_control_events( + self, sse_client: SSETestClient, created_execution: ExecutionResponse + ) -> None: + """Execution stream yields connected/subscribed/status control events.""" + async with sse_client: + response = await sse_client.get( + f"/api/v1/events/executions/{created_execution.execution_id}", + stream=True, + ) + assert response.status_code == 200 + + # The server yields connected + subscribed immediately, then status + # after a DB lookup. Collect up to 3 events with a generous timeout. + events = await collect_sse_events(response, max_events=3, timeout=10.0) + + assert len(events) >= 2, f"Expected >= 2 control events, got {len(events)}" + + # First event: connected + connected = events[0].json() + assert connected["event_type"] == SSEControlEvent.CONNECTED + assert connected["execution_id"] == created_execution.execution_id + + # Second event: subscribed + subscribed = events[1].json() + assert subscribed["event_type"] == SSEControlEvent.SUBSCRIBED + assert subscribed["execution_id"] == created_execution.execution_id + assert subscribed["message"] == "Redis subscription established" + + # Third event (if present): status with current execution state + if len(events) >= 3: + status_ev = events[2].json() + assert status_ev["event_type"] == SSEControlEvent.STATUS + assert status_ev["execution_id"] == created_execution.execution_id + assert status_ev["status"] is not None + @pytest.mark.asyncio async def test_execution_stream_unauthenticated( self, client: AsyncClient @@ -120,11 +234,10 @@ async def test_execution_stream_other_users_execution( sse_client_another: SSETestClient, created_execution: ExecutionResponse, ) -> None: - """Streaming another user's execution opens but events are filtered. + """Another user's execution stream still opens (auth at event level). - SSE endpoints return 200 and start streaming - authorization - happens at event level (user won't receive events for executions - they don't own). We verify the stream opens with correct content-type. + SSE endpoints return 200 and start streaming. The connected/subscribed + control events are always sent; business events are filtered by ownership. """ async with sse_client_another: response = await sse_client_another.get( @@ -134,3 +247,9 @@ async def test_execution_stream_other_users_execution( assert response.status_code == 200 assert "text/event-stream" in response.headers.get("content-type", "") + + # Even another user receives control events (connected, subscribed) + events = await collect_sse_events(response, max_events=2, timeout=10.0) + assert len(events) >= 2 + assert events[0].json()["event_type"] == SSEControlEvent.CONNECTED + assert events[1].json()["event_type"] == SSEControlEvent.SUBSCRIBED diff --git a/backend/tests/e2e/test_user_settings_routes.py b/backend/tests/e2e/test_user_settings_routes.py index 12a3424c..46225bd9 100644 --- a/backend/tests/e2e/test_user_settings_routes.py +++ b/backend/tests/e2e/test_user_settings_routes.py @@ -25,9 +25,8 @@ async def test_get_user_settings(self, test_user: AsyncClient) -> None: assert response.status_code == 200 settings = UserSettings.model_validate(response.json()) - assert settings.theme in [Theme.LIGHT, Theme.DARK, Theme.AUTO] - assert settings.timezone is not None - assert settings.notifications is not None + assert settings.theme in list(Theme) + assert settings.timezone assert settings.editor is not None @pytest.mark.asyncio @@ -94,35 +93,12 @@ class TestUpdateTheme: """Tests for PUT /api/v1/user/settings/theme.""" @pytest.mark.asyncio - async def test_update_theme_dark(self, test_user: AsyncClient) -> None: - """Update theme to dark.""" - request = ThemeUpdateRequest(theme=Theme.DARK) - response = await test_user.put( - "/api/v1/user/settings/theme", - json=request.model_dump(), - ) - - assert response.status_code == 200 - settings = UserSettings.model_validate(response.json()) - assert settings.theme == Theme.DARK - - @pytest.mark.asyncio - async def test_update_theme_light(self, test_user: AsyncClient) -> None: - """Update theme to light.""" - request = ThemeUpdateRequest(theme=Theme.LIGHT) - response = await test_user.put( - "/api/v1/user/settings/theme", - json=request.model_dump(), - ) - - assert response.status_code == 200 - settings = UserSettings.model_validate(response.json()) - assert settings.theme == Theme.LIGHT - - @pytest.mark.asyncio - async def test_update_theme_system(self, test_user: AsyncClient) -> None: - """Update theme to system.""" - request = ThemeUpdateRequest(theme=Theme.AUTO) + @pytest.mark.parametrize("theme", list(Theme), ids=lambda t: t.value) + async def test_update_theme( + self, test_user: AsyncClient, theme: Theme + ) -> None: + """Update theme to each valid value.""" + request = ThemeUpdateRequest(theme=theme) response = await test_user.put( "/api/v1/user/settings/theme", json=request.model_dump(), @@ -130,7 +106,7 @@ async def test_update_theme_system(self, test_user: AsyncClient) -> None: assert response.status_code == 200 settings = UserSettings.model_validate(response.json()) - assert settings.theme == Theme.AUTO + assert settings.theme == theme class TestUpdateNotificationSettings: @@ -154,7 +130,10 @@ async def test_update_notification_settings( assert response.status_code == 200 settings = UserSettings.model_validate(response.json()) - assert settings.notifications is not None + assert settings.notifications.execution_completed is True + assert settings.notifications.execution_failed is True + assert settings.notifications.system_updates is True + assert settings.notifications.security_alerts is True class TestUpdateEditorSettings: @@ -219,7 +198,6 @@ async def test_get_settings_history(self, test_user: AsyncClient) -> None: assert response.status_code == 200 history = SettingsHistoryResponse.model_validate(response.json()) assert history.limit == 10 - assert isinstance(history.history, list) assert len(history.history) >= 1 @pytest.mark.asyncio @@ -230,8 +208,7 @@ async def test_get_settings_history_default_limit( response = await test_user.get("/api/v1/user/settings/history") assert response.status_code == 200 - history = SettingsHistoryResponse.model_validate(response.json()) - assert isinstance(history.history, list) + SettingsHistoryResponse.model_validate(response.json()) class TestRestoreSettings: @@ -264,7 +241,7 @@ async def test_restore_settings(self, test_user: AsyncClient) -> None: assert restore_response.status_code == 200 restored = UserSettings.model_validate(restore_response.json()) - assert restored.theme is not None + assert restored.theme in list(Theme) class TestCustomSettings: From 4e5cb6ea2dd6cb563a450f3053e75edb81063fa6 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Sat, 7 Feb 2026 00:14:51 +0100 Subject: [PATCH 2/3] fix: tests --- backend/tests/e2e/test_dlq_routes.py | 93 ++++++----- backend/tests/e2e/test_events_routes.py | 2 +- backend/tests/e2e/test_sse_routes.py | 195 ++++++++---------------- 3 files changed, 108 insertions(+), 182 deletions(-) diff --git a/backend/tests/e2e/test_dlq_routes.py b/backend/tests/e2e/test_dlq_routes.py index 42f8aa89..bd396c59 100644 --- a/backend/tests/e2e/test_dlq_routes.py +++ b/backend/tests/e2e/test_dlq_routes.py @@ -1,4 +1,6 @@ import pytest +import pytest_asyncio +from app.db.docs.dlq import DLQMessageDocument from app.dlq.models import DLQMessageStatus, RetryStrategy from app.domain.enums.events import EventType from app.schemas_pydantic.dlq import ( @@ -11,9 +13,27 @@ from app.schemas_pydantic.user import MessageResponse from httpx import AsyncClient +from tests.conftest import make_execution_requested_event + pytestmark = [pytest.mark.e2e, pytest.mark.kafka] +@pytest_asyncio.fixture +async def stored_dlq_message() -> DLQMessageDocument: + """Insert a DLQ message directly into MongoDB and return it.""" + event = make_execution_requested_event() + doc = DLQMessageDocument( + event=event, + original_topic="execution-events", + error="Simulated failure for E2E testing", + retry_count=0, + status=DLQMessageStatus.PENDING, + producer_id="e2e-test", + ) + await doc.insert() + return doc + + class TestGetDLQStats: """Tests for GET /api/v1/dlq/stats.""" @@ -124,51 +144,31 @@ async def test_get_dlq_message_not_found( @pytest.mark.asyncio async def test_get_dlq_message_detail( - self, test_user: AsyncClient + self, test_user: AsyncClient, stored_dlq_message: DLQMessageDocument ) -> None: - """Get DLQ message detail if messages exist.""" - # First list messages to find one - list_response = await test_user.get( - "/api/v1/dlq/messages", - params={"limit": 1}, - ) - assert list_response.status_code == 200 - result = DLQMessagesResponse.model_validate(list_response.json()) - - if not result.messages: - pytest.skip("No DLQ messages available to test detail endpoint") - - event_id = result.messages[0].event.event_id + """Get DLQ message detail by event_id.""" + event_id = stored_dlq_message.event.event_id response = await test_user.get( f"/api/v1/dlq/messages/{event_id}" ) assert response.status_code == 200 detail = DLQMessageDetail.model_validate(response.json()) - assert detail.event is not None - assert detail.original_topic - assert detail.error - assert detail.retry_count >= 0 + assert detail.event.event_id == event_id + assert detail.original_topic == "execution-events" + assert detail.error == "Simulated failure for E2E testing" + assert detail.retry_count == 0 class TestRetryDLQMessages: """Tests for POST /api/v1/dlq/retry.""" @pytest.mark.asyncio - async def test_retry_dlq_messages(self, test_user: AsyncClient) -> None: - """Retry DLQ messages.""" - # First list messages to find some - list_response = await test_user.get( - "/api/v1/dlq/messages", - params={"status": DLQMessageStatus.PENDING, "limit": 5}, - ) - assert list_response.status_code == 200 - result = DLQMessagesResponse.model_validate(list_response.json()) - - if not result.messages: - pytest.skip("No DLQ messages available to test retry") - - event_ids = [msg.event.event_id for msg in result.messages[:2]] + async def test_retry_dlq_messages( + self, test_user: AsyncClient, stored_dlq_message: DLQMessageDocument + ) -> None: + """Retry a known DLQ message.""" + event_ids = [stored_dlq_message.event.event_id] response = await test_user.post( "/api/v1/dlq/retry", @@ -179,8 +179,8 @@ async def test_retry_dlq_messages(self, test_user: AsyncClient) -> None: response.json() ) - assert retry_result.total == len(event_ids) - assert retry_result.successful + retry_result.failed == retry_result.total + assert retry_result.total == 1 + assert retry_result.successful + retry_result.failed == 1 @pytest.mark.asyncio async def test_retry_dlq_messages_empty_list( @@ -262,20 +262,11 @@ async def test_discard_dlq_message_not_found( assert response.status_code == 404 @pytest.mark.asyncio - async def test_discard_dlq_message(self, test_user: AsyncClient) -> None: - """Discard a DLQ message if messages exist.""" - # First list messages to find one - list_response = await test_user.get( - "/api/v1/dlq/messages", - params={"limit": 1}, - ) - assert list_response.status_code == 200 - result = DLQMessagesResponse.model_validate(list_response.json()) - - if not result.messages: - pytest.skip("No DLQ messages available to test discard") - - event_id = result.messages[0].event.event_id + async def test_discard_dlq_message( + self, test_user: AsyncClient, stored_dlq_message: DLQMessageDocument + ) -> None: + """Discard a known DLQ message.""" + event_id = stored_dlq_message.event.event_id response = await test_user.delete( f"/api/v1/dlq/messages/{event_id}", @@ -288,6 +279,12 @@ async def test_discard_dlq_message(self, test_user: AsyncClient) -> None: assert event_id in msg_result.message assert "discarded" in msg_result.message.lower() + # Verify message is actually gone or marked discarded + get_resp = await test_user.get(f"/api/v1/dlq/messages/{event_id}") + if get_resp.status_code == 200: + detail = DLQMessageDetail.model_validate(get_resp.json()) + assert detail.status == DLQMessageStatus.DISCARDED + @pytest.mark.asyncio async def test_discard_dlq_message_requires_reason( self, test_user: AsyncClient diff --git a/backend/tests/e2e/test_events_routes.py b/backend/tests/e2e/test_events_routes.py index e851a705..a8c5a7bc 100644 --- a/backend/tests/e2e/test_events_routes.py +++ b/backend/tests/e2e/test_events_routes.py @@ -89,7 +89,7 @@ async def test_get_user_events( result = EventListResponse.model_validate(response.json()) assert result.total >= 1 assert len(result.events) >= 1 - assert len(result.events) == min(result.total, 10) + assert len(result.events) <= min(result.total, 10) @pytest.mark.asyncio async def test_get_user_events_with_filters( diff --git a/backend/tests/e2e/test_sse_routes.py b/backend/tests/e2e/test_sse_routes.py index 3c1e4e13..e105cee2 100644 --- a/backend/tests/e2e/test_sse_routes.py +++ b/backend/tests/e2e/test_sse_routes.py @@ -1,89 +1,16 @@ import asyncio -import contextlib -import json -from dataclasses import dataclass from typing import Any import pytest import pytest_asyncio -from app.domain.enums.sse import SSEControlEvent -from app.schemas_pydantic.execution import ExecutionResponse +from app.schemas_pydantic.execution import ExecutionRequest, ExecutionResponse from async_asgi_testclient import TestClient as SSETestClient from fastapi import FastAPI from httpx import AsyncClient -pytestmark = [pytest.mark.e2e] - - - -@dataclass -class SSEEvent: - """Parsed SSE event.""" - - event: str = "message" - data: str = "" - id: str = "" - retry: int | None = None - - def json(self) -> Any: - return json.loads(self.data) - - -def _parse_sse_event(raw: str) -> SSEEvent | None: - """Parse a single SSE event block (text between double-newlines).""" - ev = SSEEvent() - data_lines: list[str] = [] - has_data = False - - for line in raw.split("\n"): - line = line.rstrip("\r") - if not line or line.startswith(":"): - continue - name, _, value = line.partition(":") - value = value.lstrip(" ") # SSE spec: strip one leading space - if name == "event": - ev.event = value - elif name == "data": - has_data = True - data_lines.append(value) - elif name == "id": - ev.id = value - elif name == "retry": - with contextlib.suppress(ValueError): - ev.retry = int(value) - - if not has_data: - return None - ev.data = "\n".join(data_lines) - return ev - - -async def collect_sse_events( - response: Any, - *, - max_events: int = 10, - timeout: float = 10.0, -) -> list[SSEEvent]: - """Read SSE events from an async-asgi-testclient streaming response. - - Stops after *max_events* data-bearing events or *timeout* seconds. - """ - events: list[SSEEvent] = [] - buf = "" - - async with asyncio.timeout(timeout): - async for chunk in response.iter_content(512): - buf += chunk.decode("utf-8") - while "\n\n" in buf: - block, buf = buf.split("\n\n", 1) - ev = _parse_sse_event(block) - if ev is not None: - events.append(ev) - if len(events) >= max_events: - break - - return events +from tests.e2e.conftest import EventWaiter +pytestmark = [pytest.mark.e2e] class _NoLifespan: @@ -108,7 +35,6 @@ async def __call__(self, scope: Any, receive: Any, send: Any) -> None: await self.app(scope, receive, send) - @pytest_asyncio.fixture async def sse_client(app: FastAPI, test_user: AsyncClient) -> SSETestClient: """SSE-capable test client with auth cookies from test_user. @@ -121,10 +47,8 @@ async def sse_client(app: FastAPI, test_user: AsyncClient) -> SSETestClient: context manager from closing the session-scoped Kafka broker. """ client = SSETestClient(_NoLifespan(app)) - # Copy auth cookies from httpx client (SimpleCookie uses dict-style assignment) for name, value in test_user.cookies.items(): client.cookie_jar[name] = value - # Copy CSRF header if csrf := test_user.headers.get("X-CSRF-Token"): client.headers["X-CSRF-Token"] = csrf return client @@ -141,23 +65,9 @@ async def sse_client_another(app: FastAPI, another_user: AsyncClient) -> SSETest return client - class TestNotificationStream: """Tests for GET /api/v1/events/notifications/stream.""" - @pytest.mark.asyncio - async def test_notification_stream_returns_event_stream( - self, sse_client: SSETestClient - ) -> None: - """Notification stream returns SSE content type.""" - async with sse_client: - response = await sse_client.get( - "/api/v1/events/notifications/stream", stream=True - ) - - assert response.status_code == 200 - assert "text/event-stream" in response.headers.get("content-type", "") - @pytest.mark.asyncio async def test_notification_stream_unauthenticated( self, client: AsyncClient @@ -166,59 +76,80 @@ async def test_notification_stream_unauthenticated( response = await client.get("/api/v1/events/notifications/stream") assert response.status_code == 401 - -class TestExecutionStream: - """Tests for GET /api/v1/events/executions/{execution_id}.""" - @pytest.mark.asyncio - async def test_execution_stream_returns_event_stream( - self, sse_client: SSETestClient, created_execution: ExecutionResponse + @pytest.mark.kafka + async def test_notification_stream_returns_event_stream( + self, + sse_client: SSETestClient, + test_user: AsyncClient, + simple_execution_request: ExecutionRequest, + event_waiter: EventWaiter, ) -> None: - """Execution stream returns SSE content type.""" + """Notification stream returns SSE content type when a notification arrives. + + The notification stream has no initial control events (unlike the + execution stream). async-asgi-testclient blocks until the first + http.response.body ASGI message. We trigger a real notification by + creating an execution and waiting for its result — the notification + handler publishes to Redis before RESULT_STORED, unblocking the stream. + """ async with sse_client: - response = await sse_client.get( - f"/api/v1/events/executions/{created_execution.execution_id}", - stream=True, + # Start stream in background — blocks until first body chunk + stream_task = asyncio.create_task( + sse_client.get( + "/api/v1/events/notifications/stream", stream=True + ) + ) + # Allow Redis subscription to establish + await asyncio.sleep(0.5) + + # Trigger a notification: execution → result → notification + resp = await test_user.post( + "/api/v1/execute", + json=simple_execution_request.model_dump(), ) + assert resp.status_code == 200 + execution = ExecutionResponse.model_validate(resp.json()) + await event_waiter.wait_for_result(execution.execution_id) + + # Notification published to Redis unblocks the SSE stream + response = await asyncio.wait_for(stream_task, timeout=10.0) assert response.status_code == 200 assert "text/event-stream" in response.headers.get("content-type", "") + body = response.raw.read().decode("utf-8") + assert len(body) > 0 + + +class TestExecutionStream: + """Tests for GET /api/v1/events/executions/{execution_id}.""" @pytest.mark.asyncio - async def test_execution_stream_yields_control_events( + async def test_execution_stream_returns_event_stream( self, sse_client: SSETestClient, created_execution: ExecutionResponse ) -> None: - """Execution stream yields connected/subscribed/status control events.""" + """Execution stream returns SSE content type and first body chunk. + + async-asgi-testclient waits for the first http.response.body ASGI + message before returning the response object. For execution streams + this is the ``connected`` control event, confirming the SSE generator + started and yielded data. + """ async with sse_client: response = await sse_client.get( f"/api/v1/events/executions/{created_execution.execution_id}", stream=True, ) - assert response.status_code == 200 - - # The server yields connected + subscribed immediately, then status - # after a DB lookup. Collect up to 3 events with a generous timeout. - events = await collect_sse_events(response, max_events=3, timeout=10.0) - - assert len(events) >= 2, f"Expected >= 2 control events, got {len(events)}" - # First event: connected - connected = events[0].json() - assert connected["event_type"] == SSEControlEvent.CONNECTED - assert connected["execution_id"] == created_execution.execution_id - - # Second event: subscribed - subscribed = events[1].json() - assert subscribed["event_type"] == SSEControlEvent.SUBSCRIBED - assert subscribed["execution_id"] == created_execution.execution_id - assert subscribed["message"] == "Redis subscription established" + assert response.status_code == 200 + assert "text/event-stream" in response.headers.get("content-type", "") - # Third event (if present): status with current execution state - if len(events) >= 3: - status_ev = events[2].json() - assert status_ev["event_type"] == SSEControlEvent.STATUS - assert status_ev["execution_id"] == created_execution.execution_id - assert status_ev["status"] is not None + # The first body chunk (buffered by async-asgi-testclient during + # response construction) contains the ``connected`` SSE event. + first_chunk = response.raw.read() + body = first_chunk.decode("utf-8") + assert "connected" in body + assert created_execution.execution_id in body @pytest.mark.asyncio async def test_execution_stream_unauthenticated( @@ -248,8 +179,6 @@ async def test_execution_stream_other_users_execution( assert response.status_code == 200 assert "text/event-stream" in response.headers.get("content-type", "") - # Even another user receives control events (connected, subscribed) - events = await collect_sse_events(response, max_events=2, timeout=10.0) - assert len(events) >= 2 - assert events[0].json()["event_type"] == SSEControlEvent.CONNECTED - assert events[1].json()["event_type"] == SSEControlEvent.SUBSCRIBED + # Another user still receives the initial connected event + first_chunk = response.raw.read() + assert b"connected" in first_chunk From 443e33601ece4883c62c1f25b985db86abea97aa Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Sat, 7 Feb 2026 01:15:05 +0100 Subject: [PATCH 3/3] fix: tests --- .../app/services/grafana_alert_processor.py | 2 ++ backend/tests/e2e/conftest.py | 28 +++++++++++++------ 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/backend/app/services/grafana_alert_processor.py b/backend/app/services/grafana_alert_processor.py index a78d6d6c..2513cfbb 100644 --- a/backend/app/services/grafana_alert_processor.py +++ b/backend/app/services/grafana_alert_processor.py @@ -4,6 +4,7 @@ from typing import Any from app.domain.enums.notification import NotificationSeverity +from app.domain.enums.user import UserRole from app.schemas_pydantic.grafana import GrafanaAlertItem, GrafanaWebhook from app.services.notification_service import NotificationService @@ -99,6 +100,7 @@ async def process_single_alert( severity=severity, tags=["external_alert", "grafana", "entity:external_alert"], metadata=metadata, + target_roles=[UserRole.ADMIN], ) return True, None diff --git a/backend/tests/e2e/conftest.py b/backend/tests/e2e/conftest.py index b4d60cdb..0d76bac3 100644 --- a/backend/tests/e2e/conftest.py +++ b/backend/tests/e2e/conftest.py @@ -8,13 +8,14 @@ import pytest import pytest_asyncio from aiokafka import AIOKafkaConsumer +from app.db.docs.saga import SagaDocument from app.domain.enums.events import EventType from app.domain.enums.kafka import KafkaTopic from app.domain.enums.user import UserRole from app.domain.events.typed import DomainEvent, DomainEventAdapter from app.schemas_pydantic.execution import ExecutionRequest, ExecutionResponse from app.schemas_pydantic.notification import NotificationListResponse, NotificationResponse -from app.schemas_pydantic.saga import SagaListResponse, SagaStatusResponse +from app.schemas_pydantic.saga import SagaStatusResponse from app.schemas_pydantic.saved_script import SavedScriptCreateRequest from app.schemas_pydantic.user import UserCreate from app.settings import Settings @@ -217,18 +218,27 @@ async def created_execution_admin( @pytest_asyncio.fixture async def execution_with_saga( - test_user: AsyncClient, event_waiter: EventWaiter, created_execution: ExecutionResponse, ) -> tuple[ExecutionResponse, SagaStatusResponse]: - """Execution with saga guaranteed in MongoDB (via CREATE_POD_COMMAND event).""" + """Execution with saga guaranteed in MongoDB (via CREATE_POD_COMMAND event). + + The saga orchestrator persists the saga document multiple times before + publishing CREATE_POD_COMMAND to Kafka. Once EventWaiter resolves the + command, the document is definitively in MongoDB. We query Beanie + directly (same DB, no HTTP round-trip) for a deterministic, sleep-free + lookup. + """ await event_waiter.wait_for_saga_command(created_execution.execution_id) - resp = await test_user.get(f"/api/v1/sagas/execution/{created_execution.execution_id}") - assert resp.status_code == 200 - result = SagaListResponse.model_validate(resp.json()) - assert result.sagas, f"No saga for {created_execution.execution_id} despite CREATE_POD_COMMAND received" - assert result.sagas[0].execution_id == created_execution.execution_id - return created_execution, result.sagas[0] + + doc = await SagaDocument.find_one(SagaDocument.execution_id == created_execution.execution_id) + assert doc is not None, ( + f"No saga document for {created_execution.execution_id} despite CREATE_POD_COMMAND received" + ) + + saga = SagaStatusResponse.model_validate(doc, from_attributes=True) + assert saga.execution_id == created_execution.execution_id + return created_execution, saga @pytest_asyncio.fixture