fix: removed duplication, better tests, also added sse tests#143
Conversation
📝 Walkthrough

Adds an async `EventWaiter` and session fixture for Kafka-driven synchronization in e2e tests, replaces polling helpers with event-driven fixtures (`execution_with_saga`, `execution_with_notification`), relaxes many strict runtime assertions across tests, removes some test modules, and introduces a `target_roles` change when creating system notifications.
Sequence Diagram

```mermaid
sequenceDiagram
    autonumber
    participant Test as Test Function
    participant Fixture as event_waiter fixture
    participant EventWaiter as EventWaiter
    participant Kafka as Kafka Broker
    participant API as FastAPI Server
    Test->>Fixture: request event_waiter (session fixture)
    Fixture->>EventWaiter: init consumer, subscribe topics
    EventWaiter->>Kafka: connect & start consuming
    Test->>API: POST /api/v1/executions (submit execution)
    API-->>Kafka: publish execution-related events
    Kafka-->>EventWaiter: deliver events (CREATE_POD_COMMAND, RESULT_STORED, RESULT_FAILED...)
    EventWaiter->>EventWaiter: buffer & match predicate (execution_id)
    EventWaiter-->>Test: resolve future / return matched event
    Test->>API: GET /api/v1/executions/{id}/result or /api/v1/sagas/{id}
    API-->>Test: return synchronized result/saga response
```
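The buffer-and-match step in the diagram can be sketched as a small asyncio class. This is a hypothetical reduction of the real fixture (the real `EventWaiter` consumes from Kafka; here events are fed in via `deliver()` so the matching logic stands alone):

```python
import asyncio
from typing import Any, Callable

Event = dict[str, Any]
Predicate = Callable[[Event], bool]


class EventWaiter:
    """Sketch of the buffer-and-match pattern from the diagram above.

    Hypothetical: the real fixture subscribes to Kafka topics; here
    deliver() stands in for the consumer loop.
    """

    def __init__(self) -> None:
        self._buffer: list[Event] = []
        self._waiters: list[tuple[Predicate, asyncio.Future]] = []

    def deliver(self, event: Event) -> None:
        # Buffer every event, then resolve any pending waiter that matches.
        self._buffer.append(event)
        for pair in list(self._waiters):
            predicate, fut = pair
            if not fut.done() and predicate(event):
                fut.set_result(event)
                self._waiters.remove(pair)

    async def wait_for(self, predicate: Predicate, timeout: float = 15.0) -> Event:
        # Check already-buffered events first so a late wait_for still matches.
        for event in self._buffer:
            if predicate(event):
                return event
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        self._waiters.append((predicate, fut))
        return await asyncio.wait_for(fut, timeout)
```

A test would then await something like `event_waiter.wait_for(lambda e: e.get("execution_id") == execution_id)` instead of polling the API.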
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 3 passed
2 issues found across 18 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/tests/e2e/test_health_routes.py">
<violation number="1" location="backend/tests/e2e/test_health_routes.py:67">
P3: The test description says the liveness check should respond within 1 second, but the updated assertion allows 5 seconds. This inconsistency weakens the test relative to its documented expectation.</violation>
</file>
<file name="backend/tests/e2e/test_saga_routes.py">
<violation number="1" location="backend/tests/e2e/test_saga_routes.py:32">
P2: This assertion is too strict and can fail when the saga reaches a terminal state quickly (COMPLETED/FAILED/etc.). Accept any valid SagaState like the previous test behavior to avoid flaky failures.</violation>
</file>
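The fix the saga-routes comment suggests is to accept any member of the state enum rather than pinning one value. A minimal sketch, assuming a `SagaState` enum along these lines (the real enum's members may differ):

```python
from enum import Enum


class SagaState(str, Enum):
    # Hypothetical mirror of the application's enum; member names are assumed.
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATED = "compensated"


def assert_valid_saga_state(value: str) -> None:
    # Accept any valid state so a saga that reaches a terminal state
    # quickly does not fail the test.
    valid = {s.value for s in SagaState}
    assert value in valid, f"unexpected saga state: {value!r}"
```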
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/tests/e2e/test_health_routes.py (1)
62-67: ⚠️ Potential issue | 🟡 Minor. Docstring says "within 1 second" but the assertion checks `< 5.0`.

The docstring on line 63 is now inconsistent with the actual assertion. If 5s is the intended threshold, update the docstring to match.

Proposed fix

```diff
 @pytest.mark.asyncio
 async def test_liveness_is_fast(self, client: AsyncClient) -> None:
-    """Liveness check should respond within 1 second."""
+    """Liveness check should respond within 5 seconds."""
     start = time.time()
     r = await client.get("/api/v1/health/live")
     assert r.status_code == 200
     assert time.time() - start < 5.0
```
🤖 Fix all issues with AI agents
In `@backend/tests/e2e/conftest.py`:
- Around line 246-252: The test is fragile because it assumes the first
notification belongs to created_execution and also concatenates subject and tags
without a separator; change the lookup to search result.notifications for a
notification that contains created_execution.execution_id either in
notification.subject or in any item of notification.tags (use
event_waiter.wait_for_result as before to ensure delivery), and when checking a
combined check include an explicit separator between notification.subject and "
".join(notification.tags) so you don't get accidental boundary matches;
reference result.notifications, created_execution.execution_id,
notification.subject, and notification.tags to implement the robust filter and
match.
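A sketch of the lookup that prompt describes. The attribute shapes (`subject`, `tags`) come from the comment itself, and `SimpleNamespace` stands in for the real notification model:

```python
from types import SimpleNamespace


def find_notification_for(notifications, execution_id: str):
    """Return the first notification referencing execution_id, else None.

    Checks the subject and each tag separately, so there is no accidental
    boundary match from concatenating subject and tags.
    """
    for n in notifications:
        if execution_id in n.subject:
            return n
        if any(execution_id in tag for tag in n.tags):
            return n
    return None


# Example with stand-in notification objects:
notifications = [
    SimpleNamespace(subject="Other run finished", tags=["exec-999"]),
    SimpleNamespace(subject="Execution done", tags=["exec-123", "e2e"]),
]
match = find_notification_for(notifications, "exec-123")
```

The test would still use `event_waiter.wait_for_result` to ensure delivery, then apply a filter like this instead of assuming `result.notifications[0]` is the right one.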
In `@backend/tests/e2e/test_saga_routes.py`:
- Around line 249-252: Before indexing into SagaListResponse.sagas, add an
assertion that the response contains at least one saga; specifically after
obtaining saga_resp via test_user.get and before the line using
SagaListResponse.model_validate(...).sagas[0], assert that
SagaListResponse.model_validate(saga_resp.json()).sagas or the parsed list has
length > 0 (or that saga_resp.json()["sagas"] is truthy) so the access to
sagas[0] in the saga assignment is guarded; ensure this check mirrors the guard
used in the earlier cancel test for consistency.
- Around line 206-207: Add an explicit guard before indexing into
SagaListResponse.sagas: after calling
SagaListResponse.model_validate(saga_resp.json()) (or directly inspecting
saga_resp.json()), assert that .sagas is non-empty and raise/pytest.assert with
a clear message referencing the execution.execution_id (or the saga_resp
content) so the test fails with a readable error instead of an IndexError; then
safely assign saga = ...sagas[0]. Also keep or mention wait_for_saga_command but
do not rely on it as the sole guarantee.
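Both saga prompts reduce to the same guard: assert the list is non-empty, with a message naming the execution, before indexing. A hypothetical helper:

```python
def first_saga(sagas: list, execution_id: str):
    # Fail with a readable message instead of an IndexError when the
    # saga list comes back empty for this execution.
    assert sagas, f"no sagas returned for execution {execution_id}"
    return sagas[0]
```

In the tests this would wrap `SagaListResponse.model_validate(saga_resp.json()).sagas` before the `sagas[0]` access.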
🧹 Nitpick comments (11)
backend/tests/e2e/test_admin_events_routes.py (2)
132-133: Validation without assignment discards the opportunity to assert field values.

Other browse tests in this same class (e.g., `test_browse_events`, `test_browse_events_with_pagination`) assign the validated result and assert on `total`, `skip`, `limit`, etc. Dropping the assignment here means this test only checks that the response is deserializable — it doesn't verify any search-text-specific behaviour (e.g., `total >= 0`, or that returned events actually match the search term). Consider keeping the assignment so at least a basic sanity assertion can follow:

Suggested change

```diff
-EventBrowseResponse.model_validate(response.json())
+result = EventBrowseResponse.model_validate(response.json())
+assert result.total >= 0
```
199-200: Same pattern: validated model discarded without field assertions.

`test_get_event_stats` and `test_get_event_stats_with_hours` both assert on `total_events`/`error_rate` after validation. Keeping parity here would be cheap and adds a small safety net.

Suggested change

```diff
-EventStatsResponse.model_validate(response.json())
+stats = EventStatsResponse.model_validate(response.json())
+assert stats.total_events >= 0
```

backend/tests/e2e/test_admin_users_routes.py (2)
64-75: Search test no longer validates that search actually filtered results.

The `model_validate` return value is discarded, so this test only confirms the endpoint returns a 200 with a valid shape. It doesn't verify that the search parameter had any effect (e.g., that returned users match the search term). Consider at least asserting something like `result.total >= 0` or checking that usernames contain the search term, similar to `test_list_users_with_role_filter`, which validates the filter.

Proposed improvement

```diff
-UserListResponse.model_validate(response.json())
+result = UserListResponse.model_validate(response.json())
+for user in result.users:
+    assert "test" in user.username.lower() or "test" in user.email.lower()
```
270-289: Add assertions to validate aggregation logic in overview response.

The `AdminUserOverview` schema requires `stats`, `derived_counts`, and `rate_limit_summary` fields, all of which are populated by the service with aggregated data. The current test only validates `user_id` but should assert that these aggregated fields are present and have expected values (e.g., stats is not null, derived_counts contains expected keys). This catches regressions in the overview aggregation logic for a freshly created user.

backend/tests/e2e/dlq/test_dlq_manager.py (1)
100-100: Timeout reduced to 5s — verify this is reliable in CI.

`EventWaiter.wait_for` (used elsewhere) defaults to 15s, while this direct `asyncio.wait_for` is now 5s. Kafka consumer startup, partition assignment, and message delivery can be non-deterministic under load. If CI environments are slower, this could produce flaky test failures.

backend/tests/e2e/conftest.py (1)
62-72: Unbounded buffer growth across the test session.

`self._buffer` is append-only and session-scoped, so every Kafka event consumed during the entire test run accumulates in memory. For typical test suites this is fine, but if the suite grows significantly this could become an issue. Consider adding a cap or clearing matched entries.

backend/tests/e2e/test_events_routes.py (1)
332-333: Chained `and` in assert gives opaque failure messages.

If `_id` is missing, the assertion error won't indicate which condition failed. Consider splitting into separate asserts for clearer diagnostics.

♻️ Suggested improvement

```diff
 for item in result:
-    assert "_id" in item and "count" in item and item["count"] > 0
+    assert "_id" in item, f"Missing '_id' in aggregate item: {item}"
+    assert "count" in item, f"Missing 'count' in aggregate item: {item}"
+    assert item["count"] > 0, f"Expected count > 0, got {item['count']}"
```

backend/tests/e2e/test_sse_routes.py (1)
61-85: Trailing buffer content is silently discarded.

If the stream closes without a final `\n\n`, any partial SSE block in `buf` is lost. This is generally fine per the SSE spec (events must be `\n\n`-terminated), but worth noting in case a server sends incomplete trailing data during tests.

Also, if `asyncio.timeout` fires mid-iteration, `TimeoutError` propagates uncaught — the caller gets an exception rather than the partial `events` list collected so far. Consider wrapping the collection in a try/except to return partial results on timeout if that's the intended behavior.

♻️ Optional: return partial results on timeout

```diff
 events: list[SSEEvent] = []
 buf = ""
-async with asyncio.timeout(timeout):
-    async for chunk in response.iter_content(512):
-        buf += chunk.decode("utf-8")
-        while "\n\n" in buf:
-            block, buf = buf.split("\n\n", 1)
-            ev = _parse_sse_event(block)
-            if ev is not None:
-                events.append(ev)
-        if len(events) >= max_events:
-            break
+try:
+    async with asyncio.timeout(timeout):
+        async for chunk in response.iter_content(512):
+            buf += chunk.decode("utf-8")
+            while "\n\n" in buf:
+                block, buf = buf.split("\n\n", 1)
+                ev = _parse_sse_event(block)
+                if ev is not None:
+                    events.append(ev)
+            if len(events) >= max_events:
+                break
+except TimeoutError:
+    pass  # return whatever we collected
 return events
```

backend/tests/e2e/test_execution_routes.py (2)
509-518: Parallel event waiting for concurrent executions — nice improvement.

Using `asyncio.gather` to wait for all execution results simultaneously is much better than sequential polling. However, if one execution fails or times out, the gather will propagate the first exception and the other waits will be cancelled. Consider `return_exceptions=True` if you want to assert on each execution independently.

288-290: Loose assertion on retry failure response.

`assert "detail" in retry_response.json()` only checks the key exists. Consider also asserting the detail message content (e.g., "cannot retry") for better regression coverage, similar to line 240.

backend/tests/e2e/test_notifications_routes.py (1)
170-172: Minor: unnecessary `list()` conversion for enum membership check.

`sub.channel in list(NotificationChannel)` creates a list on every iteration. `sub.channel in NotificationChannel` works directly with Python enums and is slightly more idiomatic.

♻️ Suggested tweak

```diff
 for sub in result.subscriptions:
-    assert sub.channel in list(NotificationChannel)
+    assert sub.channel in NotificationChannel
```
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@backend/tests/e2e/test_sse_routes.py`:
- Around line 96-121: The test currently starts stream_task via
asyncio.create_task(sse_client.get(...)) and if the POST to /api/v1/execute or
event_waiter.wait_for_result raises, stream_task is left running; wrap the
section from creating stream_task through reading/validating the response in a
try/finally block and in the finally ensure the background coroutine is cleaned
up by checking if stream_task is not done, calling stream_task.cancel(), and
awaiting it (catch asyncio.CancelledError) to close the SSE connection; adjust
around the existing symbols stream_task, sse_client.get(...),
test_user.post(...), and event_waiter.wait_for_result(...) to guarantee no
dangling coroutine remains.
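The cleanup this prompt asks for is the standard cancel-and-await pattern in a `finally` block. A self-contained sketch, where `_stream_forever` is a stand-in for the long-lived `sse_client.get(...)` request and the body of the `try` stands in for the POST and `wait_for_result` calls:

```python
import asyncio


async def _stream_forever() -> None:
    # Stand-in for the SSE request the server holds open indefinitely.
    await asyncio.sleep(3600)


async def run_with_cleanup() -> bool:
    stream_task = asyncio.create_task(_stream_forever())
    try:
        # ... POST /api/v1/execute and event_waiter.wait_for_result(...)
        # would go here; an exception raised in this block still reaches
        # the finally, so the stream task cannot be leaked.
        await asyncio.sleep(0.01)
    finally:
        if not stream_task.done():
            stream_task.cancel()
            try:
                await stream_task
            except asyncio.CancelledError:
                pass  # expected: the SSE connection is now closed
    return stream_task.cancelled()
```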
🧹 Nitpick comments (4)
backend/tests/e2e/test_events_routes.py (2)
332-333: Compound assertion gives an unhelpful failure message.

If `"_id"` is missing, pytest will report `AssertionError` on the whole expression without telling you which condition failed. Splitting into separate assertions (or using a descriptive message) makes debugging failures much easier.

Suggested improvement

```diff
 for item in result:
-    assert "_id" in item and "count" in item and item["count"] > 0
+    assert "_id" in item, f"Missing '_id' in aggregate item: {item}"
+    assert "count" in item, f"Missing 'count' in aggregate item: {item}"
+    assert item["count"] > 0, f"Expected positive count: {item}"
```
428-429: Same compound-assertion readability concern.

These `is not None and ...` compound checks would give clearer diagnostics if split, especially since a `None` value for `event_count` vs. an empty `event_types` list are very different failure modes worth distinguishing.

Suggested improvement

```diff
-assert result.event_count is not None and result.event_count >= 1
-assert result.event_types is not None and len(result.event_types) >= 1
+assert result.event_count is not None
+assert result.event_count >= 1
+assert result.event_types is not None
+assert len(result.event_types) >= 1
```

backend/tests/e2e/test_dlq_routes.py (2)
21-34: Consider adding teardown to clean up the inserted document.

The fixture inserts a document but never removes it. If the e2e test database isn't wiped between runs, leftover documents could cause flaky assertions in tests that query all DLQ messages (e.g., `test_get_dlq_messages_by_status`, `test_get_dlq_stats`). A `yield`-based cleanup would make this self-contained:

♻️ Suggested cleanup

```diff
 @pytest_asyncio.fixture
 async def stored_dlq_message() -> DLQMessageDocument:
     """Insert a DLQ message directly into MongoDB and return it."""
     event = make_execution_requested_event()
     doc = DLQMessageDocument(
         event=event,
         original_topic="execution-events",
         error="Simulated failure for E2E testing",
         retry_count=0,
         status=DLQMessageStatus.PENDING,
         producer_id="e2e-test",
     )
     await doc.insert()
-    return doc
+    yield doc
+    await doc.delete()
```
282-286: Tighten the verification: assert 404 if not 200.

If the follow-up GET returns an unexpected status (e.g., 500), the test silently passes. Adding an explicit `else` branch ensures only the two expected outcomes are accepted.

♻️ Suggested tightening

```diff
 get_resp = await test_user.get(f"/api/v1/dlq/messages/{event_id}")
 if get_resp.status_code == 200:
     detail = DLQMessageDetail.model_validate(get_resp.json())
     assert detail.status == DLQMessageStatus.DISCARDED
+else:
+    assert get_resp.status_code == 404
```
Summary by cubic
Converted E2E tests to event-driven with a shared Kafka EventWaiter and added stronger SSE coverage. Tests run faster and more reliably, with clearer expectations across routes.
Refactors
New Features
Written for commit 443e336. Summary will update on new commits.
Summary by CodeRabbit
New Features
Tests