
fix: removed most of schema encode/decode overhead #135

Open
HardMax71 wants to merge 10 commits into main from fix/schema-manager

Conversation


@HardMax71 (Owner) commented Feb 4, 2026


Summary by cubic

Switched event transport from Avro/Schema Registry to FastStream’s native Pydantic JSON with one topic per event class, removed the EventStore/KafkaEventService, and replaced event_type with topic across the API and DB. UnifiedProducer is now EventPublisher, which persists events idempotently before publishing; the DLQ processes typed events with metadata carried in headers. CI pre-pulls the Python runtime image for reliable startup.

  • Performance

    • Dropped Schema Registry and custom decoders; handlers use typed Pydantic models directly, cutting serialization overhead.
    • Removed EventStore and eager schema init; EventPublisher persists before publish; Kubernetes worker no longer waits on the image pre-puller; CI pre-pulls python:3.11-slim.
    • DLQ now uses BaseEvent payloads with header metadata, simplifying parsing and retries.
  • Migration

    • Replace DomainEvent/DomainEventAdapter with BaseEvent; remove SchemaRegistryManager and related APIs/tests.
    • Rename UnifiedProducer to EventPublisher; delete KafkaEventService.
    • Replace event_type with topic everywhere: EventDocument.event_type -> topic, API params event_types -> topics, SSE uses topic strings.
    • Topics are derived via BaseEvent.topic(); remove mapping/registry helpers and EventType enum usage.
    • Add IdempotencyMiddleware for FastStream consumers.

Written for commit 20288b5. Summary will update on new commits.

Summary by CodeRabbit

  • Refactor

    • Events moved to topic-based models and Avro/schema-registry paths removed; publishing simplified to a single publisher flow.
  • New Features

    • Events persisted idempotently before publish; DLQ/retry emits richer headers and trace info.
    • Idempotency middleware added to messaging pipeline.
    • Kubernetes worker pre-pulls runtime images; CI pre-pulls test runtime image.
  • Bug Fixes

    • Invalid events are skipped and counted; duplicate stores are no-ops.
  • Tests

    • Tests updated to topic/BaseEvent model; legacy schema-registry and event-store e2e tests removed.
  • Chores

    • Schema registry auth setting removed; tuned test K8s resource/timeouts.


coderabbitai bot commented Feb 4, 2026

📝 Walkthrough

Replaces Avro/SchemaRegistry + UnifiedProducer + EventStore with a model-driven flow: BaseEvent + EventPublisher + EventRepository; removes schema registry and event_store modules; adds IdempotencyMiddleware; converts event-type enums to topic strings; updates DI providers, handlers, services, and tests to use topics and BaseEvent model validation (model_validate).

Changes

Cohort / File(s) | Summary
Core providers & DI
backend/app/core/providers.py, backend/app/core/container.py
Rewire providers to inject EventPublisher, EventRepository, and IdempotencyMiddleware; remove schema-registry and EventStore wiring; update many provider signatures.
Event publish/persistence
backend/app/events/core/producer.py, backend/app/events/core/__init__.py
Rename UnifiedProducer → EventPublisher; publishing now persists via EventRepository, then publishes to the broker; add publish/send_to_dlq APIs.
Schema registry & event store removed
backend/app/events/schema/schema_registry.py, backend/app/events/event_store.py, backend/app/events/broker.py
Delete schema-registry module, EventStore implementation, and Avro decoder/broker factory; broker instantiation simplified.
Idempotency middleware
backend/app/services/idempotency/middleware.py, backend/app/services/idempotency/__init__.py, related run_*.py files
Add IdempotencyMiddleware implementation and export; attach middleware to brokers at worker startup.
Repository & DB models
backend/app/db/repositories/event_repository.py, backend/app/db/docs/event.py, backend/app/db/repositories/admin/admin_events_repository.py
Move storage to EventRepository, adopt BaseEvent.model_validate(...), switch DB fields/indexes from event_type → topic, add idempotent duplicate handling.
Domain events & typing
backend/app/domain/events/typed.py, backend/app/domain/events/__init__.py, backend/app/domain/events/event_models.py
Refactor Avro-based models → Pydantic BaseEvent; remove DomainEvent/EventType enums in favor of class-derived topic() and add TopicCount. Update many public types to use BaseEvent and topic.
Handlers & message flows
backend/app/events/handlers.py, backend/app/dlq/manager.py
Switch DLQ and handler processing to payload+headers and BaseEvent; remove event-store subscriber and Avro decode paths; adapt DLQ events and headers.
Services & workers
backend/app/services/... (execution, replay, kafka_event_service.py removed, k8s_worker, coordinator, result_processor, saga, pod_monitor, etc.)
Update constructors and usages to EventPublisher + EventRepository; remove KafkaEventService module; replace .produce() with .publish() and event_type usage with type(event).topic().
API, schemas & routes
backend/app/api/routes/*.py, backend/app/schemas_pydantic/*.py
Rename request/response fields from event_type/event_types → topic/topics; endpoints, models, and responses updated to BaseEvent and topic-based filters.
Tests & e2e
backend/tests/**, many files
Mass update tests to use BaseEvent, EventPublisher, topic strings and Pydantic validation; remove schema-registry/roundtrip/e2e tests tied to Avro/EventStore.
Infrastructure scripts & configs
backend/scripts/create_topics.py, backend/app/domain/enums/kafka.py, backend/config.test.toml
Dynamic topic discovery from BaseEvent.topic(), reduced KafkaTopic enum (DLQ only), and test config resource tweaks.
CI
.github/actions/e2e-ready/action.yml
Pre-pull test runtime image step added.

Sequence Diagram(s)

sequenceDiagram
  participant API as API / Service
  participant Publisher as EventPublisher
  participant Repo as EventRepository
  participant Broker as KafkaBroker
  participant Idem as IdempotencyMiddleware

  API->>Publisher: publish(event: BaseEvent, key?)
  Publisher->>Repo: store_event(BaseEvent) -> event_id
  Publisher->>Broker: publish(raw_event, headers) (uses topic = type(event).topic())
  Broker->>Idem: message passes through IdempotencyMiddleware
  Idem->>Subscriber: deliver message to handler(s)
  Subscriber->>Service: handler invokes business logic (Coordinator/K8s/ResultProcessor)
  Service->>Publisher: publish(new event)  -- loop repeats

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐇
I bounced through models, light and spry,
Swapped Avro for Pydantic sky,
Events now hop by topic name,
Stored then sent — the publish game,
Middleware keeps duplicates shy.

🚥 Pre-merge checks | ✅ 2 passed | ❌ 1 failed
❌ Failed checks (1 warning)
  • Docstring Coverage (⚠️ Warning): docstring coverage is 43.11%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.
✅ Passed checks (2 passed)
  • Description Check (✅ Passed): check skipped because CodeRabbit’s high-level summary is enabled.
  • Title check (✅ Passed): the title 'fix: removed most of schema encode/decode overhead' accurately describes the main architectural change: eliminating Avro serialization and schema-registry overhead by switching to Pydantic-based JSON serialization with direct model validation.


@coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@backend/app/events/schema/schema_registry.py`:
- Around line 31-32: The payload is being produced with
event.model_dump(mode="python") which leaves datetimes/enums as Python objects
causing Avro encoding to fail; change the call in schema_registry.py that
assigns payload (the event.model_dump invocation used before calling
self.serializer.encode_record_with_schema) to use mode="json" (i.e.,
event.model_dump(mode="json", by_alias=False, exclude_unset=False)) so datetimes
become ISO strings and enums are serialized correctly before passing to
encode_record_with_schema.

In `@backend/app/services/event_replay/replay_service.py`:
- Around line 228-231: Remove the redundant if-check around
DomainEventAdapter.validate_python(doc) and wrap the call in a try/except to
catch ValidationError so a single malformed document doesn't abort the replay;
specifically, replace the current pattern that uses
DomainEventAdapter.validate_python followed by "if event:" with a try block that
calls DomainEventAdapter.validate_python(doc), appends the returned event to
batch and increments events_processed, and an except ValidationError that logs a
warning via self.logger.warning (include the error string in extra or message)
and increments session.failed_events to track skipped documents.
🧹 Nitpick comments (3)
backend/app/events/broker.py (1)

19-21: Consider adding error handling for decode failures.

The avro_decoder does not handle exceptions from decode_message (schema resolution failures, malformed payloads) or validate_python (unknown event types). If FastStream's broker doesn't catch these for DLQ routing, unhandled exceptions could crash consumers.

If exception propagation is intentional for upstream DLQ handling, this is fine as-is.

💡 Optional: Add error handling
     async def avro_decoder(msg: StreamMessage[Any]) -> DomainEvent:
-        payload = await schema_registry.serializer.decode_message(msg.body)
-        return DomainEventAdapter.validate_python(payload)
+        try:
+            payload = await schema_registry.serializer.decode_message(msg.body)
+            return DomainEventAdapter.validate_python(payload)
+        except Exception as e:
+            logger.error("Failed to decode message", extra={"error": str(e)})
+            raise
backend/app/db/repositories/event_repository.py (1)

400-401: Missing from_attributes=True parameter for aggregation results.

Line 401 validates events from doc["events"] (aggregation pipeline results) without from_attributes=True, while all other usages in this file include it. The $$ROOT in the pipeline pushes raw BSON documents which Beanie converts to dicts, so from_attributes=True may not be strictly required here. However, for consistency and to avoid potential issues if the pipeline behavior changes, consider adding the parameter.

♻️ Optional fix for consistency
-            events = [DomainEventAdapter.validate_python(e) for e in doc["events"]]
+            events = [DomainEventAdapter.validate_python(e, from_attributes=True) for e in doc["events"]]
backend/app/events/event_store.py (1)

115-115: Consider consistency with repository pattern.

The EventStore uses doc.model_dump() before validation, while EventRepository uses from_attributes=True. Both approaches work correctly:

  • doc.model_dump() converts to dict explicitly
  • from_attributes=True reads attributes directly from the object

The current approach is fine, but if you want consistency across the codebase, you could use from_attributes=True here as well. This is purely a style consideration.

♻️ Optional: Use from_attributes=True for consistency
-        event = DomainEventAdapter.validate_python(doc.model_dump())
+        event = DomainEventAdapter.validate_python(doc, from_attributes=True)

Apply similar change to lines 141, 158, 180, 201, 214, 238.

@cubic-dev-ai bot left a comment

1 issue found across 15 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/services/event_replay/replay_service.py">

<violation number="1" location="backend/app/services/event_replay/replay_service.py:228">
P1: `DomainEventAdapter.validate_python` raises a `ValidationError` when validation fails, rather than returning `None`. This exception will cause the entire replay session to fail immediately in `_run_replay`, bypassing the `session.config.skip_errors` logic that handles errors in `_process_batch`.

If the database contains any events that don't match the current schema, the replay will crash.

Wrap this in a `try...except` block to handle validation errors (respecting `skip_errors` or logging warnings), and remove the redundant `if event:` check.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

@cubic-dev-ai bot left a comment

2 issues found across 16 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/events/schema/schema_registry.py">

<violation number="1" location="backend/app/events/schema/schema_registry.py:21">
P1: Schema registry authentication has been removed. If the registry requires auth credentials, this change will cause connection failures. Verify that authentication is no longer needed, or restore the `auth` parameter.</violation>

<violation number="2" location="backend/app/events/schema/schema_registry.py:28">
P1: The serialization logic changes may break Avro encoding. The previous code converted `timestamp` to microseconds (Avro long format) and removed `event_type` from the payload. The new `event.model_dump()` call doesn't perform these transformations, which could cause schema validation failures if the Avro schema expects timestamps as longs or doesn't include `event_type`.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

@cubic-dev-ai bot left a comment

2 issues found across 12 files (changes from recent commits).

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/services/execution_service.py">

<violation number="1" location="backend/app/services/execution_service.py:306">
P1: The `event_types` parameter is accepted by the method but ignored in the `event_repository.get_execution_events` call. This creates a functional regression where filtering by event type is lost, and the method will return all events for the execution up to the limit, potentially including unwanted event types or exposing internal system events if the caller relied on filtering.

Since `EventRepository.get_execution_events` does not appear to support filtering by event type, either the repository needs to be updated or the filtering must be handled in-memory (though in-memory filtering combined with a database limit can cause pagination issues).</violation>
</file>

<file name="backend/app/events/core/producer.py">

<violation number="1" location="backend/app/events/core/producer.py:45">
P2: Persisting the event before serialization and topic lookup risks creating "orphaned" records in the database if those subsequent steps fail (e.g., due to schema validation errors or missing topic mappings). Consider performing validation and serialization before persistence to ensure only valid events are stored.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

@coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/services/execution_service.py (1)

289-307: ⚠️ Potential issue | 🟠 Major

event_types filter is ignored.

event_types is accepted but never applied, so callers can’t filter results anymore. Either push the filter into EventRepository.get_execution_events or filter in‑memory (noting the limit semantics).

🩹 Minimal in‑memory filter (short‑term)
-        result = await self.event_repository.get_execution_events(execution_id=execution_id, limit=limit)
-        events = result.events
+        result = await self.event_repository.get_execution_events(execution_id=execution_id, limit=limit)
+        events = result.events
+        if event_types:
+            events = [e for e in events if e.event_type in event_types]
🤖 Fix all issues with AI agents
In `@backend/app/events/core/producer.py`:
- Around line 43-46: The produce method currently stores the event via
_event_repository.store_event and then publishes to Kafka (using
EVENT_TYPE_TO_TOPIC to build topic), which can leave DB events orphaned if
publish fails; change the flow to implement an outbox/publish-status pattern:
when creating the event in produce (or via _event_repository.store_event)
persist it with a publish_status field set to "pending"/"queued", then attempt
publishing to Kafka and upon success update that stored event's publish_status
to "published" (or "failed" on non-transient errors) using the event id returned
by store_event; additionally add a retry/reconciler background worker that scans
events with publish_status "pending"/"failed" and retries publishing, or adopt a
transactional outbox write+publish flow if supported—modify produce,
_event_repository.store_event (or the returned event object), and the Kafka
publish path that uses EVENT_TYPE_TO_TOPIC/topic accordingly to support status
updates and retries.
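
A rough sketch of the outbox-style flow that prompt describes, to make the suggestion concrete (all names below are illustrative, not the repo's code; repo and broker are assumed to expose store/update and publish methods):

  from datetime import datetime, timezone
  from enum import Enum
  from typing import Any
  from pydantic import BaseModel, Field

  class PublishStatus(str, Enum):
      PENDING = "pending"
      PUBLISHED = "published"
      FAILED = "failed"

  class StoredEvent(BaseModel):
      event_id: str
      topic: str
      payload: dict[str, Any]
      publish_status: PublishStatus = PublishStatus.PENDING
      created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

  class OutboxPublisher:
      def __init__(self, repo: Any, broker: Any) -> None:   # assumed interfaces
          self._repo = repo
          self._broker = broker

      async def publish(self, event: Any) -> str:
          # 1. persist first, marked pending, so nothing is lost if publish fails
          event_id = await self._repo.store_event(event, status=PublishStatus.PENDING)
          try:
              await self._broker.publish(event, topic=type(event).topic())
          except Exception:
              # 2. leave a marker a background reconciler can pick up and retry
              await self._repo.set_publish_status(event_id, PublishStatus.FAILED)
              raise
          # 3. success: flip the status so the reconciler ignores this row
          await self._repo.set_publish_status(event_id, PublishStatus.PUBLISHED)
          return event_id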

@coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@backend/app/events/core/producer.py`:
- Around line 73-101: The DLQ status header currently uses
str(DLQMessageStatus.PENDING) which yields "DLQMessageStatus.PENDING" and is
hard to parse; change the header to use the enum's stable value (e.g.,
DLQMessageStatus.PENDING.value) instead of str(...) in the headers dict built in
the failed-event sender (the block that builds headers for
inject_trace_context), ensuring the status is a plain string value the consumer
(handlers.py) can parse consistently.

In `@backend/app/events/handlers.py`:
- Around line 293-341: The DLQ header parsing in on_dlq_message is unsafe: wrap
parsing of headers["failed_at"] and DLQMessageStatus(...) in guarded logic so a
missing or malformed header doesn't crash the consumer; use
headers.get("failed_at") and if present attempt datetime.fromisoformat inside a
try/except (fallback to datetime.utcnow() or None) and similarly attempt to
construct DLQMessageStatus from headers.get("status") inside a try/except
(fallback to DLQMessageStatus.PENDING or a safe default), and emit a warning via
the provided logger when a parse fails; then build the DLQMessage with these
safe values before calling manager.handle_message so a single bad message won’t
stall on_dlq_message.
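
A defensive-parsing sketch along those lines (DLQMessageStatus is a stand-in for the PR's enum, the header names follow the prompt, and the helper itself is illustrative):

  import logging
  from datetime import datetime, timezone
  from enum import Enum

  logger = logging.getLogger(__name__)

  class DLQMessageStatus(str, Enum):      # stand-in; only PENDING is assumed from the PR
      PENDING = "pending"

  def parse_dlq_headers(headers: dict[str, str]) -> tuple[datetime, DLQMessageStatus]:
      try:
          failed_at = datetime.fromisoformat(headers["failed_at"])
      except (KeyError, ValueError) as exc:
          logger.warning("Missing/invalid failed_at header, falling back to now(): %s", exc)
          failed_at = datetime.now(timezone.utc)
      try:
          status = DLQMessageStatus(headers.get("status", ""))
      except ValueError:
          logger.warning("Unknown DLQ status header %r, falling back to PENDING", headers.get("status"))
          status = DLQMessageStatus.PENDING
      return failed_at, status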

@cubic-dev-ai bot left a comment

1 issue found across 3 files (changes from recent commits).

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/dlq/manager.py">

<violation number="1" location="backend/app/dlq/manager.py:163">
P1: Switching to `schema_registry.serialize_event` introduces potential exceptions (schema validation, network errors) that were less likely with `json.dumps`. The caller `process_due_retries` (visible in file context) does not handle exceptions within its loop. A single message failing serialization will crash the retry cycle, preventing other messages from being processed (Poison Pill).

Consider adding error handling in `process_due_retries` to catch, log, and skip invalid messages.</violation>
</file>
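
In other words, something like this per-message guard (method and attribute names are illustrative, not the repo's):

  # inside the DLQ manager: one bad message is logged and skipped instead of
  # aborting the whole retry cycle
  async def process_due_retries(self) -> None:
      for message in await self._repository.get_due_retries():
          try:
              await self.retry_message(message)
          except Exception as exc:
              self.logger.warning(
                  "Skipping DLQ message that failed to serialize/retry",
                  extra={"error": str(exc)},
              )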

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

@coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/db/repositories/event_repository.py (1)

64-76: ⚠️ Potential issue | 🟠 Major

Batch insert should preserve idempotency on duplicates.

insert_many() will fail the entire batch on a duplicate key event, breaking the idempotent behavior you have in store_event(). Since event_id is unique, this causes operational issues when the same events are processed twice. Resolve by using unordered inserts with duplicate-key error handling to match the single-insert behavior:

Suggested fix
-from pymongo.errors import DuplicateKeyError
+from pymongo.errors import BulkWriteError, DuplicateKeyError

         ...
-        await EventDocument.insert_many(docs)
+        try:
+            await EventDocument.insert_many(docs, ordered=False)
+        except BulkWriteError as e:
+            dup_errors = [err for err in e.details.get("writeErrors", []) if err.get("code") == 11000]
+            if dup_errors:
+                self.logger.debug("Some events already stored, skipping duplicates")
+            else:
+                raise

@cubic-dev-ai bot left a comment

3 issues found across 3 files (changes from recent commits).

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/services/k8s_worker/worker.py">

<violation number="1" location="backend/app/services/k8s_worker/worker.py:299">
P1: The `replace_namespaced_daemon_set` call uses PUT semantics and requires `metadata.resourceVersion` to be set in the body for optimistic locking. Without it, the API server will reject the request with a `409 Conflict`.

The code reads the DaemonSet but discards the result, failing to capture the `resourceVersion`.

**Fix**: Capture the existing DaemonSet and inject its `resourceVersion` into the manifest, or use `patch_namespaced_daemon_set` for easier idempotency.</violation>

<violation number="2" location="backend/app/services/k8s_worker/worker.py:313">
P1: The readiness check is prone to a race condition. It may mistakenly consider the DaemonSet "ready" based on the status of the *previous* generation before the controller has processed the update.

**Issues**:
1. **Race Condition**: `observed_generation` is not checked against `metadata.generation`.
2. **Unhandled Events**: The loop processes all event types (including `ERROR`, `DELETED`), which can cause `AttributeError` crashes when accessing `ds.status`.
3. **Infinite Blocking**: There is no timeout. If the cluster cannot fulfill the request (e.g., resource quotas, image pull errors), the application will hang indefinitely during startup.

**Fix**: Verify `observed_generation`, handle event types, and add a timeout or strict error handling.</violation>
</file>

<file name=".github/actions/e2e-ready/action.yml">

<violation number="1" location=".github/actions/e2e-ready/action.yml:76">
P2: Add an explicit timeout to the rollout status check so the workflow doesn’t hang indefinitely if the daemonset is stuck.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

@cubic-dev-ai bot commented Feb 4, 2026

P1: The replace_namespaced_daemon_set call uses PUT semantics and requires metadata.resourceVersion to be set in the body for optimistic locking. Without it, the API server will reject the request with a 409 Conflict.

The code reads the DaemonSet but discards the result, failing to capture the resourceVersion.

Fix: Capture the existing DaemonSet and inject its resourceVersion into the manifest, or use patch_namespaced_daemon_set for easier idempotency.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/k8s_worker/worker.py, line 299:

<comment>The `replace_namespaced_daemon_set` call uses PUT semantics and requires `metadata.resourceVersion` to be set in the body for optimistic locking. Without it, the API server will reject the request with a `409 Conflict`.

The code reads the DaemonSet but discards the result, failing to capture the `resourceVersion`.

**Fix**: Capture the existing DaemonSet and inject its `resourceVersion` into the manifest, or use `patch_namespaced_daemon_set` for easier idempotency.</comment>

<file context>
@@ -251,63 +252,75 @@ async def wait_for_active_creations(self, timeout: float = 30.0) -> None:
-                await self.apps_v1.replace_namespaced_daemon_set(
-                    name=daemonset_name, namespace=namespace, body=manifest  # type: ignore[arg-type]
+        try:
+            await self.apps_v1.read_namespaced_daemon_set(name=daemonset_name, namespace=namespace)
+            self.logger.info(f"DaemonSet '{daemonset_name}' exists. Replacing to ensure it is up-to-date.")
+            await self.apps_v1.replace_namespaced_daemon_set(
</file context>
@coderabbitai bot left a comment

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In @.github/actions/e2e-ready/action.yml:
- Around line 74-76: The "Wait for runtime images" step uses `kubectl -n
integr8scode rollout status daemonset/runtime-image-pre-puller` which can hang
indefinitely; add an explicit rollout timeout flag (e.g., `--timeout=5m`) to
bound the wait. Update the run line for the "Wait for runtime images" step to
call `kubectl -n integr8scode rollout status daemonset/runtime-image-pre-puller
--timeout=5m` so the CI job fails fast instead of hanging if the DaemonSet never
becomes ready.

In `@backend/app/services/k8s_worker/worker.py`:
- Around line 313-325: The watch loop using k8s_watch.Watch() (variable w) can
block forever and may leak if an exception/cancellation occurs; wrap the stream
iteration over self.apps_v1.list_namespaced_daemon_set (filtered by
daemonset_name) with a client-side timeout (e.g., asyncio.wait_for) or an
overall timeout mechanism and ensure w.close() is always called in a finally
block; inside the try block iterate the async for events and check desired/ready
as before, and in except/asyncio.CancelledError re-raise after cleanup and log
via self.logger to record timeout/errors.
- Around line 298-304: When updating an existing DaemonSet in the try/except
block, extract the existing object's metadata.resourceVersion from the result of
self.apps_v1.read_namespaced_daemon_set(...) and set that value into the
manifest's metadata.resourceVersion before calling
replace_namespaced_daemon_set; specifically, after reading the DaemonSet, pull
existing.metadata.resource_version (or existing.metadata.resourceVersion
depending on the client object) and assign it to
manifest['metadata']['resourceVersion'] (or manifest.metadata.resourceVersion)
so the replace_namespaced_daemon_set call includes the required resourceVersion.
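
A sketch of that create-or-replace flow, assuming the worker uses the async Kubernetes client (kubernetes_asyncio); only read/replace/create_namespaced_daemon_set come from the diff, the rest is illustrative:

  from kubernetes_asyncio.client import AppsV1Api
  from kubernetes_asyncio.client.rest import ApiException

  async def upsert_daemonset(apps_v1: AppsV1Api, name: str, namespace: str, manifest: dict) -> None:
      try:
          existing = await apps_v1.read_namespaced_daemon_set(name=name, namespace=namespace)
      except ApiException as exc:
          if exc.status != 404:
              raise
          await apps_v1.create_namespaced_daemon_set(namespace=namespace, body=manifest)
          return
      # PUT semantics: carry over the live object's resourceVersion for optimistic locking
      manifest.setdefault("metadata", {})["resourceVersion"] = existing.metadata.resource_version
      await apps_v1.replace_namespaced_daemon_set(name=name, namespace=namespace, body=manifest)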

@coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In @.github/actions/e2e-ready/action.yml:
- Around line 35-37: The "Pre-pull test runtime image into K3s" step currently
runs `sudo k3s ctr images pull docker.io/library/python:3.11-slim` with no
timeout; update that step to enforce a pull timeout (e.g., prefix the command
with the timeout utility like `timeout 5m`) so the step fails fast instead of
hanging, ensuring consistency with other steps that use explicit timeouts.

In `@backend/app/services/k8s_worker/worker.py`:
- Around line 268-270: The container_name generation can exceed Kubernetes'
63-character DNS-1123 label limit and you also have a redundant list() call;
update the loop that iterates sorted(list(all_images)) to use
sorted(all_images), normalize sanitized_image_ref to DNS-1123 rules (lowercase,
alphanumeric and hyphens, no leading/trailing hyphen), and truncate or replace
sanitized_image_ref so f"pull-{i}-{sanitized_image_ref}" is at most 63 chars —
preserve uniqueness (e.g., append or replace with a short hash/suffix of the
original) when truncating; ensure the final container_name variable meets
DNS-1123 constraints before use.

Comment on lines +268 to +270
for i, image_ref in enumerate(sorted(list(all_images))):
sanitized_image_ref = image_ref.split("/")[-1].replace(":", "-").replace(".", "-").replace("_", "-")
container_name = f"pull-{i}-{sanitized_image_ref}"

⚠️ Potential issue | 🟡 Minor

Container names may exceed Kubernetes 63-character limit.

Kubernetes container names must be valid DNS-1123 labels (max 63 characters). With the pattern pull-{i}-{sanitized_image_ref}, long image names could exceed this limit and cause API errors.

🛠️ Suggested fix: truncate container name
         for i, image_ref in enumerate(sorted(all_images)):
             sanitized_image_ref = image_ref.split("/")[-1].replace(":", "-").replace(".", "-").replace("_", "-")
-            container_name = f"pull-{i}-{sanitized_image_ref}"
+            container_name = f"pull-{i}-{sanitized_image_ref}"[:63]
             init_containers.append(

Also, list() in sorted(list(all_images)) is redundant since sorted() already returns a list.

🤖 Prompt for AI Agents
In `@backend/app/services/k8s_worker/worker.py` around lines 268 - 270, The
container_name generation can exceed Kubernetes' 63-character DNS-1123 label
limit and you also have a redundant list() call; update the loop that iterates
sorted(list(all_images)) to use sorted(all_images), normalize
sanitized_image_ref to DNS-1123 rules (lowercase, alphanumeric and hyphens, no
leading/trailing hyphen), and truncate or replace sanitized_image_ref so
f"pull-{i}-{sanitized_image_ref}" is at most 63 chars — preserve uniqueness
(e.g., append or replace with a short hash/suffix of the original) when
truncating; ensure the final container_name variable meets DNS-1123 constraints
before use.
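
A possible shape for that helper (purely illustrative, not the repo's implementation): keep names DNS-1123-safe and append a short hash when truncation is needed so uniqueness survives.

  import hashlib
  import re

  def safe_container_name(index: int, image_ref: str, max_len: int = 63) -> str:
      # lowercase, keep only [a-z0-9-], trim stray hyphens
      base = re.sub(r"[^a-z0-9-]", "-", image_ref.split("/")[-1].lower()).strip("-")
      name = f"pull-{index}-{base}"
      if len(name) > max_len:
          digest = hashlib.sha1(image_ref.encode()).hexdigest()[:8]
          name = f"{name[: max_len - 9].rstrip('-')}-{digest}"
      return name.strip("-")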

…any Avro/Schema Registry.

  The pipeline:

  Publishing (broker.publish(message=pydantic_model, topic=...)):
  1. FastStream's PydanticSerializer.encode() calls pydantic_core.to_json(model) → raw JSON bytes
  2. Adds content-type: application/json header
  3. Sends to Kafka via aiokafka

  Consuming (handler with body: SomePydanticModel):
  1. Raw bytes → json.loads() → Python dict
  2. fast_depends PydanticSerializer validates the dict against the handler's type annotation → typed Pydantic model instance
  3. Discriminated unions (Annotated[Union[...], Discriminator("event_type")]) work natively because Pydantic resolves the correct concrete type from the discriminator field

  Verified by tracing through the actual installed library code at .venv/lib/python3.12/site-packages/faststream/ and .venv/lib/python3.12/site-packages/fast_depends/.
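
  For reference, a minimal end-to-end sketch of this flow (the event class, topic name, broker URL, and group id below are illustrative, not taken from this repo; the FastStream calls mirror the ones described above):

  from pydantic import BaseModel
  from faststream import FastStream
  from faststream.kafka import KafkaBroker

  broker = KafkaBroker("localhost:9092")
  app = FastStream(broker)

  class ExecutionRequested(BaseModel):
      execution_id: str
      script: str

  @broker.subscriber("execution_requested", group_id="demo")
  async def handle(event: ExecutionRequested) -> None:
      # raw bytes -> json.loads -> dict -> validated ExecutionRequested instance
      print(event.execution_id, len(event.script))

  @app.after_startup
  async def publish_one() -> None:
      # PydanticSerializer.encode() -> JSON bytes + content-type: application/json header
      await broker.publish(ExecutionRequested(execution_id="123", script="print('hi')"),
                           topic="execution_requested")

  Run via the FastStream CLI (e.g. faststream run some_module:app); the handler receives a fully typed model with no schema-registry round trip.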

  One thing that needs fixing: the DLQ handler's msg: StreamMessage[Any] parameter. Without the custom decoder, FastStream treats it as a body parameter (not context injection), causing the multi-param unpacking failure. The standard
  FastStream fix is msg: StreamMessage[Any] = Context("message") — this is not a custom workaround, it's the documented way to access the raw StreamMessage alongside a typed body parameter.
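
  A sketch of what that handler signature could look like (the topic name and the BaseEvent stand-in are illustrative; the StreamMessage import path is assumed for FastStream 0.5.x and may differ between versions):

  from typing import Any
  from pydantic import BaseModel
  from faststream import Context
  from faststream.broker.message import StreamMessage
  from faststream.kafka import KafkaBroker

  broker = KafkaBroker("localhost:9092")

  class BaseEvent(BaseModel):          # stand-in for the PR's event model
      event_id: str = ""

  @broker.subscriber("dead_letter_queue", group_id="dlq")
  async def on_dlq_message(
      event: BaseEvent,                                    # typed, validated body
      msg: StreamMessage[Any] = Context("message"),        # raw message: headers, key, offsets
  ) -> None:
      headers = dict(msg.headers or {})
      original_topic = headers.get("original_topic", "unknown")
      # ...hand the event plus header metadata to the DLQ manager here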

  What you lose by dropping Avro:
  - Schema Registry compatibility checks (schema evolution enforcement)
  - Compact binary encoding (Avro is ~2-3x smaller than JSON)

  What you gain:
  - No Schema Registry dependency
  - No pydantic-avro dependency
  - No custom decoder that breaks FastStream's native parameter resolution
  - DLQ handler works correctly
  - Simpler codebase — FastStream does all the work

  Avro → JSON migration (core changes):

  1. app/domain/events/typed.py — Replaced AvroBase (from pydantic-avro) with BaseModel on all 5 classes: ResourceUsageDomain, EventMetadata, BaseEvent, ContainerStatusInfo, ArchivedEvent
  2. app/events/broker.py — Removed the custom avro_decoder and SchemaRegistryManager param. Broker now uses FastStream's native JSON serialization
  3. app/events/core/producer.py — Removed SchemaRegistryManager dependency. produce() and send_to_dlq() now publish Pydantic models directly via broker.publish(message=event, ...)
  4. app/dlq/manager.py — Removed SchemaRegistryManager dependency. retry_message() and _produce_dlq_event() publish events directly
  5. app/events/handlers.py — Added Context("message") for DLQ handler's StreamMessage[Any] parameter (standard FastStream pattern for accessing raw message alongside typed body)
  6. app/core/providers.py — Deleted EventProvider class entirely. Removed schema_registry param from MessagingProvider, DLQProvider, DLQWorkerProvider
  7. app/core/container.py — Removed EventProvider from all 8 container factory functions
  8. app/main.py — Removed SchemaRegistryManager creation
  9. All 7 worker entry points — Removed SchemaRegistryManager import/creation, simplified create_broker() calls

  Deleted files:
  - app/events/schema/schema_registry.py — dead code
  - tests/e2e/events/test_schema_registry_roundtrip.py — tests deleted functionality
  - tests/e2e/events/test_schema_registry_real.py — tests deleted functionality

  Dependencies removed (from pyproject.toml):
  - pydantic-avro==0.9.1
  - python-schema-registry-client==2.6.1
  - avro-python3==1.10.2
  - fastavro==1.12.1
  - (plus 4 transitive deps: jsonschema, jsonschema-specifications, referencing, rpds-py)

  CI fix:
  - e2e-ready/action.yml — Changed sudo k3s ctr images pull → sudo k3s crictl pull (uses CRI interface, targets correct k8s.io containerd namespace)

  Config:
  - Added faststream.Context to ruff's extend-immutable-calls
 Good question. Let me look at industry practices:

  1 Topic = 1 Event Type (fine-grained)
  - Confluent's modern recommendation for event-driven microservices
  - Schema evolution is simpler (1 schema per topic)
  - No filtering needed - Kafka does routing
  - Better observability (metrics/alerts per event type)
  - Modern Kafka handles 10,000+ topics easily

  1 Topic = N Event Types (coarse-grained)
  - Traditional approach when Kafka topics were "expensive"
  - Preserves ordering across event types (important for event sourcing)
  - Requires filtering (header/body based)
  - Common in DDD: 1 topic per aggregate (all Order events together)

  The deciding question: Do you need ordering across event types?

  Example: If EXECUTION_REQUESTED → EXECUTION_STARTED → EXECUTION_COMPLETED must be processed in order for the same execution_id, they should share a topic (partitioned by execution_id).

  But looking at your architecture:
  - Coordinator handles: REQUESTED, COMPLETED, FAILED, CANCELLED
  - Result processor handles: COMPLETED, FAILED, TIMEOUT
  - Different services, different events - no cross-event ordering needed

  Modern consensus: Fine-grained topics (1:1) unless you specifically need cross-event ordering guarantees.

  Companies like Uber, Netflix, LinkedIn moved toward more granular topics as Kafka matured.

  Want me to refactor to 1 topic = 1 event type? It would:
  - Eliminate all filtering
  - Simplify handlers to pure type hints
  - Make EVENT_TYPE_TO_TOPIC trivial (topic = event_type.value)

   You're right. If 1 topic = 1 type:

  - Handler subscribes to topic execution_requested with type ExecutionRequested
  - Topic IS the type - no event_type field needed for routing
  - FastStream deserializes based on type hint

  The only question: when storing/retrieving from DB, how do we know the type?

  Answer: Store topic name as metadata, not as event field:

● Bash(uv run python << 'EOF', timeout: 30s)
      """Test: No event_type field at all - topic is the type"""…
  Output:
     Serialized (no event_type): {"event_id":"","execution_id":"123","script":"print('hi')"}
     Topic: execution_requested
     … +25 lines

● Final architecture - minimal:

  class BaseEvent(BaseModel):
      event_id: str = ""
      timestamp: datetime = ...
      metadata: EventMetadata

      @classmethod
      def topic(cls, prefix: str = "") -> str:
          return f"{prefix}{to_snake_case(cls.__name__)}"

  class ExecutionRequested(BaseEvent):
      execution_id: str
      script: str

  # Handler - topic IS the type
  @broker.subscriber(ExecutionRequested.topic(prefix), group_id="coordinator")
  async def handle(event: ExecutionRequested): ...

  # Producer - topic from class
  await broker.publish(event, topic=type(event).topic(prefix))

  # DB storage - topic as document metadata, not event field
  {"topic": "execution_requested", "data": {...}}

  Removed: event_type field, EventType enum, DomainEvent, registry, filters, discriminators

  Kept: BaseEvent, specific classes, topic() classmethod
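
  The sketch above leans on a to_snake_case helper that isn't shown; a minimal regex-based version (assuming simple CamelCase class names, no acronym handling) could be:

  import re

  def to_snake_case(name: str) -> str:
      # "ExecutionRequested" -> "execution_requested"
      return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()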

man that's a mess

sonarqubecloud bot commented Feb 5, 2026

Quality Gate failed

Failed conditions
Reliability Rating on New Code: E (required ≥ A)

See analysis details on SonarQube Cloud


@cubic-dev-ai bot left a comment

18 issues found across 102 files (changes from recent commits).

Note: This PR contains a large number of files. cubic only reviews up to 75 files per PR, so some files may not have been reviewed.

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/services/idempotency/idempotency_manager.py">

<violation number="1" location="backend/app/services/idempotency/idempotency_manager.py:284">
P2: Inconsistent race condition handling: when `DuplicateKeyError` is caught, this doesn't check if the existing key has timed out (unlike `_create_new_key`), potentially blocking legitimate retries after timeout.</violation>
</file>

<file name="backend/app/api/routes/execution.py">

<violation number="1" location="backend/app/api/routes/execution.py:215">
P0: The `response_model=list[BaseEvent]` will cause data loss in the API response. `BaseEvent` does not include subclass-specific fields (like `script`, `exit_code`, `reason`) and does not allow extra fields. Consequently, Pydantic will strip all these fields from the output, returning only the base metadata (id, timestamp, etc.). This makes the endpoint useless for inspecting event details.</violation>

<violation number="2" location="backend/app/api/routes/execution.py:219">
P1: Replacing `event_types` with `topics` is a breaking API change. Existing clients sending `event_types` will have their filter ignored (receiving all events instead of a subset), and they must now update to use snake_case topic names instead of the previous enum values. Consider supporting `event_types` for backward compatibility or documenting this break.</violation>
</file>

<file name="backend/app/db/docs/event.py">

<violation number="1" location="backend/app/db/docs/event.py:21">
P0: Renaming the `event_type` field to `topic` is a breaking schema change for the `events` collection.
1. Existing documents in the database likely contain `event_type` and lack `topic`.
2. Since `topic` is a required string field, Beanie will raise a `ValidationError` when reading any legacy document.
3. This also affects `EventArchiveDocument`.

Ensure a database migration script is run to rename the field in all documents before this code is deployed, or use `Field(validation_alias=AliasChoices('topic', 'event_type'))` to support both formats during transition.</violation>

<violation number="2" location="backend/app/db/docs/event.py:62">
P1: Updating the `idx_text_search` definition to use `topic` without changing the index name will cause a startup failure.
MongoDB does not allow recreating an existing index with the same name (`idx_text_search`) but different keys (`topic` vs `event_type`). The application will crash with an `IndexOptionsConflict` (or similar) upon initialization.

You must rename the index (e.g., to `idx_text_search_v2`) to allow Beanie to create a new index, or ensure the old index is dropped before deployment. Note that other indexes were correctly renamed (e.g., `idx_topic_ts`).</violation>
</file>

<file name="backend/app/db/repositories/admin/admin_events_repository.py">

<violation number="1" location="backend/app/db/repositories/admin/admin_events_repository.py:60">
P1: Replacing `DomainEventAdapter.validate_python` with `BaseEvent.model_validate` will likely result in data loss for event payloads. `BaseEvent` does not allow extra fields by default, so specific event fields (like `execution_id`, `script`, etc. stored in `EventDocument` extras) will be stripped during validation.

This will degrade the Admin UI "Browse Events" view by hiding event-specific details. Ensure `BaseEvent` allows extra fields or use a polymorphic adapter (like the removed `DomainEventAdapter`) to preserve full event data.</violation>

<violation number="2" location="backend/app/db/repositories/admin/admin_events_repository.py:69">
P1: Replacing `DomainEventAdapter.validate_python` with `BaseEvent.model_validate` causes data loss in the Event Detail view. Since `BaseEvent` strips extra fields, the returned `EventDetail` will lack the specific payload data (e.g., `execution_id`, `script`) that makes the detail view useful.

Consider using a polymorphic validator or updating `BaseEvent` configuration to allow extra fields.</violation>

<violation number="3" location="backend/app/db/repositories/admin/admin_events_repository.py:144">
P0: `get_event_stats` relies on `EventDocument.event_type`, which has been removed/replaced by `topic`. This will cause an `AttributeError` at runtime during the aggregation pipeline construction and execution.

The following queries in `get_event_stats` need to be updated to use `EventDocument.topic`:
1. `overview_pipeline`: `"event_types": {"$addToSet": S.field(EventDocument.event_type)}`
2. `error_count` query: `EventDocument.event_type: {"$regex": ...}`
3. `type_pipeline`: `.group(by=S.field(EventDocument.event_type), ...)`</violation>
</file>

<file name="backend/app/db/repositories/user_settings_repository.py">

<violation number="1" location="backend/app/db/repositories/user_settings_repository.py:33">
P2: Replacing the `EventType` enum with a list of raw strings (`topics`) removes type safety and central definitions for event topics.
The `EventDocument` documentation states that topics are derived from snake_case class names (e.g., `domain_user_settings_changed`), but existing callers (like `UserSettingsService`) appear to use legacy strings (e.g., `"user_settings_updated"`).
If the producer now uses the class-name-derived topic convention, queries using legacy strings will fail to find new events. Consider defining topic constants on the event models to ensure consistency between producers and consumers.</violation>
</file>

<file name="backend/app/schemas_pydantic/events.py">

<violation number="1" location="backend/app/schemas_pydantic/events.py:66">
P1: Using `list[BaseEvent]` causes data loss in the API response. `BaseEvent` is a Pydantic model serving as a base class; validating a list of subclass instances against it will strip all subclass-specific fields (like `execution_id`, `status`) and exclude the `topic` (which is a class method, not a field). The API will return only generic metadata.</violation>
</file>

<file name="backend/app/main.py">

<violation number="1" location="backend/app/main.py:63">
P2: The `KafkaBroker` in `main.py` is initialized without `IdempotencyMiddleware`, unlike the worker services (e.g., `run_coordinator.py`).

Since this broker serves `NotificationService` consumers (registered via `register_notification_subscriber`), omitting the middleware may lead to duplicate notifications during retries or rebalancing. The `create_app_container` also lacks `IdempotencyMiddlewareProvider`, which is present in worker containers.</violation>
</file>

<file name="backend/app/services/user_settings_service.py">

<violation number="1" location="backend/app/services/user_settings_service.py:56">
P0: Critical: The logic to persist 'user_settings_updated' events was removed from `update_user_settings` (via deletion of `_publish_settings_event` and `event_service`), but no replacement was added.

Since the system relies on Event Sourcing (rebuilding state from snapshot + events in `get_user_settings_fresh`), failing to persist the event means data loss. The update is currently only in-memory (cache) and will be lost on service restart unless a snapshot happens to be triggered.

You must inject a producer (e.g., `EventPublisher`) and persist the event.</violation>
</file>

<file name="backend/workers/run_event_replay.py">

<violation number="1" location="backend/workers/run_event_replay.py:16">
P1: The KafkaBroker instance is created but never started. In FastStream, the broker must be explicitly started (using `await broker.start()` or an async context manager) to initialize the producer connection. Without this, publishing events will likely fail or hang. Additionally, the broker should be gracefully closed on shutdown.</violation>
</file>

<file name="backend/app/dlq/models.py">

<violation number="1" location="backend/app/dlq/models.py:35">
P0: The usage of `BaseEvent` instead of a polymorphic union (like the removed `DomainEvent`) causes critical data loss. `BaseEvent` is configured with `extra='ignore'` (default), so when specific events (e.g., `ExecutionRequestedEvent`) are deserialized into `BaseEvent` or assigned to this field, all subclass-specific fields are stripped.

This means the DLQ will store incomplete events, and the retry mechanism will republish truncated events (missing critical fields like `execution_id`, `script`, etc.), causing downstream failures.

To fix this, either:
1. Update `BaseEvent` in `app/domain/events/typed.py` to allow extra fields (`model_config = ConfigDict(extra='allow')`).
2. Use a permissive subclass of `BaseEvent` in `DLQMessage`.
3. Use `dict` or `Any` for the `event` field (though this loses some type safety).</violation>
</file>

<file name="backend/app/services/idempotency/middleware.py">

<violation number="1" location="backend/app/services/idempotency/middleware.py:108">
P2: If Redis `delete` fails during cleanup, it will mask the original exception from `call_next`. Wrap the cleanup in a try-except to ensure the original error is preserved.</violation>
</file>

<file name="backend/scripts/create_topics.py">

<violation number="1" location="backend/scripts/create_topics.py:50">
P1: Topic configuration hardcodes `replication_factor` to 1, which creates single-point-of-failure topics unsuitable for production.

Consider deriving this value from `settings` or an environment variable (e.g., `settings.KAFKA_REPLICATION_FACTOR`) to support high availability in production environments.</violation>
</file>

<file name="backend/app/services/event_replay/replay_service.py">

<violation number="1" location="backend/app/services/event_replay/replay_service.py:230">
P0: Instantiating `BaseEvent` directly causes data loss and incorrect routing. `BaseEvent` is a base class and `model_validate` will not polymorphically instantiate the correct subclass (e.g., `ExecutionRequestedEvent`) based on the data.
1. Subclass-specific fields (e.g., `execution_id`, `script`) in `doc` will be discarded or fail validation.
2. `type(event).topic()` will resolve to the generic `base_event` topic instead of the specific event topic (e.g., `execution_requested`), causing events to be published to the wrong Kafka topic.

(Based on the removal of `DomainEventAdapter` which previously handled this.)</violation>
</file>

<file name="backend/workers/run_k8s_worker.py">

<violation number="1" location="backend/workers/run_k8s_worker.py:48">
P1: The `IdempotencyMiddleware` relies on `json.loads` to extract event IDs, which will fail or behave incorrectly if the message body is in Avro/Confluent wire format (binary). If the format is indeed Avro as per the PR description, this middleware will fail to compute keys, disabling idempotency for all messages.

Ensure the middleware is compatible with the wire format or is placed after a decoding middleware.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.



-@router.get("/executions/{execution_id}/events", response_model=list[DomainEvent])
+@router.get("/executions/{execution_id}/events", response_model=list[BaseEvent])
@cubic-dev-ai bot commented Feb 5, 2026

P0: The response_model=list[BaseEvent] will cause data loss in the API response. BaseEvent does not include subclass-specific fields (like script, exit_code, reason) and does not allow extra fields. Consequently, Pydantic will strip all these fields from the output, returning only the base metadata (id, timestamp, etc.). This makes the endpoint useless for inspecting event details.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/api/routes/execution.py, line 215:

<comment>The `response_model=list[BaseEvent]` will cause data loss in the API response. `BaseEvent` does not include subclass-specific fields (like `script`, `exit_code`, `reason`) and does not allow extra fields. Consequently, Pydantic will strip all these fields from the output, returning only the base metadata (id, timestamp, etc.). This makes the endpoint useless for inspecting event details.</comment>

<file context>
@@ -231,16 +212,16 @@ async def retry_execution(
 
 
-@router.get("/executions/{execution_id}/events", response_model=list[DomainEvent])
+@router.get("/executions/{execution_id}/events", response_model=list[BaseEvent])
 async def get_execution_events(
         execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
</file context>


 settings: DomainUserSettings
-event_types = [EventType.USER_SETTINGS_UPDATED]
+topics = ["user_settings_updated"]
@cubic-dev-ai bot commented Feb 5, 2026

P0: Critical: The logic to persist 'user_settings_updated' events was removed from update_user_settings (via deletion of _publish_settings_event and event_service), but no replacement was added.

Since the system relies on Event Sourcing (rebuilding state from snapshot + events in get_user_settings_fresh), failing to persist the event means data loss. The update is currently only in-memory (cache) and will be lost on service restart unless a snapshot happens to be triggered.

You must inject a producer (e.g., EventPublisher) and persist the event.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/user_settings_service.py, line 56:

<comment>Critical: The logic to persist 'user_settings_updated' events was removed from `update_user_settings` (via deletion of `_publish_settings_event` and `event_service`), but no replacement was added.

Since the system relies on Event Sourcing (rebuilding state from snapshot + events in `get_user_settings_fresh`), failing to persist the event means data loss. The update is currently only in-memory (cache) and will be lost on service restart unless a snapshot happens to be triggered.

You must inject a producer (e.g., `EventPublisher`) and persist the event.</comment>

<file context>
@@ -57,13 +53,13 @@ async def get_user_settings_fresh(self, user_id: str) -> DomainUserSettings:
 
         settings: DomainUserSettings
-        event_types = [EventType.USER_SETTINGS_UPDATED]
+        topics = ["user_settings_updated"]
         if snapshot:
             settings = snapshot
</file context>

logger.info("KubernetesWorker infrastructure initialized")
await container.get(Database)
middleware = await container.get(IdempotencyMiddleware)
broker.add_middleware(middleware)
@cubic-dev-ai bot commented Feb 5, 2026

P1: The IdempotencyMiddleware relies on json.loads to extract event IDs, which will fail or behave incorrectly if the message body is in Avro/Confluent wire format (binary). If the format is indeed Avro as per the PR description, this middleware will fail to compute keys, disabling idempotency for all messages.

Ensure the middleware is compatible with the wire format or is placed after a decoding middleware.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/workers/run_k8s_worker.py, line 48:

<comment>The `IdempotencyMiddleware` relies on `json.loads` to extract event IDs, which will fail or behave incorrectly if the message body is in Avro/Confluent wire format (binary). If the format is indeed Avro as per the PR description, this middleware will fail to compute keys, disabling idempotency for all messages.

Ensure the middleware is compatible with the wire format or is placed after a decoding middleware.</comment>

<file context>
@@ -33,20 +33,19 @@ def main() -> None:
-        await container.get(Database)  # triggers init_beanie inside provider
+        await container.get(Database)
+        middleware = await container.get(IdempotencyMiddleware)
+        broker.add_middleware(middleware)
         logger.info("KubernetesWorker ready")
 
</file context>

await self._repo.insert_processing(record)
self.metrics.increment_idempotency_keys(self.config.key_prefix)
return True # Reserved successfully, proceed with processing
except DuplicateKeyError:
@cubic-dev-ai bot commented Feb 5, 2026

P2: Inconsistent race condition handling: when DuplicateKeyError is caught, this doesn't check if the existing key has timed out (unlike _create_new_key), potentially blocking legitimate retries after timeout.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/idempotency/idempotency_manager.py, line 284:

<comment>Inconsistent race condition handling: when `DuplicateKeyError` is caught, this doesn't check if the existing key has timed out (unlike `_create_new_key`), potentially blocking legitimate retries after timeout.</comment>

<file context>
@@ -240,3 +243,63 @@ async def get_cached_json(
+            await self._repo.insert_processing(record)
+            self.metrics.increment_idempotency_keys(self.config.key_prefix)
+            return True  # Reserved successfully, proceed with processing
+        except DuplicateKeyError:
+            # Race condition: another consumer reserved it first
+            self.metrics.record_idempotency_duplicate_blocked(event_type)
</file context>
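A sketch of symmetric handling, assuming the repository exposes lookup and delete helpers and the same timed-out check that _create_new_key uses; the helper names are assumptions:

from pymongo.errors import DuplicateKeyError


async def reserve_or_reclaim(repo, metrics, record, is_timed_out) -> bool:
    """Reserve an idempotency key, reclaiming it if the existing reservation timed out."""
    try:
        await repo.insert_processing(record)
        return True
    except DuplicateKeyError:
        existing = await repo.get_record(record.key)      # assumed lookup helper
        if existing is not None and is_timed_out(existing):
            await repo.delete_record(record.key)          # stale reservation: reclaim it
            try:
                await repo.insert_processing(record)
                return True
            except DuplicateKeyError:
                pass  # another consumer won the reclaim race
        metrics.record_idempotency_duplicate_blocked(record.event_type)
        return False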

self,
user_id: str,
event_types: list[EventType],
topics: list[str],

@cubic-dev-ai cubic-dev-ai bot Feb 5, 2026


P2: Replacing the EventType enum with a list of raw strings (topics) removes type safety and central definitions for event topics.
The EventDocument documentation states that topics are derived from snake_case class names (e.g., domain_user_settings_changed), but existing callers (like UserSettingsService) appear to use legacy strings (e.g., "user_settings_updated").
If the producer now uses the class-name-derived topic convention, queries using legacy strings will fail to find new events. Consider defining topic constants on the event models to ensure consistency between producers and consumers.


<file context>
@@ -31,7 +30,7 @@ async def create_snapshot(self, settings: DomainUserSettings) -> None:
         self,
         user_id: str,
-        event_types: list[EventType],
+        topics: list[str],
         since: datetime | None = None,
         until: datetime | None = None,
</file context>

schema_registry = SchemaRegistryManager(settings, logger)
broker = create_broker(settings, schema_registry, logger)
register_event_store_subscriber(broker, settings)
broker = KafkaBroker(settings.KAFKA_BOOTSTRAP_SERVERS, logger=logger)

@cubic-dev-ai cubic-dev-ai bot Feb 5, 2026


P2: The KafkaBroker in main.py is initialized without IdempotencyMiddleware, unlike the worker services (e.g., run_coordinator.py).

Since this broker serves NotificationService consumers (registered via register_notification_subscriber), omitting the middleware may lead to duplicate notifications during retries or rebalancing. The create_app_container also lacks IdempotencyMiddlewareProvider, which is present in worker containers.


<file context>
@@ -61,8 +60,7 @@ def create_app(settings: Settings | None = None) -> FastAPI:
     # Create Kafka broker and register in-app subscribers
-    schema_registry = SchemaRegistryManager(settings, logger)
-    broker = create_broker(settings, schema_registry, logger)
+    broker = KafkaBroker(settings.KAFKA_BOOTSTRAP_SERVERS, logger=logger)
     register_sse_subscriber(broker, settings)
     register_notification_subscriber(broker, settings)
</file context>
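If the app container gains the same provider the workers use, the wiring could mirror the worker snippet above. Since create_app is synchronous, the middleware has to be attached where an async context exists, e.g. in the existing lifespan handler; the container attribute, import paths, and placement below are assumptions:

from contextlib import asynccontextmanager

from fastapi import FastAPI

from app.services.idempotency.middleware import IdempotencyMiddleware  # path per this review


@asynccontextmanager
async def lifespan(app: FastAPI):
    container = app.state.dishka_container            # set by dishka's FastAPI setup
    middleware = await container.get(IdempotencyMiddleware)
    broker.add_middleware(middleware)                  # broker from create_app's enclosing scope,
    await broker.start()                               # registered before broker.start()
    try:
        yield
    finally:
        await broker.stop()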

try:
return await call_next(msg)
except BaseException:
await self._redis.delete(key)

@cubic-dev-ai cubic-dev-ai bot Feb 5, 2026


P2: If Redis delete fails during cleanup, it will mask the original exception from call_next. Wrap the cleanup in a try-except to ensure the original error is preserved.


<file context>
@@ -0,0 +1,109 @@
+        try:
+            return await call_next(msg)
+        except BaseException:
+            await self._redis.delete(key)
+            raise
</file context>
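A sketch of the guarded cleanup; the logger attribute name is an assumption:

        try:
            return await call_next(msg)
        except BaseException:
            try:
                await self._redis.delete(key)
            except Exception as cleanup_err:
                # Best-effort cleanup: never mask the original failure from call_next.
                self._logger.warning("idempotency key cleanup failed: %s", cleanup_err)
            raise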


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (8)
backend/app/services/admin/admin_user_service.py (1)

76-92: ⚠️ Potential issue | 🟡 Minor

Use the existing EventType enum instead of hardcoded topic strings.

The EventType enum already exists in backend/app/domain/enums/events.py with the execution-related topics you need. Reference these enum values instead of duplicating the strings:

from app.domain.enums.events import EventType

execution_topics = [
    EventType.EXECUTION_REQUESTED,
    EventType.EXECUTION_STARTED,
    EventType.EXECUTION_COMPLETED,
    EventType.EXECUTION_FAILED,
    EventType.EXECUTION_TIMEOUT,
    EventType.EXECUTION_CANCELLED,
]

This avoids silent breakage if the enum values change and aligns with the pattern used elsewhere in the codebase.

backend/app/main.py (1)

6-63: ⚠️ Potential issue | 🟡 Minor

KafkaBroker instantiation does not apply consumer timeout settings; verify if this is intentional.

Settings defines KAFKA_SESSION_TIMEOUT_MS, KAFKA_HEARTBEAT_INTERVAL_MS, and KAFKA_REQUEST_TIMEOUT_MS, but these are not passed to KafkaBroker() in main.py. Similarly, KAFKA_CONSUMER_GROUP_ID and KAFKA_ENABLE_AUTO_COMMIT are unused—each subscriber decorator uses hardcoded group_id values instead. This pattern is consistent across all workers (saga orchestrator, result processor, pod monitor, k8s worker, coordinator).

Note: No SASL/SSL configuration exists for Kafka, and subscriber-level config (auto_offset_reset, max_poll_records, ack_policy) is set via decorator parameters, not the broker constructor. The timeout settings are only applied to the temporary consumer in consumer_group_monitor.py. Clarify whether these unused settings should be removed or if the KafkaBroker initialization should be updated to use them.

backend/app/api/routes/admin/events.py (1)

68-115: ⚠️ Potential issue | 🟠 Major

Breaking change: frontend still sends event_types parameter; backend now expects topics.

Frontend code actively sends the old event_types parameter (AdminEvents.svelte:191), but the backend endpoints accept only topics. This causes filtering to silently fail—requests won't error, filters just won't apply. Adding backward compatibility is essential for a smooth transition.

🛠️ Backward-compatible mapping
 async def export_events_csv(
     service: FromDishka[AdminEventsService],
     topics: list[str] | None = Query(None, description="Event topics (repeat param for multiple)"),
+    event_types: list[str] | None = Query(None, description="(deprecated) Event types", deprecated=True),
     start_time: datetime | None = Query(None, description="Start time"),
     end_time: datetime | None = Query(None, description="End time"),
     limit: int = Query(default=10000, le=50000),
 ) -> StreamingResponse:
     try:
+        topics = topics or event_types
         export_filter = EventFilter(
             topics=topics,
             start_time=start_time,
             end_time=end_time,
         )
 async def export_events_json(
     service: FromDishka[AdminEventsService],
     topics: list[str] | None = Query(None, description="Event topics (repeat param for multiple)"),
+    event_types: list[str] | None = Query(None, description="(deprecated) Event types", deprecated=True),
     aggregate_id: str | None = Query(None, description="Aggregate ID filter"),
     correlation_id: str | None = Query(None, description="Correlation ID filter"),
     user_id: str | None = Query(None, description="User ID filter"),
     service_name: str | None = Query(None, description="Service name filter"),
     start_time: datetime | None = Query(None, description="Start time"),
     end_time: datetime | None = Query(None, description="End time"),
     limit: int = Query(default=10000, le=50000),
 ) -> StreamingResponse:
     """Export events as JSON with comprehensive filtering."""
     try:
+        topics = topics or event_types
         export_filter = EventFilter(
             topics=topics,
             aggregate_id=aggregate_id,
             correlation_id=correlation_id,
             user_id=user_id,
             service_name=service_name,
             start_time=start_time,
             end_time=end_time,
         )
backend/app/services/idempotency/idempotency_manager.py (1)

48-67: ⚠️ Potential issue | 🟡 Minor

Namespace CONTENT_HASH keys by topic to avoid cross-topic collisions.

CONTENT_HASH keys currently hash only the payload, so different topics with identical content could block each other. Prefixing the hash with the topic keeps keys aligned with EVENT_BASED/CUSTOM and the new middleware.

Suggested change
-            key = hashlib.sha256(content.encode()).hexdigest()
+            key = f"{topic}:{hashlib.sha256(content.encode()).hexdigest()}"
backend/app/services/event_service.py (1)

126-132: ⚠️ Potential issue | 🟡 Minor

Map event_type sorting to topic to match the new schema.

If the stored field is now topic, sorting by event_type will be ineffective. Consider mapping both event_type and topic to the topic field.

Suggested change
-            "event_type": "event_type",
+            "event_type": "topic",
+            "topic": "topic",
backend/app/db/repositories/admin/admin_events_repository.py (2)

98-145: ⚠️ Potential issue | 🟡 Minor

Stats pipeline still groups by event_type.
With the topic-based migration, event_type is likely absent or stale, so stats/error_rate can be wrong. Update pipeline and error query to use EventDocument.topic.

🛠️ Suggested fix
-                    "event_types": {"$addToSet": S.field(EventDocument.event_type)},
+                    "event_types": {"$addToSet": S.field(EventDocument.topic)},
@@
-                EventDocument.event_type: {"$regex": "failed|error|timeout", "$options": "i"},
+                EventDocument.topic: {"$regex": "failed|error|timeout", "$options": "i"},
@@
-            .group(by=S.field(EventDocument.event_type), query={"count": S.sum(1)})
+            .group(by=S.field(EventDocument.topic), query={"count": S.sum(1)})

218-223: ⚠️ Potential issue | 🟠 Major

Archived events are missing topic.
event.model_dump() doesn’t include a topic field, so the archive document can be incomplete or fail validation. Populate it explicitly.

🛠️ Suggested fix
-        archive_doc = EventArchiveDocument(
-            **event.model_dump(),
+        archive_doc = EventArchiveDocument(
+            **event.model_dump(),
+            topic=type(event).topic(),
             deleted_at=datetime.now(timezone.utc),
             deleted_by=deleted_by,
         )
backend/app/api/routes/execution.py (1)

76-83: ⚠️ Potential issue | 🟠 Major

Do not instantiate BaseEvent directly as a pseudo-event; use a proper event subclass instead.

The IdempotencyManager.check_and_reserve() method calls type(event).topic() on the event (line 80 of idempotency_manager.py), which will return "base_event" for a direct BaseEvent instance. This is not a valid domain topic and will cause incorrect idempotency tracking with wrong topic values in metrics and records. Create a concrete event subclass for the pseudo-event or use an existing event class instead.
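For example, a small marker event keeps topic() meaningful for idempotency tracking. A sketch with an illustrative class name that is not from the codebase; the import path and EventMetadata field set are also assumptions:

from app.domain.events.typed import BaseEvent, EventMetadata  # import path assumed


class ExecutionRequestReceivedEvent(BaseEvent):
    """Marker event for API-level idempotency; topic() yields 'execution_request_received'."""


def make_idempotency_event(execution_id: str, user_id: str) -> BaseEvent:
    return ExecutionRequestReceivedEvent(
        aggregate_id=execution_id,
        metadata=EventMetadata(service_name="api", user_id=user_id),
    )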

🤖 Fix all issues with AI agents
In `@backend/app/api/routes/events.py`:
- Around line 239-258: The PublishEventRequest.payload is never attached to the
BaseEvent, so the custom payload is dropped when creating event =
BaseEvent(...); update the creation to include the payload (from
event_request.payload) into the BaseEvent metadata or a dedicated payload field
so the data is preserved before calling producer.publish; alternatively change
BaseEvent to allow extra fields (extra="allow") like UserSettingsUpdatedEvent if
you prefer passthrough, but the quick fix is to pass event_request.payload into
the BaseEvent constructor (or into EventMetadata) so producer.publish receives
the payload.

In `@backend/app/db/docs/event.py`:
- Around line 17-22: The new required Model field topic on the Event doc
(declared alongside event_id and event_version) will break existing documents
that only have event_type; implement a migration/backfill that reads existing
documents with event_type, computes topic (snake_case class name without 'Event'
suffix or map via existing event_type), writes topic into each document, and
rebuilds the affected indexes (including archive indexes), or add a temporary
compatibility layer in the Event model that returns topic fallback = event_type
when topic is missing; update any query paths that rely on topic to tolerate the
fallback during rollout and mark the migration to run before removing the
fallback.

In `@backend/app/domain/events/typed.py`:
- Around line 1-22: The _to_snake_case function currently splits every capital
letter (turning "DLQMessageReceivedEvent" into "d_l_q_message_received"); update
it to preserve acronym blocks by first stripping the trailing "Event" then
applying a two-step regex split: 1) insert underscores between a
lowercase/number and a capital-started word (e.g., between "o" and "Request"),
and 2) insert underscores between a capital followed by lowercase sequence
boundary (so acronym sequences remain intact), then lower-case the result;
modify _to_snake_case accordingly so topics like DLQMessageReceivedEvent become
dlq_message_received while still handling normal CamelCase names.
- Around line 50-71: BaseEvent's Pydantic config currently omits unknown-event
fields causing them to be dropped on deserialization; update
BaseEvent.model_config to include extra="allow" (e.g., add extra="allow" into
the ConfigDict alongside from_attributes and json_schema_extra) so that
event-specific fields stored by EventDocument remain available when
instantiating BaseEvent (match the approach used by UserSettingsUpdatedEvent).

In `@backend/app/services/event_replay/replay_service.py`:
- Around line 229-240: BaseEvent.model_validate(doc) collapses the concrete type
into BaseEvent so replay publishes every event to the base_event topic; instead,
determine the concrete event class from the document (e.g., via a
type/discriminator field or an event-class registry) and call that concrete
class's model_validate/constructor before appending/publishing; update the
handlers around BaseEvent.model_validate(doc) (where batch.append(event) and
EventPublisher.publish(...) are used) to validate/rehydrate into the correct
subclass (or use a topic-driven factory) and keep session.failed_events++ on
validation errors.

In `@backend/tests/e2e/dlq/test_dlq_manager.py`:
- Around line 94-95: The assertion is using the wrong derived topic string for
DLQMessageReceivedEvent; because _to_snake_case() strips "Event" and converts
camelCase to snake_case, DLQMessageReceivedEvent becomes
"d_l_q_message_received", so update the assertion that checks received.topic()
(in the test referencing DLQMessageReceivedEvent/received) to expect
"d_l_q_message_received" instead of "dlq_message_received".

In `@backend/tests/e2e/events/test_producer_roundtrip.py`:
- Around line 25-27: The DLQ call uses an unprefixed topic (type(ev).topic())
while EventPublisher.publish() uses a prefixed topic via
type(event).topic(self._prefix), causing mismatch when a prefix is configured;
update the test to derive the DLQ original_topic the same way as publish by
passing the configured prefix (e.g., call type(ev).topic(prod._prefix) or obtain
the prefix from settings) so the original_topic passed to prod.send_to_dlq(...)
matches the actual published topic; reference EventPublisher.publish(),
send_to_dlq, and type(...).topic(...) when making the change.

In `@backend/tests/unit/services/pod_monitor/test_monitor.py`:
- Around line 370-374: The test's FailingProducer currently overrides produce so
PodMonitor's failure path isn't hit; change the override to implement async def
publish(self, event: BaseEvent, key: str | None = None) -> str in class
FailingProducer (which extends FakeEventPublisher) and raise
RuntimeError("Publish failed") there so PodMonitor.publish calls exercise the
error path; keep the same signature and return type to match
FakeEventPublisher/PodMonitor expectations.

In `@backend/workers/run_event_replay.py`:
- Around line 10-18: The KafkaBroker created in run_replay_service is never
started or stopped; update run_replay_service to explicitly manage the broker
lifecycle by calling await broker.start() before using
create_event_replay_container(...) and ensuring await broker.stop() runs in a
finally block (or convert the broker to an AsyncIterator provider like the
Redis/Database providers), so the context-injected KafkaBroker is started for
the service and reliably stopped when container.close() completes.
🧹 Nitpick comments (6)
backend/app/services/user_settings_service.py (1)

56-63: Centralize the settings topic to avoid drift.
The "user_settings_updated" literal is repeated across methods; if topic naming changes, queries/history will silently miss events. Consider deriving it once from the event class or a constant.

♻️ Suggested consolidation
+SETTINGS_UPDATED_TOPIC = DomainUserSettingsChangedEvent.topic()

-        topics = ["user_settings_updated"]
+        topics = [SETTINGS_UPDATED_TOPIC]
...
-        events = await self.repository.get_settings_events(user_id, ["user_settings_updated"], limit=limit)
+        events = await self.repository.get_settings_events(user_id, [SETTINGS_UPDATED_TOPIC], limit=limit)
...
-                        topic="user_settings_updated",
+                        topic=SETTINGS_UPDATED_TOPIC,
...
-        events = await self.repository.get_settings_events(user_id, ["user_settings_updated"], until=timestamp)
+        events = await self.repository.get_settings_events(user_id, [SETTINGS_UPDATED_TOPIC], until=timestamp)

Also applies to: 127-134, 146-146

backend/app/schemas_pydantic/admin_user_overview.py (1)

33-34: Use default_factory for list default.
If Pydantic doesn’t deep-copy mutable defaults per instance in 2.9.2, this can lead to shared state. Consider Field(default_factory=list) to be safe.

♻️ Proposed change
-from pydantic import BaseModel, ConfigDict
+from pydantic import BaseModel, ConfigDict, Field
@@
-    recent_events: list[BaseEvent] = []
+    recent_events: list[BaseEvent] = Field(default_factory=list)
backend/tests/e2e/services/execution/test_execution_service.py (1)

189-216: Consider applying the configured topic prefix in filters.
If KAFKA_TOPIC_PREFIX is non-empty in test settings, ExecutionRequestedEvent.topic() without a prefix may not match stored topics. Using the configured prefix would make the test more robust across environments.
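For instance, assuming the test has access to a settings fixture:

prefix = test_settings.KAFKA_TOPIC_PREFIX            # fixture name assumed
topics = [ExecutionRequestedEvent.topic(prefix)]     # matches what EventPublisher publishes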

backend/tests/e2e/test_admin_events_routes.py (1)

62-70: Avoid hardcoding topic strings in filters.
Optional, but using the event class topic keeps tests resilient to renames.

🔁 Optional refactor
-            filters=EventFilter(
-                topics=["execution_requested"],
-                aggregate_id=stored_event.aggregate_id,
-            ),
+            filters=EventFilter(
+                topics=[type(stored_event).topic()],
+                aggregate_id=stored_event.aggregate_id,
+            ),

Also applies to: 104-109

backend/app/schemas_pydantic/admin_events.py (1)

12-22: Consider consolidating duplicate EventFilter definitions.

There's an EventFilter class in both admin_events.py (for browsing) and event_models.py (for querying) with similar fields. If they serve the same purpose, consider consolidating them to reduce duplication and maintenance overhead.

#!/bin/bash
# Check both EventFilter definitions for potential consolidation
rg -n "class EventFilter" --type py -A 15
backend/app/db/repositories/event_repository.py (1)

64-77: Consider handling duplicate keys in batch inserts.

Unlike store_event, store_events_batch doesn't handle duplicate-key errors. With the default ordered=True, insert_many stops at the first duplicate event_id and raises, so later events in the batch are never inserted.

Consider using ordered=False to allow partial success, or catching BulkWriteError to extract successfully inserted event IDs.

♻️ Proposed fix for idempotent batch inserts
+from pymongo.errors import BulkWriteError, DuplicateKeyError
+
     async def store_events_batch(self, events: list[BaseEvent]) -> list[str]:
         if not events:
             return []
         now = datetime.now(timezone.utc)
         docs = []
         for event in events:
             data = event.model_dump(exclude_none=True)
             data["topic"] = type(event).topic()
             data.setdefault("stored_at", now)
             docs.append(EventDocument(**data))
-        await EventDocument.insert_many(docs)
+        try:
+            await EventDocument.insert_many(docs, ordered=False)
+        except BulkWriteError as e:
+            # Some inserts may have succeeded; log and continue
+            self.logger.debug(f"Batch insert completed with {e.details.get('nInserted', 0)} inserts, some duplicates skipped")
         add_span_attributes(**{"events.batch.count": len(events)})
         self.logger.info(f"Stored {len(events)} events in batch")
         return [event.event_id for event in events]

Comment on lines +239 to +258
"""Publish a custom event (admin only). Creates a BaseEvent with the provided payload."""
base_meta = EventMetadata(
service_name=settings.SERVICE_NAME,
service_version=settings.SERVICE_VERSION,
user_id=admin.user_id,
ip_address=get_client_ip(request),
user_agent=request.headers.get("user-agent"),
correlation_id=event_request.correlation_id or CorrelationContext.get_correlation_id(),
)
# Merge any additional metadata provided in request (extra allowed)
if event_request.metadata:
base_meta = base_meta.model_copy(update=event_request.metadata)

event_id = await event_service.publish_event(
event_type=event_request.event_type,
payload=event_request.payload,
# Create a BaseEvent with the custom payload in metadata
event = BaseEvent(
aggregate_id=event_request.aggregate_id,
correlation_id=event_request.correlation_id,
metadata=base_meta,
)

event_id = await producer.publish(event=event, key=event_request.aggregate_id)


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find and inspect PublishEventRequest class definition
rg -n -A15 'class PublishEventRequest' -S

Repository: HardMax71/Integr8sCode

Length of output: 1452


🏁 Script executed:

#!/bin/bash
# Find and inspect BaseEvent class definition
rg -n -A15 'class BaseEvent' -S

Repository: HardMax71/Integr8sCode

Length of output: 2592


🏁 Script executed:

#!/bin/bash
# Check the BaseEvent model configuration for extra fields handling
rg -n -B5 -A20 'class BaseEvent' backend/app/domain/events/typed.py -S

Repository: HardMax71/Integr8sCode

Length of output: 1029


🏁 Script executed:

#!/bin/bash
# Read the exact function code around lines 239-258 in events.py
sed -n '230,260p' backend/app/api/routes/events.py

Repository: HardMax71/Integr8sCode

Length of output: 1370


🏁 Script executed:

#!/bin/bash
# Check if BaseEvent has any extra="allow" or similar configuration
rg -n 'extra' backend/app/domain/events/typed.py -A2 -B2

Repository: HardMax71/Integr8sCode

Length of output: 374


Custom event payload is dropped and will not be published.

PublishEventRequest includes a required payload field, but it is never passed to the BaseEvent constructor. Since BaseEvent doesn't have extra="allow" configured and lacks a payload field, the custom payload data will be silently discarded due to Pydantic's default behavior of ignoring unknown fields.

💡 Proposed fix (preserve payload explicitly)
-    # Create a BaseEvent with the custom payload in metadata
-    event = BaseEvent(
-        aggregate_id=event_request.aggregate_id,
-        metadata=base_meta,
-    )
+    payload = event_request.payload or {}
+    event = BaseEvent.model_validate(
+        {**payload, "aggregate_id": event_request.aggregate_id, "metadata": base_meta}
+    )

Alternatively, enable extra field passthrough on BaseEvent with extra="allow" (as done in UserSettingsUpdatedEvent).


Comment on lines +17 to 22
Topic field stores the event type name (e.g., 'execution_requested').
"""

event_id: Indexed(str, unique=True) = Field(default_factory=lambda: str(uuid4())) # type: ignore[valid-type]
event_type: EventType # Indexed via Settings.indexes
topic: str # Event topic name (snake_case class name without 'Event' suffix)
event_version: str = "1.0"


⚠️ Potential issue | 🟠 Major

Plan a backfill/migration for the new required topic field.
Existing events that only have event_type will fail validation or be invisible to the new topic-based queries/indexes (including archives). Please ensure a migration/backfill (and index rebuild) is in place before rollout, or add a temporary compatibility path.

Also applies to: 39-63, 81-83, 101-101
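A one-off backfill sketch, assuming a Motor client, an "events" collection name, and that the legacy event_type values already equal the new topic names (otherwise apply an explicit mapping); none of this is from the PR:

import asyncio

from motor.motor_asyncio import AsyncIOMotorClient


async def backfill_topics(mongo_uri: str, db_name: str) -> None:
    client = AsyncIOMotorClient(mongo_uri)
    events = client[db_name]["events"]

    # Copy event_type into topic wherever topic is missing (pipeline update, MongoDB 4.2+).
    result = await events.update_many(
        {"topic": {"$exists": False}, "event_type": {"$exists": True}},
        [{"$set": {"topic": "$event_type"}}],
    )
    print(f"backfilled {result.modified_count} documents")

    # Recreate the index the topic-based queries rely on.
    await events.create_index("topic")


if __name__ == "__main__":
    asyncio.run(backfill_topics("mongodb://localhost:27017", "integr8scode"))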


Comment on lines +1 to +22
import re
from datetime import datetime, timezone
from typing import Annotated, Literal
from uuid import uuid4

from pydantic import ConfigDict, Discriminator, Field, TypeAdapter
from pydantic_avro.to_avro.base import AvroBase
from pydantic import BaseModel, ConfigDict, Field

from app.domain.enums.auth import LoginMethod
from app.domain.enums.common import Environment
from app.domain.enums.events import EventType
from app.domain.enums.execution import QueuePriority
from app.domain.enums.notification import NotificationChannel, NotificationSeverity
from app.domain.enums.storage import ExecutionErrorType, StorageType


class ResourceUsageDomain(AvroBase):
def _to_snake_case(name: str) -> str:
"""Convert class name to snake_case topic name.

ExecutionRequestedEvent -> execution_requested
PodCreatedEvent -> pod_created
"""
if name.endswith("Event"):
name = name[:-5]
return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


⚠️ Potential issue | 🟠 Major

Acronym splitting changes topic names (e.g., DLQ → d_l_q).
The current regex inserts underscores between every capital, so DLQMessageReceivedEvent becomes d_l_q_message_received. If consumers/topics expect dlq_message_received, routing breaks. Consider a conversion that preserves acronym blocks.

🛠️ Suggested snake_case conversion
 def _to_snake_case(name: str) -> str:
     """Convert class name to snake_case topic name.
@@
     if name.endswith("Event"):
         name = name[:-5]
-    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
+    name = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", name)
+    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
+    return name.lower()

Comment on lines +50 to +71
class BaseEvent(BaseModel):
"""Base fields for all domain events.

Topic routing: Each event class maps to its own Kafka topic.
Topic name is derived from class name: ExecutionRequestedEvent -> execution_requested
"""

# Pydantic marks fields with default/default_factory as optional in JSON Schema,
# which generates optional TypeScript types (e.g., `event_id?: string`).
# Since stored events always have these fields, we override the schema to mark them required.
# See: https://github.com/pydantic/pydantic/issues/7209
# See: https://github.com/pydantic/pydantic/discussions/6073
model_config = ConfigDict(
from_attributes=True,
json_schema_extra={"required": ["event_id", "event_type", "event_version", "timestamp", "metadata"]},
json_schema_extra={"required": ["event_id", "event_version", "timestamp", "metadata"]},
)

event_id: str = Field(default_factory=lambda: str(uuid4()))
event_type: EventType
event_version: str = "1.0"
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
aggregate_id: str | None = None
metadata: EventMetadata

@classmethod
def topic(cls, prefix: str = "") -> str:
"""Get Kafka topic name for this event class."""
return f"{prefix}{_to_snake_case(cls.__name__)}"


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -type f -name "*.py" | head -20

Repository: HardMax71/Integr8sCode

Length of output: 801


🏁 Script executed:

rg -t py "BaseEvent" --max-count 30 -B 2 -A 2

Repository: HardMax71/Integr8sCode

Length of output: 50378


🏁 Script executed:

rg -t py "browse|replay" --max-count 20 -B 2 -A 2

Repository: HardMax71/Integr8sCode

Length of output: 50378


🏁 Script executed:

fd -t f -e py "repo" | head -20

Repository: HardMax71/Integr8sCode

Length of output: 1182


🏁 Script executed:

rg -t py "model_validate.*BaseEvent" -A 3 -B 3 | head -100

Repository: HardMax71/Integr8sCode

Length of output: 48


🏁 Script executed:

cat backend/app/domain/events/typed.py | head -100

Repository: HardMax71/Integr8sCode

Length of output: 2948


🏁 Script executed:

rg -t py "class EventDocument" -A 20

Repository: HardMax71/Integr8sCode

Length of output: 1800


🏁 Script executed:

rg -t py "extra=.allow" backend/app/domain/events/ -B 2 -A 2

Repository: HardMax71/Integr8sCode

Length of output: 347


🏁 Script executed:

rg -t py "discriminator|union|Union\[.*Event" backend/app/domain/events/ -B 2 -A 2

Repository: HardMax71/Integr8sCode

Length of output: 48


🏁 Script executed:

rg -t py "get_event|model_validate" backend/app/db/repositories/event_repository.py -B 3 -A 5 | head -150

Repository: HardMax71/Integr8sCode

Length of output: 5479


🏁 Script executed:

cat backend/app/db/repositories/event_repository.py | head -200

Repository: HardMax71/Integr8sCode

Length of output: 7961


🏁 Script executed:

rg -t py "\.execution_id|\.script|\.pod_name" backend/app/api/routes/admin/events.py -B 2 -A 2

Repository: HardMax71/Integr8sCode

Length of output: 48


🏁 Script executed:

cat backend/app/api/routes/admin/events.py | head -150

Repository: HardMax71/Integr8sCode

Length of output: 5351


🏁 Script executed:

rg -t py "type\(.*event\)|isinstance.*Event" backend/app/services/ -B 2 -A 2 | head -80

Repository: HardMax71/Integr8sCode

Length of output: 4384


🏁 Script executed:

rg -t py "event\.execution_id|event\.script|event\.pod_name|event\." backend/app/services/admin/admin_events_service.py -B 2 -A 2 | head -80

Repository: HardMax71/Integr8sCode

Length of output: 474


🏁 Script executed:

cat backend/app/services/event_replay/replay_service.py | grep -A 20 "BaseEvent.model_validate"

Repository: HardMax71/Integr8sCode

Length of output: 846


🏁 Script executed:

rg -t py "test.*BaseEvent|def test" backend/tests/e2e/test_events_routes.py | head -20

Repository: HardMax71/Integr8sCode

Length of output: 1060


🏁 Script executed:

cat backend/tests/e2e/test_events_routes.py | grep -A 30 "def test_replay_events_dry_run"

Repository: HardMax71/Integr8sCode

Length of output: 1237


🏁 Script executed:

rg -t py "model_dump|export.*json|export.*csv" backend/app/services/admin/admin_events_service.py -B 3 -A 3

Repository: HardMax71/Integr8sCode

Length of output: 2345


🏁 Script executed:

cat backend/app/db/repositories/admin/admin_events_repository.py | grep -A 30 "async def browse_events"

Repository: HardMax71/Integr8sCode

Length of output: 1352


BaseEvent drops event-specific fields during deserialization without extra="allow".

Events are stored in MongoDB with event-specific fields preserved (EventDocument has extra="allow"), but when retrieved and validated as BaseEvent, Pydantic drops any unknown fields because BaseEvent lacks this configuration. This causes silent data loss in browse/replay responses and any code that processes deserialized events. EventDocument stores fields like execution_id, pod_name, script, etc., but they become inaccessible once the model is instantiated as BaseEvent.

Add extra="allow" to BaseEvent's model_config to preserve these fields through the deserialization pipeline, matching the pattern already established by UserSettingsUpdatedEvent:

Suggested fix
 model_config = ConfigDict(
     from_attributes=True,
+    extra="allow",
     json_schema_extra={"required": ["event_id", "event_version", "timestamp", "metadata"]},
 )

Comment on lines +229 to +240
try:
event = BaseEvent.model_validate(doc)
except ValidationError as e:
session.failed_events += 1
self.logger.warning(
"Skipping event that failed validation",
extra={"event_id": doc.get("event_id", "unknown"), "error": str(e)},
)
continue

batch.append(event)
events_processed += 1


⚠️ Potential issue | 🟠 Major

BaseEvent validation collapses concrete types; replay will publish to the wrong topic.
BaseEvent.model_validate(doc) yields BaseEvent instances, so EventPublisher.publish() will route everything to base_event. Replay needs concrete event subclasses (or a topic-driven factory) to preserve topic routing and metrics.

🔧 Suggested fix (rehydrate concrete events)
-from app.domain.events.typed import BaseEvent
+from app.domain.events.typed import BaseEvent, DomainEventAdapter
...
-                try:
-                    event = BaseEvent.model_validate(doc)
+                try:
+                    event = DomainEventAdapter.validate_python(doc)

Also applies to: 286-287, 301-301
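If DomainEventAdapter no longer exists after this PR, a topic-keyed registry over BaseEvent subclasses is one alternative for rehydrating concrete types. A sketch; the subclass walk and where the registry lives are assumptions:

from app.domain.events.typed import BaseEvent


def _all_subclasses(cls: type[BaseEvent]) -> set[type[BaseEvent]]:
    found: set[type[BaseEvent]] = set()
    for sub in cls.__subclasses__():
        found.add(sub)
        found |= _all_subclasses(sub)
    return found


_TOPIC_REGISTRY: dict[str, type[BaseEvent]] = {c.topic(): c for c in _all_subclasses(BaseEvent)}


def rehydrate(doc: dict) -> BaseEvent:
    """Validate a stored document into its concrete event class, falling back to BaseEvent."""
    cls = _TOPIC_REGISTRY.get(doc.get("topic", ""), BaseEvent)
    return cls.model_validate(doc)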


Comment on lines +94 to +95
assert type(received).topic() == "dlq_message_received"
assert "execution" in received.original_topic.lower()


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# Find the DLQMessageReceivedEvent class definition and check the topic derivation
rg -n "class DLQ.*Event" --type py -A 5 -B 1

Repository: HardMax71/Integr8sCode

Length of output: 1424


🏁 Script executed:

# Also check for the topic() method implementation to understand how topics are derived
rg -n "def topic\(" --type py -A 3

Repository: HardMax71/Integr8sCode

Length of output: 758


🏁 Script executed:

# Search for where DLQMessageReceivedEvent is defined/imported in the test file
rg -n "DLQMessageReceivedEvent" backend/tests/e2e/dlq/test_dlq_manager.py -B 2 -A 2

Repository: HardMax71/Integr8sCode

Length of output: 1494


🏁 Script executed:

# Find the _to_snake_case implementation
rg -n "_to_snake_case" --type py -B 2 -A 8

Repository: HardMax71/Integr8sCode

Length of output: 1474


🏁 Script executed:

# Verify the actual expected topic by checking other usages of DLQMessageReceivedEvent or similar events
rg -n "DLQMessageReceivedEvent.*topic\|ExecutionRequestedEvent.*topic" --type py -B 1 -A 1

Repository: HardMax71/Integr8sCode

Length of output: 48


🏁 Script executed:

# Also check if there's an actual Kafka topic defined with this name
rg -n "dlq_message_received|d_l_q_message_received|execution_requested" --type py -C 1 | head -50

Repository: HardMax71/Integr8sCode

Length of output: 3845


Update the assertion to use the correct derived topic string.

The _to_snake_case() function strips the "Event" suffix and converts camelCase to snake_case. For DLQMessageReceivedEvent, this produces d_l_q_message_received, not dlq_message_received or dlq_message_received_event.

Update line 94:

🔧 Correct fix
-        assert type(received).topic() == "dlq_message_received"
+        assert type(received).topic() == "d_l_q_message_received"

Comment on lines +25 to 27
# Topic is derived from event class via BaseEvent.topic()
topic = type(ev).topic()
await prod.send_to_dlq(ev, original_topic=topic, error=RuntimeError("forced"), retry_count=1)


⚠️ Potential issue | 🟡 Minor

Topic prefix inconsistency between publish and send_to_dlq.

EventPublisher.publish() internally derives the topic using the prefix: type(event).topic(self._prefix). However, send_to_dlq is called with an unprefixed topic from type(ev).topic(). This means original_topic in the DLQ will not match the actual published topic if a prefix is configured.

Consider using the same prefix when deriving the topic for DLQ:

Suggested fix
     # Exercise send_to_dlq path — should not raise
     # Topic is derived from event class via BaseEvent.topic()
-    topic = type(ev).topic()
+    # Note: In production, EventPublisher uses settings.KAFKA_TOPIC_PREFIX
+    # For this test, we use unprefixed topic since we're just exercising the code path
+    topic = type(ev).topic()  # Unprefixed for test simplicity
     await prod.send_to_dlq(ev, original_topic=topic, error=RuntimeError("forced"), retry_count=1)

Alternatively, retrieve the prefix from settings to match production behavior.


Comment on lines +370 to 374
class FailingProducer(FakeEventPublisher):
async def produce(
self, event_to_produce: DomainEvent, key: str | None = None, headers: dict[str, str] | None = None
) -> None:
self, event: BaseEvent, key: str | None = None
) -> str:
raise RuntimeError("Publish failed")


⚠️ Potential issue | 🟡 Minor

FailingProducer overrides the old method, so the failure path isn’t exercised.

PodMonitor calls publish, but the test override uses produce.

Suggested fix
-        async def produce(
+        async def publish(
                 self, event: BaseEvent, key: str | None = None
         ) -> str:
             raise RuntimeError("Publish failed")

Comment on lines +10 to 18
from faststream.kafka import KafkaBroker


async def run_replay_service(settings: Settings) -> None:
"""Run the event replay service with DI-managed cleanup scheduler."""
tmp_logger = setup_logger(settings.LOG_LEVEL)
schema_registry = SchemaRegistryManager(settings, tmp_logger)
broker = create_broker(settings, schema_registry, tmp_logger)
broker = KafkaBroker(settings.KAFKA_BOOTSTRAP_SERVERS, logger=tmp_logger)

container = create_event_replay_container(settings, broker)


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Inspect event replay container wiring and broker lifecycle
echo "=== Searching for create_event_replay_container ==="
rg -n "create_event_replay_container" backend/app/core/container.py -A 20

echo -e "\n=== Searching for EventReplay references ==="
rg -n "EventReplay" backend/app/core/container.py -C 3

echo -e "\n=== Searching for KafkaBroker/broker handling ==="
rg -n "KafkaBroker|broker" backend/app/core/container.py -C 3

echo -e "\n=== Check if container.py exists and its size ==="
ls -lh backend/app/core/container.py 2>/dev/null || echo "File not found at that path"

echo -e "\n=== Search more broadly for container definitions ==="
find . -name "container.py" -type f 2>/dev/null | head -5

Repository: HardMax71/Integr8sCode

Length of output: 6104


🏁 Script executed:

#!/bin/bash
# Inspect the full run_event_replay.py file
echo "=== Full run_event_replay.py file ==="
cat -n backend/workers/run_event_replay.py

echo -e "\n=== Check EventReplayWorkerProvider for lifecycle management ==="
rg -n "EventReplayWorkerProvider" backend/app/core/providers.py -A 30

Repository: HardMax71/Integr8sCode

Length of output: 3779


🏁 Script executed:

#!/bin/bash
# Check if broker is closed anywhere in event replay worker
echo "=== Check for broker.close() in run_event_replay.py ==="
rg -n "broker\.|close()" backend/workers/run_event_replay.py

# Check how other workers handle broker cleanup
echo -e "\n=== Check other workers for broker cleanup patterns ==="
find backend/workers -name "*.py" -type f | head -5

for file in $(find backend/workers -name "run_*.py" -type f | head -3); do
  echo -e "\n=== File: $file ==="
  grep -n "broker\|close()" "$file" | head -10
done

# Check if dishka container.close() manages context objects
echo -e "\n=== Look for MessagingProvider to see how brokers are typically managed ==="
rg -n "MessagingProvider|KafkaBroker" backend/app/core/providers.py -A 15 | head -40

Repository: HardMax71/Integr8sCode

Length of output: 3022


🏁 Script executed:

#!/bin/bash
# Check FastStream documentation and how it manages broker lifecycle
echo "=== Check if broker needs explicit close in event replay context ==="
rg -n "FastStream" backend/workers/run_event_replay.py

echo -e "\n=== Compare all worker shutdown patterns ==="
for file in backend/workers/run_*.py; do
  echo -e "\n--- File: $file ---"
  grep -n "finally\|await.*close\|FastStream" "$file"
done

# Check if dishka's container.close() handles context-injected objects
echo -e "\n=== Search for dishka/asynccontainer usage patterns ==="
rg -n "await container.close()" backend/workers/ -B 5 | head -40

Repository: HardMax71/Integr8sCode

Length of output: 3627


🏁 Script executed:

#!/bin/bash
# Check if dishka's from_context manages lifecycle
echo "=== Check dishka documentation patterns for context objects ==="
rg -n "from_context\|AsyncIterator" backend/app/core/providers.py | head -20

# Verify if broker.close() is a valid method on KafkaBroker
echo -e "\n=== Check if there are any broker.close() calls elsewhere ==="
rg -n "broker\.close\|broker\.stop" backend/ -g "*.py"

# Look at how other services manage KafkaBroker cleanup
echo -e "\n=== Check if FastStream manages broker cleanup ==="
rg -n "class FastStream|def.*shutdown" backend/workers/ -A 3 | head -30

Repository: HardMax71/Integr8sCode

Length of output: 1947


🏁 Script executed:

#!/bin/bash
# Check if setup_dishka is called in event replay worker
echo "=== Check setup_dishka usage in event replay ==="
rg -n "setup_dishka" backend/workers/run_event_replay.py

# Check what setup_dishka does
echo -e "\n=== Find setup_dishka definition ==="
rg -n "def setup_dishka|setup_dishka" backend/app/ -A 5

# Check if there's explicit broker cleanup in any provider
echo -e "\n=== Check for AsyncIterator providers that manage broker ==="
rg -n "AsyncIterator\[KafkaBroker\]" backend/app/core/providers.py -B 3 -A 10

# Look at how MessagingProvider receives broker
echo -e "\n=== Full MessagingProvider implementation ==="
rg -n "class MessagingProvider" backend/app/core/providers.py -A 25

Repository: HardMax71/Integr8sCode

Length of output: 2209


🏁 Script executed:

#!/bin/bash
# Check FastStream/KafkaBroker documentation for lifecycle requirements
echo "=== Check if KafkaBroker has close method ==="
find . -name "*.py" -type f -exec grep -l "KafkaBroker" {} \; | head -5 | xargs grep -n "\.close\|\.stop" | grep -i kafka

# Check the actual setup_dishka_faststream to see if it manages broker
echo -e "\n=== Check if setup_dishka_faststream is used in event replay ==="
rg -n "setup_dishka" backend/workers/run_event_replay.py -B 5 -A 5

# Compare main app setup vs event replay setup
echo -e "\n=== Check main app setup pattern ==="
sed -n '70,90p' backend/app/main.py

# Check if dishka container.close() handles context objects
echo -e "\n=== Check if there's any cleanup code after container creation in event replay ==="
sed -n '13,40p' backend/workers/run_event_replay.py

Repository: HardMax71/Integr8sCode

Length of output: 2495


🏁 Script executed:

#!/bin/bash
# Check dishka's handling of context objects and whether container.close() manages them
echo "=== Check if dishka container closes context-injected objects ==="
rg -n "context=" backend/app/core/container.py | head -10

# Look for any explicit broker cleanup elsewhere
echo -e "\n=== Check if broker cleanup happens in EventReplayService or related classes ==="
rg -n "broker\.|KafkaBroker" backend/app/services/event_replay/ -g "*.py" 

# Check the difference: setup_dishka_faststream usage
echo -e "\n=== See main.py full setup ==="
sed -n '1,90p' backend/app/main.py | grep -n "setup_dishka\|broker\|lifespan"

# Check if from_context creates AsyncIterator context managers
echo -e "\n=== Check if MessagingProvider or any other provider manages broker lifecycle ==="
rg -n "class.*Provider.*:" backend/app/core/providers.py | head -20

Repository: HardMax71/Integr8sCode

Length of output: 2382


🏁 Script executed:

#!/bin/bash
# Check the lifespan function to see how broker is managed in main app
echo "=== Check dishka_lifespan.py ==="
cat backend/app/core/dishka_lifespan.py

# Verify that dishka's from_context doesn't manage lifecycle
echo -e "\n=== Check dishka documentation patterns in codebase for managed objects ==="
rg -n "AsyncIterator" backend/app/core/providers.py -B 2 -A 5 | head -30

Repository: HardMax71/Integr8sCode

Length of output: 4274


Add explicit Kafka broker lifecycle management.

The broker is created and passed to the container via from_context, but it is never started or stopped. Unlike the main application (which calls await broker.start() and await broker.stop() in the lifespan handler), the event replay worker lacks these essential lifecycle calls. The container.close() call only manages objects created by providers, not context-injected objects.

Wrap broker initialization in an AsyncIterator provider or add explicit calls:

await broker.start()
# ... service runs ...
await broker.stop()

Alternatively, refactor to use a managed provider pattern similar to Redis and Database clients in the providers module.
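A sketch of the managed-provider option; the Settings import path and provider placement are assumptions, and the start/stop calls follow the suggestion above:

from collections.abc import AsyncIterator

from dishka import Provider, Scope, provide
from faststream.kafka import KafkaBroker

from app.settings import Settings  # import path assumed


class BrokerProvider(Provider):
    @provide(scope=Scope.APP)
    async def broker(self, settings: Settings) -> AsyncIterator[KafkaBroker]:
        broker = KafkaBroker(settings.KAFKA_BOOTSTRAP_SERVERS)
        await broker.start()
        try:
            yield broker
        finally:
            await broker.stop()  # runs when container.close() finalizes the provider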

