fix: removed most of schema encode/decode overhead#135
Conversation
📝 Walkthrough
Replaces Avro/SchemaRegistry + UnifiedProducer + EventStore with a model-driven flow: BaseEvent + EventPublisher + EventRepository; removes the schema registry and event_store modules; adds IdempotencyMiddleware; converts event-type enums to topic strings; updates DI providers, handlers, services, and tests to use topics and BaseEvent model validation (model_validate).
Sequence Diagram(s)
```mermaid
sequenceDiagram
    participant API as API / Service
    participant Publisher as EventPublisher
    participant Repo as EventRepository
    participant Broker as KafkaBroker
    participant Idem as IdempotencyMiddleware
    API->>Publisher: publish(event: BaseEvent, key?)
    Publisher->>Repo: store_event(BaseEvent) -> event_id
    Publisher->>Broker: publish(raw_event, headers) (uses topic = type(event).topic())
    Broker->>Idem: message passes through IdempotencyMiddleware
    Idem->>Subscriber: deliver message to handler(s)
    Subscriber->>Service: handler invokes business logic (Coordinator/K8s/ResultProcessor)
    Service->>Publisher: publish(new event) -- loop repeats
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@backend/app/events/schema/schema_registry.py`:
- Around line 31-32: The payload is being produced with
event.model_dump(mode="python") which leaves datetimes/enums as Python objects
causing Avro encoding to fail; change the call in schema_registry.py that
assigns payload (the event.model_dump invocation used before calling
self.serializer.encode_record_with_schema) to use mode="json" (i.e.,
event.model_dump(mode="json", by_alias=False, exclude_unset=False)) so datetimes
become ISO strings and enums are serialized correctly before passing to
encode_record_with_schema.
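For context, here is a standalone Pydantic v2 snippet (its models are illustrative, not the project's) showing why `mode="json"` matters: datetimes and enums come out as plain JSON-compatible values that an Avro encoder can handle, while `mode="python"` leaves them as Python objects.
```python
from datetime import datetime, timezone
from enum import Enum
from pydantic import BaseModel

class Status(str, Enum):
    COMPLETED = "completed"

class Example(BaseModel):
    ts: datetime
    status: Status

e = Example(ts=datetime(2024, 1, 1, tzinfo=timezone.utc), status=Status.COMPLETED)
print(e.model_dump(mode="python"))  # datetime and enum objects survive as-is
print(e.model_dump(mode="json"))    # {'ts': '2024-01-01T00:00:00Z', 'status': 'completed'}
```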
In `@backend/app/services/event_replay/replay_service.py`:
- Around line 228-231: Remove the redundant if-check around
DomainEventAdapter.validate_python(doc) and wrap the call in a try/except to
catch ValidationError so a single malformed document doesn't abort the replay;
specifically, replace the current pattern that uses
DomainEventAdapter.validate_python followed by "if event:" with a try block that
calls DomainEventAdapter.validate_python(doc), appends the returned event to
batch and increments events_processed, and an except ValidationError that logs a
warning via self.logger.warning (include the error string in extra or message)
and increments session.failed_events to track skipped documents.
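A minimal sketch of what that loop body could look like; `batch`, `session`, `events_processed`, and `DomainEventAdapter` are names taken from the review comment and not verified against the file:
```python
from pydantic import ValidationError

# Inside the replay loop over fetched documents (sketch, not the actual implementation):
for doc in documents:
    try:
        event = DomainEventAdapter.validate_python(doc)
    except ValidationError as exc:
        self.logger.warning("Skipping malformed event document", extra={"error": str(exc)})
        session.failed_events += 1
        continue
    batch.append(event)
    events_processed += 1
```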
🧹 Nitpick comments (3)
backend/app/events/broker.py (1)
19-21: Consider adding error handling for decode failures.
The `avro_decoder` does not handle exceptions from `decode_message` (schema resolution failures, malformed payloads) or `validate_python` (unknown event types). If FastStream's broker doesn't catch these for DLQ routing, unhandled exceptions could crash consumers. If exception propagation is intentional for upstream DLQ handling, this is fine as-is.
💡 Optional: Add error handling
```diff
 async def avro_decoder(msg: StreamMessage[Any]) -> DomainEvent:
-    payload = await schema_registry.serializer.decode_message(msg.body)
-    return DomainEventAdapter.validate_python(payload)
+    try:
+        payload = await schema_registry.serializer.decode_message(msg.body)
+        return DomainEventAdapter.validate_python(payload)
+    except Exception as e:
+        logger.error("Failed to decode message", extra={"error": str(e)})
+        raise
```
backend/app/db/repositories/event_repository.py (1)
400-401: Missing `from_attributes=True` parameter for aggregation results.
Line 401 validates events from `doc["events"]` (aggregation pipeline results) without `from_attributes=True`, while all other usages in this file include it. The `$$ROOT` in the pipeline pushes raw BSON documents which Beanie converts to dicts, so `from_attributes=True` may not be strictly required here. However, for consistency and to avoid potential issues if the pipeline behavior changes, consider adding the parameter.
♻️ Optional fix for consistency
```diff
-events = [DomainEventAdapter.validate_python(e) for e in doc["events"]]
+events = [DomainEventAdapter.validate_python(e, from_attributes=True) for e in doc["events"]]
```
backend/app/events/event_store.py (1)
115-115: Consider consistency with repository pattern.
The `EventStore` uses `doc.model_dump()` before validation, while `EventRepository` uses `from_attributes=True`. Both approaches work correctly:
- `doc.model_dump()` converts to a dict explicitly
- `from_attributes=True` reads attributes directly from the object
The current approach is fine, but if you want consistency across the codebase, you could use `from_attributes=True` here as well. This is purely a style consideration.
♻️ Optional: Use from_attributes=True for consistency
```diff
-event = DomainEventAdapter.validate_python(doc.model_dump())
+event = DomainEventAdapter.validate_python(doc, from_attributes=True)
```
Apply similar change to lines 141, 158, 180, 201, 214, 238.
1 issue found across 15 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/services/event_replay/replay_service.py">
<violation number="1" location="backend/app/services/event_replay/replay_service.py:228">
P1: `DomainEventAdapter.validate_python` raises a `ValidationError` when validation fails, rather than returning `None`. This exception will cause the entire replay session to fail immediately in `_run_replay`, bypassing the `session.config.skip_errors` logic that handles errors in `_process_batch`.
If the database contains any events that don't match the current schema, the replay will crash.
Wrap this in a `try...except` block to handle validation errors (respecting `skip_errors` or logging warnings), and remove the redundant `if event:` check.</violation>
</file>
2 issues found across 16 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/events/schema/schema_registry.py">
<violation number="1" location="backend/app/events/schema/schema_registry.py:21">
P1: Schema registry authentication has been removed. If the registry requires auth credentials, this change will cause connection failures. Verify that authentication is no longer needed, or restore the `auth` parameter.</violation>
<violation number="2" location="backend/app/events/schema/schema_registry.py:28">
P1: The serialization logic changes may break Avro encoding. The previous code converted `timestamp` to microseconds (Avro long format) and removed `event_type` from the payload. The new `event.model_dump()` call doesn't perform these transformations, which could cause schema validation failures if the Avro schema expects timestamps as longs or doesn't include `event_type`.</violation>
</file>
2 issues found across 12 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/services/execution_service.py">
<violation number="1" location="backend/app/services/execution_service.py:306">
P1: The `event_types` parameter is accepted by the method but ignored in the `event_repository.get_execution_events` call. This creates a functional regression where filtering by event type is lost, and the method will return all events for the execution up to the limit, potentially including unwanted event types or exposing internal system events if the caller relied on filtering.
Since `EventRepository.get_execution_events` does not appear to support filtering by event type, either the repository needs to be updated or the filtering must be handled in-memory (though in-memory filtering combined with a database limit can cause pagination issues).</violation>
</file>
<file name="backend/app/events/core/producer.py">
<violation number="1" location="backend/app/events/core/producer.py:45">
P2: Persisting the event before serialization and topic lookup risks creating "orphaned" records in the database if those subsequent steps fail (e.g., due to schema validation errors or missing topic mappings). Consider performing validation and serialization before persistence to ensure only valid events are stored.</violation>
</file>
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/services/execution_service.py (1)
289-307: ⚠️ Potential issue | 🟠 Major
`event_types` filter is ignored.
`event_types` is accepted but never applied, so callers can't filter results anymore. Either push the filter into `EventRepository.get_execution_events` or filter in-memory (noting the `limit` semantics).
🩹 Minimal in-memory filter (short-term)
```diff
-        result = await self.event_repository.get_execution_events(execution_id=execution_id, limit=limit)
-        events = result.events
+        result = await self.event_repository.get_execution_events(execution_id=execution_id, limit=limit)
+        events = result.events
+        if event_types:
+            events = [e for e in events if e.event_type in event_types]
```
🤖 Fix all issues with AI agents
In `@backend/app/events/core/producer.py`:
- Around line 43-46: The produce method currently stores the event via
_event_repository.store_event and then publishes to Kafka (using
EVENT_TYPE_TO_TOPIC to build topic), which can leave DB events orphaned if
publish fails; change the flow to implement an outbox/publish-status pattern:
when creating the event in produce (or via _event_repository.store_event)
persist it with a publish_status field set to "pending"/"queued", then attempt
publishing to Kafka and upon success update that stored event's publish_status
to "published" (or "failed" on non-transient errors) using the event id returned
by store_event; additionally add a retry/reconciler background worker that scans
events with publish_status "pending"/"failed" and retries publishing, or adopt a
transactional outbox write+publish flow if supported—modify produce,
_event_repository.store_event (or the returned event object), and the Kafka
publish path that uses EVENT_TYPE_TO_TOPIC/topic accordingly to support status
updates and retries.
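A rough outbox-style sketch of what that prompt describes; `store_event(..., publish_status=...)` and `set_publish_status` are hypothetical repository helpers, not existing APIs:
```python
async def produce(event, repo, broker, topic: str) -> str:
    # 1. Persist first, marked as not-yet-published.
    event_id = await repo.store_event(event, publish_status="pending")
    try:
        # 2. Attempt the Kafka publish.
        await broker.publish(event, topic=topic)
    except Exception:
        # 3a. Leave a marker a background reconciler can retry later.
        await repo.set_publish_status(event_id, "failed")
        raise
    # 3b. Success: record that the event actually reached the broker.
    await repo.set_publish_status(event_id, "published")
    return event_id
```
A periodic worker can then scan for `pending`/`failed` records and re-publish them, which is the reconciler half of the pattern.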
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@backend/app/events/core/producer.py`:
- Around line 73-101: The DLQ status header currently uses
str(DLQMessageStatus.PENDING) which yields "DLQMessageStatus.PENDING" and is
hard to parse; change the header to use the enum's stable value (e.g.,
DLQMessageStatus.PENDING.value) instead of str(...) in the headers dict built in
the failed-event sender (the block that builds headers for
inject_trace_context), ensuring the status is a plain string value the consumer
(handlers.py) can parse consistently.
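A quick illustration of the difference (the enum here is a stand-in, not the project's definition):
```python
from enum import Enum

class DLQMessageStatus(str, Enum):
    PENDING = "pending"

print(str(DLQMessageStatus.PENDING))   # "DLQMessageStatus.PENDING" - awkward for consumers to parse
print(DLQMessageStatus.PENDING.value)  # "pending" - stable wire value
```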
In `@backend/app/events/handlers.py`:
- Around line 293-341: The DLQ header parsing in on_dlq_message is unsafe: wrap
parsing of headers["failed_at"] and DLQMessageStatus(...) in guarded logic so a
missing or malformed header doesn't crash the consumer; use
headers.get("failed_at") and if present attempt datetime.fromisoformat inside a
try/except (fallback to datetime.utcnow() or None) and similarly attempt to
construct DLQMessageStatus from headers.get("status") inside a try/except
(fallback to DLQMessageStatus.PENDING or a safe default), and emit a warning via
the provided logger when a parse fails; then build the DLQMessage with these
safe values before calling manager.handle_message so a single bad message won’t
stall on_dlq_message.
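A sketch of the guarded parsing described above (stand-in types; the real `DLQMessageStatus` and logger live in the project):
```python
from datetime import datetime, timezone
from enum import Enum
import logging

logger = logging.getLogger("dlq")

class DLQMessageStatus(str, Enum):
    PENDING = "pending"
    RETRIED = "retried"

def parse_dlq_headers(headers: dict[str, str]) -> tuple[datetime, DLQMessageStatus]:
    raw_failed_at = headers.get("failed_at")
    try:
        failed_at = datetime.fromisoformat(raw_failed_at) if raw_failed_at else datetime.now(timezone.utc)
    except ValueError:
        logger.warning("Malformed failed_at header: %r", raw_failed_at)
        failed_at = datetime.now(timezone.utc)
    try:
        status = DLQMessageStatus(headers.get("status", ""))
    except ValueError:
        logger.warning("Unknown DLQ status header: %r", headers.get("status"))
        status = DLQMessageStatus.PENDING
    return failed_at, status
```
One bad header then degrades to a default plus a warning instead of stalling the whole consumer.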
1 issue found across 3 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/dlq/manager.py">
<violation number="1" location="backend/app/dlq/manager.py:163">
P1: Switching to `schema_registry.serialize_event` introduces potential exceptions (schema validation, network errors) that were less likely with `json.dumps`. The caller `process_due_retries` (visible in file context) does not handle exceptions within its loop. A single message failing serialization will crash the retry cycle, preventing other messages from being processed (Poison Pill).
Consider adding error handling in `process_due_retries` to catch, log, and skip invalid messages.</violation>
</file>
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/db/repositories/event_repository.py (1)
64-76: ⚠️ Potential issue | 🟠 Major
Batch insert should preserve idempotency on duplicates.
`insert_many()` will fail the entire batch on a duplicate key event, breaking the idempotent behavior you have in `store_event()`. Since `event_id` is unique, this causes operational issues when the same events are processed twice. Resolve by using unordered inserts with duplicate-key error handling to match the single-insert behavior:
Suggested fix
```diff
-from pymongo.errors import DuplicateKeyError
+from pymongo.errors import BulkWriteError, DuplicateKeyError
 ...
-        await EventDocument.insert_many(docs)
+        try:
+            await EventDocument.insert_many(docs, ordered=False)
+        except BulkWriteError as e:
+            dup_errors = [err for err in e.details.get("writeErrors", []) if err.get("code") == 11000]
+            if dup_errors:
+                self.logger.debug("Some events already stored, skipping duplicates")
+            else:
+                raise
```
3 issues found across 3 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/services/k8s_worker/worker.py">
<violation number="1" location="backend/app/services/k8s_worker/worker.py:299">
P1: The `replace_namespaced_daemon_set` call uses PUT semantics and requires `metadata.resourceVersion` to be set in the body for optimistic locking. Without it, the API server will reject the request with a `409 Conflict`.
The code reads the DaemonSet but discards the result, failing to capture the `resourceVersion`.
**Fix**: Capture the existing DaemonSet and inject its `resourceVersion` into the manifest, or use `patch_namespaced_daemon_set` for easier idempotency.</violation>
<violation number="2" location="backend/app/services/k8s_worker/worker.py:313">
P1: The readiness check is prone to a race condition. It may mistakenly consider the DaemonSet "ready" based on the status of the *previous* generation before the controller has processed the update.
**Issues**:
1. **Race Condition**: `observed_generation` is not checked against `metadata.generation`.
2. **Unhandled Events**: The loop processes all event types (including `ERROR`, `DELETED`), which can cause `AttributeError` crashes when accessing `ds.status`.
3. **Infinite Blocking**: There is no timeout. If the cluster cannot fulfill the request (e.g., resource quotas, image pull errors), the application will hang indefinitely during startup.
**Fix**: Verify `observed_generation`, handle event types, and add a timeout or strict error handling.</violation>
</file>
<file name=".github/actions/e2e-ready/action.yml">
<violation number="1" location=".github/actions/e2e-ready/action.yml:76">
P2: Add an explicit timeout to the rollout status check so the workflow doesn’t hang indefinitely if the daemonset is stuck.</violation>
</file>
```diff
-await self.apps_v1.replace_namespaced_daemon_set(
-    name=daemonset_name, namespace=namespace, body=manifest  # type: ignore[arg-type]
+try:
+    await self.apps_v1.read_namespaced_daemon_set(name=daemonset_name, namespace=namespace)
```
P1: The replace_namespaced_daemon_set call uses PUT semantics and requires metadata.resourceVersion to be set in the body for optimistic locking. Without it, the API server will reject the request with a 409 Conflict.
The code reads the DaemonSet but discards the result, failing to capture the resourceVersion.
Fix: Capture the existing DaemonSet and inject its resourceVersion into the manifest, or use patch_namespaced_daemon_set for easier idempotency.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/k8s_worker/worker.py, line 299:
<comment>The `replace_namespaced_daemon_set` call uses PUT semantics and requires `metadata.resourceVersion` to be set in the body for optimistic locking. Without it, the API server will reject the request with a `409 Conflict`.
The code reads the DaemonSet but discards the result, failing to capture the `resourceVersion`.
**Fix**: Capture the existing DaemonSet and inject its `resourceVersion` into the manifest, or use `patch_namespaced_daemon_set` for easier idempotency.</comment>
<file context>
@@ -251,63 +252,75 @@ async def wait_for_active_creations(self, timeout: float = 30.0) -> None:
- await self.apps_v1.replace_namespaced_daemon_set(
- name=daemonset_name, namespace=namespace, body=manifest # type: ignore[arg-type]
+ try:
+ await self.apps_v1.read_namespaced_daemon_set(name=daemonset_name, namespace=namespace)
+ self.logger.info(f"DaemonSet '{daemonset_name}' exists. Replacing to ensure it is up-to-date.")
+ await self.apps_v1.replace_namespaced_daemon_set(
</file context>
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In @.github/actions/e2e-ready/action.yml:
- Around line 74-76: The "Wait for runtime images" step uses `kubectl -n
integr8scode rollout status daemonset/runtime-image-pre-puller` which can hang
indefinitely; add an explicit rollout timeout flag (e.g., `--timeout=5m`) to
bound the wait. Update the run line for the "Wait for runtime images" step to
call `kubectl -n integr8scode rollout status daemonset/runtime-image-pre-puller
--timeout=5m` so the CI job fails fast instead of hanging if the DaemonSet never
becomes ready.
In `@backend/app/services/k8s_worker/worker.py`:
- Around line 313-325: The watch loop using k8s_watch.Watch() (variable w) can
block forever and may leak if an exception/cancellation occurs; wrap the stream
iteration over self.apps_v1.list_namespaced_daemon_set (filtered by
daemonset_name) with a client-side timeout (e.g., asyncio.wait_for) or an
overall timeout mechanism and ensure w.close() is always called in a finally
block; inside the try block iterate the async for events and check desired/ready
as before, and in except/asyncio.CancelledError re-raise after cleanup and log
via self.logger to record timeout/errors.
- Around line 298-304: When updating an existing DaemonSet in the try/except
block, extract the existing object's metadata.resourceVersion from the result of
self.apps_v1.read_namespaced_daemon_set(...) and set that value into the
manifest's metadata.resourceVersion before calling
replace_namespaced_daemon_set; specifically, after reading the DaemonSet, pull
existing.metadata.resource_version (or existing.metadata.resourceVersion
depending on the client object) and assign it to
manifest['metadata']['resourceVersion'] (or manifest.metadata.resourceVersion)
so the replace_namespaced_daemon_set call includes the required resourceVersion.
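A hedged sketch of the resourceVersion half of those fixes, assuming `kubernetes_asyncio`'s `AppsV1Api` and a plain-dict manifest (the project's manifest objects may differ):
```python
from kubernetes_asyncio import client

async def replace_daemonset(apps_v1: client.AppsV1Api, manifest: dict, name: str, namespace: str) -> None:
    existing = await apps_v1.read_namespaced_daemon_set(name=name, namespace=namespace)
    # A replace is a PUT; carrying over the current resourceVersion satisfies the
    # API server's optimistic-locking check and avoids a 409 Conflict.
    manifest.setdefault("metadata", {})["resourceVersion"] = existing.metadata.resource_version
    await apps_v1.replace_namespaced_daemon_set(name=name, namespace=namespace, body=manifest)
```
For the watch loop, wrapping the iteration in `asyncio.wait_for(...)` (or `asyncio.timeout(...)`) and closing the watch in a `finally` block covers the timeout and cleanup points raised above.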
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In @.github/actions/e2e-ready/action.yml:
- Around line 35-37: The "Pre-pull test runtime image into K3s" step currently
runs `sudo k3s ctr images pull docker.io/library/python:3.11-slim` with no
timeout; update that step to enforce a pull timeout (e.g., prefix the command
with the timeout utility like `timeout 5m`) so the step fails fast instead of
hanging, ensuring consistency with other steps that use explicit timeouts.
In `@backend/app/services/k8s_worker/worker.py`:
- Around line 268-270: The container_name generation can exceed Kubernetes'
63-character DNS-1123 label limit and you also have a redundant list() call;
update the loop that iterates sorted(list(all_images)) to use
sorted(all_images), normalize sanitized_image_ref to DNS-1123 rules (lowercase,
alphanumeric and hyphens, no leading/trailing hyphen), and truncate or replace
sanitized_image_ref so f"pull-{i}-{sanitized_image_ref}" is at most 63 chars —
preserve uniqueness (e.g., append or replace with a short hash/suffix of the
original) when truncating; ensure the final container_name variable meets
DNS-1123 constraints before use.
```python
for i, image_ref in enumerate(sorted(list(all_images))):
    sanitized_image_ref = image_ref.split("/")[-1].replace(":", "-").replace(".", "-").replace("_", "-")
    container_name = f"pull-{i}-{sanitized_image_ref}"
```
Container names may exceed Kubernetes 63-character limit.
Kubernetes container names must be valid DNS-1123 labels (max 63 characters). With the pattern pull-{i}-{sanitized_image_ref}, long image names could exceed this limit and cause API errors.
🛠️ Suggested fix: truncate container name
```diff
 for i, image_ref in enumerate(sorted(all_images)):
     sanitized_image_ref = image_ref.split("/")[-1].replace(":", "-").replace(".", "-").replace("_", "-")
-    container_name = f"pull-{i}-{sanitized_image_ref}"
+    container_name = f"pull-{i}-{sanitized_image_ref}"[:63]
     init_containers.append(
```
Also, `list()` in `sorted(list(all_images))` is redundant since `sorted()` already returns a list.
🤖 Prompt for AI Agents
In `@backend/app/services/k8s_worker/worker.py` around lines 268 - 270, The
container_name generation can exceed Kubernetes' 63-character DNS-1123 label
limit and you also have a redundant list() call; update the loop that iterates
sorted(list(all_images)) to use sorted(all_images), normalize
sanitized_image_ref to DNS-1123 rules (lowercase, alphanumeric and hyphens, no
leading/trailing hyphen), and truncate or replace sanitized_image_ref so
f"pull-{i}-{sanitized_image_ref}" is at most 63 chars — preserve uniqueness
(e.g., append or replace with a short hash/suffix of the original) when
truncating; ensure the final container_name variable meets DNS-1123 constraints
before use.
…any Avro/Schema Registry.
The pipeline:
Publishing (broker.publish(message=pydantic_model, topic=...)):
1. FastStream's PydanticSerializer.encode() calls pydantic_core.to_json(model) → raw JSON bytes
2. Adds content-type: application/json header
3. Sends to Kafka via aiokafka
Consuming (handler with body: SomePydanticModel):
1. Raw bytes → json.loads() → Python dict
2. fast_depends PydanticSerializer validates the dict against the handler's type annotation → typed Pydantic model instance
3. Discriminated unions (Annotated[Union[...], Discriminator("event_type")]) work natively because Pydantic resolves the correct concrete type from the discriminator field
Verified by tracing through the actual installed library code at .venv/lib/python3.12/site-packages/faststream/ and .venv/lib/python3.12/site-packages/fast_depends/.
One thing that needs fixing: the DLQ handler's msg: StreamMessage[Any] parameter. Without the custom decoder, FastStream treats it as a body parameter (not context injection), causing the multi-param unpacking failure. The standard
FastStream fix is msg: StreamMessage[Any] = Context("message") — this is not a custom workaround, it's the documented way to access the raw StreamMessage alongside a typed body parameter.
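A small FastStream sketch of that pattern with JSON-native serialization (topic names and models here are illustrative, not the project's):
```python
from typing import Any
from faststream import Context, FastStream
from faststream.kafka import KafkaBroker
from pydantic import BaseModel

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

class ExecutionRequested(BaseModel):
    execution_id: str
    script: str

@broker.subscriber("execution_requested", group_id="coordinator")
async def handle(event: ExecutionRequested) -> None:   # typed body, validated from the JSON dict
    ...

@broker.subscriber("dead_letter_queue", group_id="dlq_manager")
async def on_dlq_message(
    body: dict[str, Any],                               # decoded payload
    msg: Any = Context("message"),                      # raw StreamMessage with headers, key, etc.
) -> None:
    ...

async def publish_example() -> None:
    # A Pydantic model goes out as JSON bytes with a content-type header.
    await broker.publish(
        ExecutionRequested(execution_id="123", script="print('hi')"),
        topic="execution_requested",
    )
```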
What you lose by dropping Avro:
- Schema Registry compatibility checks (schema evolution enforcement)
- Compact binary encoding (Avro is ~2-3x smaller than JSON)
What you gain:
- No Schema Registry dependency
- No pydantic-avro dependency
- No custom decoder that breaks FastStream's native parameter resolution
- DLQ handler works correctly
- Simpler codebase — FastStream does all the work
Avro → JSON migration (core changes):
1. app/domain/events/typed.py — Replaced AvroBase (from pydantic-avro) with BaseModel on all 5 classes: ResourceUsageDomain, EventMetadata, BaseEvent, ContainerStatusInfo, ArchivedEvent
2. app/events/broker.py — Removed the custom avro_decoder and SchemaRegistryManager param. Broker now uses FastStream's native JSON serialization
3. app/events/core/producer.py — Removed SchemaRegistryManager dependency. produce() and send_to_dlq() now publish Pydantic models directly via broker.publish(message=event, ...)
4. app/dlq/manager.py — Removed SchemaRegistryManager dependency. retry_message() and _produce_dlq_event() publish events directly
5. app/events/handlers.py — Added Context("message") for DLQ handler's StreamMessage[Any] parameter (standard FastStream pattern for accessing raw message alongside typed body)
6. app/core/providers.py — Deleted EventProvider class entirely. Removed schema_registry param from MessagingProvider, DLQProvider, DLQWorkerProvider
7. app/core/container.py — Removed EventProvider from all 8 container factory functions
8. app/main.py — Removed SchemaRegistryManager creation
9. All 7 worker entry points — Removed SchemaRegistryManager import/creation, simplified create_broker() calls
Deleted files:
- app/events/schema/schema_registry.py — dead code
- tests/e2e/events/test_schema_registry_roundtrip.py — tests deleted functionality
- tests/e2e/events/test_schema_registry_real.py — tests deleted functionality
Dependencies removed (from pyproject.toml):
- pydantic-avro==0.9.1
- python-schema-registry-client==2.6.1
- avro-python3==1.10.2
- fastavro==1.12.1
- (plus 4 transitive deps: jsonschema, jsonschema-specifications, referencing, rpds-py)
CI fix:
- e2e-ready/action.yml — Changed sudo k3s ctr images pull → sudo k3s crictl pull (uses CRI interface, targets correct k8s.io containerd namespace)
Config:
- Added faststream.Context to ruff's extend-immutable-calls
Good question. Let me look at industry practices:
1 Topic = 1 Event Type (fine-grained)
- Confluent's modern recommendation for event-driven microservices
- Schema evolution is simpler (1 schema per topic)
- No filtering needed - Kafka does routing
- Better observability (metrics/alerts per event type)
- Modern Kafka handles 10,000+ topics easily
1 Topic = N Event Types (coarse-grained)
- Traditional approach when Kafka topics were "expensive"
- Preserves ordering across event types (important for event sourcing)
- Requires filtering (header/body based)
- Common in DDD: 1 topic per aggregate (all Order events together)
The deciding question: Do you need ordering across event types?
Example: If EXECUTION_REQUESTED → EXECUTION_STARTED → EXECUTION_COMPLETED must be processed in order for the same execution_id, they should share a topic (partitioned by execution_id).
But looking at your architecture:
- Coordinator handles: REQUESTED, COMPLETED, FAILED, CANCELLED
- Result processor handles: COMPLETED, FAILED, TIMEOUT
- Different services, different events - no cross-event ordering needed
Modern consensus: Fine-grained topics (1:1) unless you specifically need cross-event ordering guarantees.
Companies like Uber, Netflix, LinkedIn moved toward more granular topics as Kafka matured.
Want me to refactor to 1 topic = 1 event type? It would:
- Eliminate all filtering
- Simplify handlers to pure type hints
- Make EVENT_TYPE_TO_TOPIC trivial (topic = event_type.value)
You're right. If 1 topic = 1 type:
- Handler subscribes to topic execution_requested with type ExecutionRequested
- Topic IS the type - no event_type field needed for routing
- FastStream deserializes based on type hint
The only question: when storing/retrieving from DB, how do we know the type?
Answer: Store topic name as metadata, not as event field:
● Bash test: "No event_type field at all - topic is the type" (output truncated):
```
Serialized (no event_type): {"event_id":"","execution_id":"123","script":"print('hi')"}
Topic: execution_requested
```
● Final architecture - minimal:
```python
class BaseEvent(BaseModel):
    event_id: str = ""
    timestamp: datetime = ...
    metadata: EventMetadata

    @classmethod
    def topic(cls, prefix: str = "") -> str:
        return f"{prefix}{to_snake_case(cls.__name__)}"

class ExecutionRequested(BaseEvent):
    execution_id: str
    script: str

# Handler - topic IS the type
@broker.subscriber(ExecutionRequested.topic(prefix), group_id="coordinator")
async def handle(event: ExecutionRequested): ...

# Producer - topic from class
await broker.publish(event, topic=type(event).topic(prefix))

# DB storage - topic as document metadata, not event field
{"topic": "execution_requested", "data": {...}}
```
Removed: event_type field, EventType enum, DomainEvent, registry, filters, discriminators
Kept: BaseEvent, specific classes, topic() classmethod
man that's a mess
18 issues found across 102 files (changes from recent commits).
Note: This PR contains a large number of files. cubic only reviews up to 75 files per PR, so some files may not have been reviewed.
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/services/idempotency/idempotency_manager.py">
<violation number="1" location="backend/app/services/idempotency/idempotency_manager.py:284">
P2: Inconsistent race condition handling: when `DuplicateKeyError` is caught, this doesn't check if the existing key has timed out (unlike `_create_new_key`), potentially blocking legitimate retries after timeout.</violation>
</file>
<file name="backend/app/api/routes/execution.py">
<violation number="1" location="backend/app/api/routes/execution.py:215">
P0: The `response_model=list[BaseEvent]` will cause data loss in the API response. `BaseEvent` does not include subclass-specific fields (like `script`, `exit_code`, `reason`) and does not allow extra fields. Consequently, Pydantic will strip all these fields from the output, returning only the base metadata (id, timestamp, etc.). This makes the endpoint useless for inspecting event details.</violation>
<violation number="2" location="backend/app/api/routes/execution.py:219">
P1: Replacing `event_types` with `topics` is a breaking API change. Existing clients sending `event_types` will have their filter ignored (receiving all events instead of a subset), and they must now update to use snake_case topic names instead of the previous enum values. Consider supporting `event_types` for backward compatibility or documenting this break.</violation>
</file>
<file name="backend/app/db/docs/event.py">
<violation number="1" location="backend/app/db/docs/event.py:21">
P0: Renaming the `event_type` field to `topic` is a breaking schema change for the `events` collection.
1. Existing documents in the database likely contain `event_type` and lack `topic`.
2. Since `topic` is a required string field, Beanie will raise a `ValidationError` when reading any legacy document.
3. This also affects `EventArchiveDocument`.
Ensure a database migration script is run to rename the field in all documents before this code is deployed, or use `Field(validation_alias=AliasChoices('topic', 'event_type'))` to support both formats during transition.</violation>
<violation number="2" location="backend/app/db/docs/event.py:62">
P1: Updating the `idx_text_search` definition to use `topic` without changing the index name will cause a startup failure.
MongoDB does not allow recreating an existing index with the same name (`idx_text_search`) but different keys (`topic` vs `event_type`). The application will crash with an `IndexOptionsConflict` (or similar) upon initialization.
You must rename the index (e.g., to `idx_text_search_v2`) to allow Beanie to create a new index, or ensure the old index is dropped before deployment. Note that other indexes were correctly renamed (e.g., `idx_topic_ts`).</violation>
</file>
<file name="backend/app/db/repositories/admin/admin_events_repository.py">
<violation number="1" location="backend/app/db/repositories/admin/admin_events_repository.py:60">
P1: Replacing `DomainEventAdapter.validate_python` with `BaseEvent.model_validate` will likely result in data loss for event payloads. `BaseEvent` does not allow extra fields by default, so specific event fields (like `execution_id`, `script`, etc. stored in `EventDocument` extras) will be stripped during validation.
This will degrade the Admin UI "Browse Events" view by hiding event-specific details. Ensure `BaseEvent` allows extra fields or use a polymorphic adapter (like the removed `DomainEventAdapter`) to preserve full event data.</violation>
<violation number="2" location="backend/app/db/repositories/admin/admin_events_repository.py:69">
P1: Replacing `DomainEventAdapter.validate_python` with `BaseEvent.model_validate` causes data loss in the Event Detail view. Since `BaseEvent` strips extra fields, the returned `EventDetail` will lack the specific payload data (e.g., `execution_id`, `script`) that makes the detail view useful.
Consider using a polymorphic validator or updating `BaseEvent` configuration to allow extra fields.</violation>
<violation number="3" location="backend/app/db/repositories/admin/admin_events_repository.py:144">
P0: `get_event_stats` relies on `EventDocument.event_type`, which has been removed/replaced by `topic`. This will cause an `AttributeError` at runtime during the aggregation pipeline construction and execution.
The following queries in `get_event_stats` need to be updated to use `EventDocument.topic`:
1. `overview_pipeline`: `"event_types": {"$addToSet": S.field(EventDocument.event_type)}`
2. `error_count` query: `EventDocument.event_type: {"$regex": ...}`
3. `type_pipeline`: `.group(by=S.field(EventDocument.event_type), ...)`</violation>
</file>
<file name="backend/app/db/repositories/user_settings_repository.py">
<violation number="1" location="backend/app/db/repositories/user_settings_repository.py:33">
P2: Replacing the `EventType` enum with a list of raw strings (`topics`) removes type safety and central definitions for event topics.
The `EventDocument` documentation states that topics are derived from snake_case class names (e.g., `domain_user_settings_changed`), but existing callers (like `UserSettingsService`) appear to use legacy strings (e.g., `"user_settings_updated"`).
If the producer now uses the class-name-derived topic convention, queries using legacy strings will fail to find new events. Consider defining topic constants on the event models to ensure consistency between producers and consumers.</violation>
</file>
<file name="backend/app/schemas_pydantic/events.py">
<violation number="1" location="backend/app/schemas_pydantic/events.py:66">
P1: Using `list[BaseEvent]` causes data loss in the API response. `BaseEvent` is a Pydantic model serving as a base class; validating a list of subclass instances against it will strip all subclass-specific fields (like `execution_id`, `status`) and exclude the `topic` (which is a class method, not a field). The API will return only generic metadata.</violation>
</file>
<file name="backend/app/main.py">
<violation number="1" location="backend/app/main.py:63">
P2: The `KafkaBroker` in `main.py` is initialized without `IdempotencyMiddleware`, unlike the worker services (e.g., `run_coordinator.py`).
Since this broker serves `NotificationService` consumers (registered via `register_notification_subscriber`), omitting the middleware may lead to duplicate notifications during retries or rebalancing. The `create_app_container` also lacks `IdempotencyMiddlewareProvider`, which is present in worker containers.</violation>
</file>
<file name="backend/app/services/user_settings_service.py">
<violation number="1" location="backend/app/services/user_settings_service.py:56">
P0: Critical: The logic to persist 'user_settings_updated' events was removed from `update_user_settings` (via deletion of `_publish_settings_event` and `event_service`), but no replacement was added.
Since the system relies on Event Sourcing (rebuilding state from snapshot + events in `get_user_settings_fresh`), failing to persist the event means data loss. The update is currently only in-memory (cache) and will be lost on service restart unless a snapshot happens to be triggered.
You must inject a producer (e.g., `EventPublisher`) and persist the event.</violation>
</file>
<file name="backend/workers/run_event_replay.py">
<violation number="1" location="backend/workers/run_event_replay.py:16">
P1: The KafkaBroker instance is created but never started. In FastStream, the broker must be explicitly started (using `await broker.start()` or an async context manager) to initialize the producer connection. Without this, publishing events will likely fail or hang. Additionally, the broker should be gracefully closed on shutdown.</violation>
</file>
<file name="backend/app/dlq/models.py">
<violation number="1" location="backend/app/dlq/models.py:35">
P0: The usage of `BaseEvent` instead of a polymorphic union (like the removed `DomainEvent`) causes critical data loss. `BaseEvent` is configured with `extra='ignore'` (default), so when specific events (e.g., `ExecutionRequestedEvent`) are deserialized into `BaseEvent` or assigned to this field, all subclass-specific fields are stripped.
This means the DLQ will store incomplete events, and the retry mechanism will republish truncated events (missing critical fields like `execution_id`, `script`, etc.), causing downstream failures.
To fix this, either:
1. Update `BaseEvent` in `app/domain/events/typed.py` to allow extra fields (`model_config = ConfigDict(extra='allow')`).
2. Use a permissive subclass of `BaseEvent` in `DLQMessage`.
3. Use `dict` or `Any` for the `event` field (though this loses some type safety).</violation>
</file>
<file name="backend/app/services/idempotency/middleware.py">
<violation number="1" location="backend/app/services/idempotency/middleware.py:108">
P2: If Redis `delete` fails during cleanup, it will mask the original exception from `call_next`. Wrap the cleanup in a try-except to ensure the original error is preserved.</violation>
</file>
<file name="backend/scripts/create_topics.py">
<violation number="1" location="backend/scripts/create_topics.py:50">
P1: Topic configuration hardcodes `replication_factor` to 1, which creates single-point-of-failure topics unsuitable for production.
Consider deriving this value from `settings` or an environment variable (e.g., `settings.KAFKA_REPLICATION_FACTOR`) to support high availability in production environments.</violation>
</file>
<file name="backend/app/services/event_replay/replay_service.py">
<violation number="1" location="backend/app/services/event_replay/replay_service.py:230">
P0: Instantiating `BaseEvent` directly causes data loss and incorrect routing. `BaseEvent` is a base class and `model_validate` will not polymorphically instantiate the correct subclass (e.g., `ExecutionRequestedEvent`) based on the data.
1. Subclass-specific fields (e.g., `execution_id`, `script`) in `doc` will be discarded or fail validation.
2. `type(event).topic()` will resolve to the generic `base_event` topic instead of the specific event topic (e.g., `execution_requested`), causing events to be published to the wrong Kafka topic.
(Based on the removal of `DomainEventAdapter` which previously handled this.)</violation>
</file>
<file name="backend/workers/run_k8s_worker.py">
<violation number="1" location="backend/workers/run_k8s_worker.py:48">
P1: The `IdempotencyMiddleware` relies on `json.loads` to extract event IDs, which will fail or behave incorrectly if the message body is in Avro/Confluent wire format (binary). If the format is indeed Avro as per the PR description, this middleware will fail to compute keys, disabling idempotency for all messages.
Ensure the middleware is compatible with the wire format or is placed after a decoding middleware.</violation>
</file>
```diff
-@router.get("/executions/{execution_id}/events", response_model=list[DomainEvent])
+@router.get("/executions/{execution_id}/events", response_model=list[BaseEvent])
```
P0: The response_model=list[BaseEvent] will cause data loss in the API response. BaseEvent does not include subclass-specific fields (like script, exit_code, reason) and does not allow extra fields. Consequently, Pydantic will strip all these fields from the output, returning only the base metadata (id, timestamp, etc.). This makes the endpoint useless for inspecting event details.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/api/routes/execution.py, line 215:
<comment>The `response_model=list[BaseEvent]` will cause data loss in the API response. `BaseEvent` does not include subclass-specific fields (like `script`, `exit_code`, `reason`) and does not allow extra fields. Consequently, Pydantic will strip all these fields from the output, returning only the base metadata (id, timestamp, etc.). This makes the endpoint useless for inspecting event details.</comment>
<file context>
@@ -231,16 +212,16 @@ async def retry_execution(
-@router.get("/executions/{execution_id}/events", response_model=list[DomainEvent])
+@router.get("/executions/{execution_id}/events", response_model=list[BaseEvent])
async def get_execution_events(
execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
</file context>
```diff
 settings: DomainUserSettings
-event_types = [EventType.USER_SETTINGS_UPDATED]
+topics = ["user_settings_updated"]
```
P0: Critical: The logic to persist 'user_settings_updated' events was removed from update_user_settings (via deletion of _publish_settings_event and event_service), but no replacement was added.
Since the system relies on Event Sourcing (rebuilding state from snapshot + events in get_user_settings_fresh), failing to persist the event means data loss. The update is currently only in-memory (cache) and will be lost on service restart unless a snapshot happens to be triggered.
You must inject a producer (e.g., EventPublisher) and persist the event.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/user_settings_service.py, line 56:
<comment>Critical: The logic to persist 'user_settings_updated' events was removed from `update_user_settings` (via deletion of `_publish_settings_event` and `event_service`), but no replacement was added.
Since the system relies on Event Sourcing (rebuilding state from snapshot + events in `get_user_settings_fresh`), failing to persist the event means data loss. The update is currently only in-memory (cache) and will be lost on service restart unless a snapshot happens to be triggered.
You must inject a producer (e.g., `EventPublisher`) and persist the event.</comment>
<file context>
@@ -57,13 +53,13 @@ async def get_user_settings_fresh(self, user_id: str) -> DomainUserSettings:
settings: DomainUserSettings
- event_types = [EventType.USER_SETTINGS_UPDATED]
+ topics = ["user_settings_updated"]
if snapshot:
settings = snapshot
</file context>
| logger.info("KubernetesWorker infrastructure initialized") | ||
| await container.get(Database) | ||
| middleware = await container.get(IdempotencyMiddleware) | ||
| broker.add_middleware(middleware) |
P1: The IdempotencyMiddleware relies on json.loads to extract event IDs, which will fail or behave incorrectly if the message body is in Avro/Confluent wire format (binary). If the format is indeed Avro as per the PR description, this middleware will fail to compute keys, disabling idempotency for all messages.
Ensure the middleware is compatible with the wire format or is placed after a decoding middleware.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/workers/run_k8s_worker.py, line 48:
<comment>The `IdempotencyMiddleware` relies on `json.loads` to extract event IDs, which will fail or behave incorrectly if the message body is in Avro/Confluent wire format (binary). If the format is indeed Avro as per the PR description, this middleware will fail to compute keys, disabling idempotency for all messages.
Ensure the middleware is compatible with the wire format or is placed after a decoding middleware.</comment>
<file context>
@@ -33,20 +33,19 @@ def main() -> None:
- await container.get(Database) # triggers init_beanie inside provider
+ await container.get(Database)
+ middleware = await container.get(IdempotencyMiddleware)
+ broker.add_middleware(middleware)
logger.info("KubernetesWorker ready")
</file context>
```python
    await self._repo.insert_processing(record)
    self.metrics.increment_idempotency_keys(self.config.key_prefix)
    return True  # Reserved successfully, proceed with processing
except DuplicateKeyError:
```
P2: Inconsistent race condition handling: when DuplicateKeyError is caught, this doesn't check if the existing key has timed out (unlike _create_new_key), potentially blocking legitimate retries after timeout.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/idempotency/idempotency_manager.py, line 284:
<comment>Inconsistent race condition handling: when `DuplicateKeyError` is caught, this doesn't check if the existing key has timed out (unlike `_create_new_key`), potentially blocking legitimate retries after timeout.</comment>
<file context>
@@ -240,3 +243,63 @@ async def get_cached_json(
+ await self._repo.insert_processing(record)
+ self.metrics.increment_idempotency_keys(self.config.key_prefix)
+ return True # Reserved successfully, proceed with processing
+ except DuplicateKeyError:
+ # Race condition: another consumer reserved it first
+ self.metrics.record_idempotency_duplicate_blocked(event_type)
</file context>
```diff
     self,
     user_id: str,
-    event_types: list[EventType],
+    topics: list[str],
```
P2: Replacing the EventType enum with a list of raw strings (topics) removes type safety and central definitions for event topics.
The EventDocument documentation states that topics are derived from snake_case class names (e.g., domain_user_settings_changed), but existing callers (like UserSettingsService) appear to use legacy strings (e.g., "user_settings_updated").
If the producer now uses the class-name-derived topic convention, queries using legacy strings will fail to find new events. Consider defining topic constants on the event models to ensure consistency between producers and consumers.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/db/repositories/user_settings_repository.py, line 33:
<comment>Replacing the `EventType` enum with a list of raw strings (`topics`) removes type safety and central definitions for event topics.
The `EventDocument` documentation states that topics are derived from snake_case class names (e.g., `domain_user_settings_changed`), but existing callers (like `UserSettingsService`) appear to use legacy strings (e.g., `"user_settings_updated"`).
If the producer now uses the class-name-derived topic convention, queries using legacy strings will fail to find new events. Consider defining topic constants on the event models to ensure consistency between producers and consumers.</comment>
<file context>
@@ -31,7 +30,7 @@ async def create_snapshot(self, settings: DomainUserSettings) -> None:
self,
user_id: str,
- event_types: list[EventType],
+ topics: list[str],
since: datetime | None = None,
until: datetime | None = None,
</file context>
```diff
-schema_registry = SchemaRegistryManager(settings, logger)
-broker = create_broker(settings, schema_registry, logger)
-register_event_store_subscriber(broker, settings)
+broker = KafkaBroker(settings.KAFKA_BOOTSTRAP_SERVERS, logger=logger)
```
P2: The KafkaBroker in main.py is initialized without IdempotencyMiddleware, unlike the worker services (e.g., run_coordinator.py).
Since this broker serves NotificationService consumers (registered via register_notification_subscriber), omitting the middleware may lead to duplicate notifications during retries or rebalancing. The create_app_container also lacks IdempotencyMiddlewareProvider, which is present in worker containers.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/main.py, line 63:
<comment>The `KafkaBroker` in `main.py` is initialized without `IdempotencyMiddleware`, unlike the worker services (e.g., `run_coordinator.py`).
Since this broker serves `NotificationService` consumers (registered via `register_notification_subscriber`), omitting the middleware may lead to duplicate notifications during retries or rebalancing. The `create_app_container` also lacks `IdempotencyMiddlewareProvider`, which is present in worker containers.</comment>
<file context>
@@ -61,8 +60,7 @@ def create_app(settings: Settings | None = None) -> FastAPI:
# Create Kafka broker and register in-app subscribers
- schema_registry = SchemaRegistryManager(settings, logger)
- broker = create_broker(settings, schema_registry, logger)
+ broker = KafkaBroker(settings.KAFKA_BOOTSTRAP_SERVERS, logger=logger)
register_sse_subscriber(broker, settings)
register_notification_subscriber(broker, settings)
</file context>
```python
try:
    return await call_next(msg)
except BaseException:
    await self._redis.delete(key)
```
P2: If Redis delete fails during cleanup, it will mask the original exception from call_next. Wrap the cleanup in a try-except to ensure the original error is preserved.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/idempotency/middleware.py, line 108:
<comment>If Redis `delete` fails during cleanup, it will mask the original exception from `call_next`. Wrap the cleanup in a try-except to ensure the original error is preserved.</comment>
<file context>
@@ -0,0 +1,109 @@
+ try:
+ return await call_next(msg)
+ except BaseException:
+ await self._redis.delete(key)
+ raise
</file context>
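A sketch of what preserving the original error could look like in the middleware above; attribute names follow the snippet, and the logger attribute is assumed:
```python
try:
    return await call_next(msg)
except BaseException:
    try:
        await self._redis.delete(key)
    except Exception:
        # Don't let a failed cleanup mask the original error from call_next.
        self._logger.warning("Failed to clean up idempotency key %s", key)
    raise
```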
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (8)
backend/app/services/admin/admin_user_service.py (1)
76-92: ⚠️ Potential issue | 🟡 Minor
Use the existing `EventType` enum instead of hardcoded topic strings.
The `EventType` enum already exists in `backend/app/domain/enums/events.py` with the execution-related topics you need. Reference these enum values instead of duplicating the strings:
```python
from app.domain.enums.events import EventType

execution_topics = [
    EventType.EXECUTION_REQUESTED,
    EventType.EXECUTION_STARTED,
    EventType.EXECUTION_COMPLETED,
    EventType.EXECUTION_FAILED,
    EventType.EXECUTION_TIMEOUT,
    EventType.EXECUTION_CANCELLED,
]
```
This avoids silent breakage if the enum values change and aligns with the pattern used elsewhere in the codebase.
backend/app/main.py (1)
6-63: ⚠️ Potential issue | 🟡 Minor
KafkaBroker instantiation does not apply consumer timeout settings; verify whether this is intentional.
Settings defines `KAFKA_SESSION_TIMEOUT_MS`, `KAFKA_HEARTBEAT_INTERVAL_MS`, and `KAFKA_REQUEST_TIMEOUT_MS`, but these are not passed to `KafkaBroker()` in main.py. Similarly, `KAFKA_CONSUMER_GROUP_ID` and `KAFKA_ENABLE_AUTO_COMMIT` are unused; each subscriber decorator uses hardcoded `group_id` values instead. This pattern is consistent across all workers (saga orchestrator, result processor, pod monitor, k8s worker, coordinator).
Note: No SASL/SSL configuration exists for Kafka, and subscriber-level config (auto_offset_reset, max_poll_records, ack_policy) is set via decorator parameters, not the broker constructor. The timeout settings are only applied to the temporary consumer in `consumer_group_monitor.py`. Clarify whether these unused settings should be removed or whether the KafkaBroker initialization should be updated to use them.
backend/app/api/routes/admin/events.py (1)
68-115: ⚠️ Potential issue | 🟠 Major
Breaking change: frontend still sends the `event_types` parameter; backend now expects `topics`.
Frontend code actively sends the old `event_types` parameter (AdminEvents.svelte:191), but the backend endpoints accept only `topics`. This causes filtering to silently fail: requests won't error, filters just won't apply. Adding backward compatibility is essential for a smooth transition.
🛠️ Backward-compatible mapping
```diff
 async def export_events_csv(
     service: FromDishka[AdminEventsService],
     topics: list[str] | None = Query(None, description="Event topics (repeat param for multiple)"),
+    event_types: list[str] | None = Query(None, description="(deprecated) Event types", deprecated=True),
     start_time: datetime | None = Query(None, description="Start time"),
     end_time: datetime | None = Query(None, description="End time"),
     limit: int = Query(default=10000, le=50000),
 ) -> StreamingResponse:
     try:
+        topics = topics or event_types
         export_filter = EventFilter(
             topics=topics,
             start_time=start_time,
             end_time=end_time,
         )

 async def export_events_json(
     service: FromDishka[AdminEventsService],
     topics: list[str] | None = Query(None, description="Event topics (repeat param for multiple)"),
+    event_types: list[str] | None = Query(None, description="(deprecated) Event types", deprecated=True),
     aggregate_id: str | None = Query(None, description="Aggregate ID filter"),
     correlation_id: str | None = Query(None, description="Correlation ID filter"),
     user_id: str | None = Query(None, description="User ID filter"),
     service_name: str | None = Query(None, description="Service name filter"),
     start_time: datetime | None = Query(None, description="Start time"),
     end_time: datetime | None = Query(None, description="End time"),
     limit: int = Query(default=10000, le=50000),
 ) -> StreamingResponse:
     """Export events as JSON with comprehensive filtering."""
     try:
+        topics = topics or event_types
         export_filter = EventFilter(
             topics=topics,
             aggregate_id=aggregate_id,
             correlation_id=correlation_id,
             user_id=user_id,
             service_name=service_name,
             start_time=start_time,
             end_time=end_time,
         )
```
backend/app/services/idempotency/idempotency_manager.py (1)
48-67: ⚠️ Potential issue | 🟡 Minor
Namespace CONTENT_HASH keys by topic to avoid cross-topic collisions.
CONTENT_HASH keys currently hash only the payload, so different topics with identical content could block each other. Prefixing the hash with the topic keeps keys aligned with EVENT_BASED/CUSTOM and the new middleware.
Suggested change
```diff
-key = hashlib.sha256(content.encode()).hexdigest()
+key = f"{topic}:{hashlib.sha256(content.encode()).hexdigest()}"
```
backend/app/services/event_service.py (1)
126-132: ⚠️ Potential issue | 🟡 Minor
Map event_type sorting to topic to match the new schema.
If the stored field is now `topic`, sorting by `event_type` will be ineffective. Consider mapping both `event_type` and `topic` to the `topic` field.
Suggested change
```diff
-    "event_type": "event_type",
+    "event_type": "topic",
+    "topic": "topic",
```
backend/app/db/repositories/admin/admin_events_repository.py (2)
98-145: ⚠️ Potential issue | 🟡 Minor
Stats pipeline still groups by `event_type`.
With the topic-based migration, `event_type` is likely absent or stale, so stats/error_rate can be wrong. Update the pipeline and error query to use `EventDocument.topic`.
🛠️ Suggested fix
```diff
-    "event_types": {"$addToSet": S.field(EventDocument.event_type)},
+    "event_types": {"$addToSet": S.field(EventDocument.topic)},
@@
-    EventDocument.event_type: {"$regex": "failed|error|timeout", "$options": "i"},
+    EventDocument.topic: {"$regex": "failed|error|timeout", "$options": "i"},
@@
-    .group(by=S.field(EventDocument.event_type), query={"count": S.sum(1)})
+    .group(by=S.field(EventDocument.topic), query={"count": S.sum(1)})
```
218-223: ⚠️ Potential issue | 🟠 Major

Archived events are missing `topic`.

`event.model_dump()` doesn't include a `topic` field, so the archive document can be incomplete or fail validation. Populate it explicitly.

🛠️ Suggested fix
```diff
 archive_doc = EventArchiveDocument(
     **event.model_dump(),
+    topic=type(event).topic(),
     deleted_at=datetime.now(timezone.utc),
     deleted_by=deleted_by,
 )
```

backend/app/api/routes/execution.py (1)
76-83: ⚠️ Potential issue | 🟠 Major

Do not instantiate `BaseEvent` directly as a pseudo-event; use a proper event subclass instead.

The `IdempotencyManager.check_and_reserve()` method calls `type(event).topic()` on the event (line 80 of idempotency_manager.py), which will return `"base_event"` for a direct `BaseEvent` instance. This is not a valid domain topic and will cause incorrect idempotency tracking, with wrong topic values in metrics and records. Create a concrete event subclass for the pseudo-event or use an existing event class instead.
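As a sketch of the suggested direction (the class name and field below are hypothetical, and the import path assumes the module layout referenced in this PR):

```python
from app.domain.events.typed import BaseEvent  # path as referenced in this PR


class ExecutionRequestReceivedEvent(BaseEvent):
    """Hypothetical pseudo-event reserved for idempotency checks on the execution route."""

    execution_id: str


# topic() is now derived from the subclass name instead of returning "base_event".
print(ExecutionRequestReceivedEvent.topic())  # expected: "execution_request_received"
```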
🤖 Fix all issues with AI agents
In `@backend/app/api/routes/events.py`:
- Around line 239-258: The PublishEventRequest.payload is never attached to the
BaseEvent, so the custom payload is dropped when creating event =
BaseEvent(...); update the creation to include the payload (from
event_request.payload) into the BaseEvent metadata or a dedicated payload field
so the data is preserved before calling producer.publish; alternatively change
BaseEvent to allow extra fields (extra="allow") like UserSettingsUpdatedEvent if
you prefer passthrough, but the quick fix is to pass event_request.payload into
the BaseEvent constructor (or into EventMetadata) so producer.publish receives
the payload.
In `@backend/app/db/docs/event.py`:
- Around line 17-22: The new required Model field topic on the Event doc
(declared alongside event_id and event_version) will break existing documents
that only have event_type; implement a migration/backfill that reads existing
documents with event_type, computes topic (snake_case class name without 'Event'
suffix or map via existing event_type), writes topic into each document, and
rebuilds the affected indexes (including archive indexes), or add a temporary
compatibility layer in the Event model that returns topic fallback = event_type
when topic is missing; update any query paths that rely on topic to tolerate the
fallback during rollout and mark the migration to run before removing the
fallback.
In `@backend/app/domain/events/typed.py`:
- Around line 1-22: The _to_snake_case function currently splits every capital
letter (turning "DLQMessageReceivedEvent" into "d_l_q_message_received"); update
it to preserve acronym blocks by first stripping the trailing "Event" then
applying a two-step regex split: 1) insert underscores between a
lowercase/number and a capital-started word (e.g., between "o" and "Request"),
and 2) insert underscores between a capital followed by lowercase sequence
boundary (so acronym sequences remain intact), then lower-case the result;
modify _to_snake_case accordingly so topics like DLQMessageReceivedEvent become
dlq_message_received while still handling normal CamelCase names.
- Around line 50-71: BaseEvent's Pydantic config currently omits unknown-event
fields causing them to be dropped on deserialization; update
BaseEvent.model_config to include extra="allow" (e.g., add extra="allow" into
the ConfigDict alongside from_attributes and json_schema_extra) so that
event-specific fields stored by EventDocument remain available when
instantiating BaseEvent (match the approach used by UserSettingsUpdatedEvent).
In `@backend/app/services/event_replay/replay_service.py`:
- Around line 229-240: BaseEvent.model_validate(doc) collapses the concrete type
into BaseEvent so replay publishes every event to the base_event topic; instead,
determine the concrete event class from the document (e.g., via a
type/discriminator field or an event-class registry) and call that concrete
class's model_validate/constructor before appending/publishing; update the
handlers around BaseEvent.model_validate(doc) (where batch.append(event) and
EventPublisher.publish(...) are used) to validate/rehydrate into the correct
subclass (or use a topic-driven factory) and keep session.failed_events++ on
validation errors.
In `@backend/tests/e2e/dlq/test_dlq_manager.py`:
- Around line 94-95: The assertion is using the wrong derived topic string for
DLQMessageReceivedEvent; because _to_snake_case() strips "Event" and converts
camelCase to snake_case, DLQMessageReceivedEvent becomes
"d_l_q_message_received", so update the assertion that checks received.topic()
(in the test referencing DLQMessageReceivedEvent/received) to expect
"d_l_q_message_received" instead of "dlq_message_received".
In `@backend/tests/e2e/events/test_producer_roundtrip.py`:
- Around line 25-27: The DLQ call uses an unprefixed topic (type(ev).topic())
while EventPublisher.publish() uses a prefixed topic via
type(event).topic(self._prefix), causing mismatch when a prefix is configured;
update the test to derive the DLQ original_topic the same way as publish by
passing the configured prefix (e.g., call type(ev).topic(prod._prefix) or obtain
the prefix from settings) so the original_topic passed to prod.send_to_dlq(...)
matches the actual published topic; reference EventPublisher.publish(),
send_to_dlq, and type(...).topic(...) when making the change.
In `@backend/tests/unit/services/pod_monitor/test_monitor.py`:
- Around line 370-374: The test's FailingProducer currently overrides produce so
PodMonitor's failure path isn't hit; change the override to implement async def
publish(self, event: BaseEvent, key: str | None = None) -> str in class
FailingProducer (which extends FakeEventPublisher) and raise
RuntimeError("Publish failed") there so PodMonitor.publish calls exercise the
error path; keep the same signature and return type to match
FakeEventPublisher/PodMonitor expectations.
In `@backend/workers/run_event_replay.py`:
- Around line 10-18: The KafkaBroker created in run_replay_service is never
started or stopped; update run_replay_service to explicitly manage the broker
lifecycle by calling await broker.start() before using
create_event_replay_container(...) and ensuring await broker.stop() runs in a
finally block (or convert the broker to an AsyncIterator provider like the
Redis/Database providers), so the context-injected KafkaBroker is started for
the service and reliably stopped when container.close() completes.
🧹 Nitpick comments (6)
backend/app/services/user_settings_service.py (1)
56-63: Centralize the settings topic to avoid drift.
The"user_settings_updated"literal is repeated across methods; if topic naming changes, queries/history will silently miss events. Consider deriving it once from the event class or a constant.♻️ Suggested consolidation
```diff
+SETTINGS_UPDATED_TOPIC = DomainUserSettingsChangedEvent.topic()

-    topics = ["user_settings_updated"]
+    topics = [SETTINGS_UPDATED_TOPIC]
 ...
-    events = await self.repository.get_settings_events(user_id, ["user_settings_updated"], limit=limit)
+    events = await self.repository.get_settings_events(user_id, [SETTINGS_UPDATED_TOPIC], limit=limit)
 ...
-    topic="user_settings_updated",
+    topic=SETTINGS_UPDATED_TOPIC,
 ...
-    events = await self.repository.get_settings_events(user_id, ["user_settings_updated"], until=timestamp)
+    events = await self.repository.get_settings_events(user_id, [SETTINGS_UPDATED_TOPIC], until=timestamp)
```

Also applies to: 127-134, 146-146
backend/app/schemas_pydantic/admin_user_overview.py (1)
33-34: Use `default_factory` for the list default.
If Pydantic doesn't deep-copy mutable defaults per instance in 2.9.2, this can lead to shared state. Consider `Field(default_factory=list)` to be safe.

♻️ Proposed change
```diff
-from pydantic import BaseModel, ConfigDict
+from pydantic import BaseModel, ConfigDict, Field
@@
-    recent_events: list[BaseEvent] = []
+    recent_events: list[BaseEvent] = Field(default_factory=list)
```

backend/tests/e2e/services/execution/test_execution_service.py (1)
189-216: Consider applying the configured topic prefix in filters.
If `KAFKA_TOPIC_PREFIX` is non-empty in test settings, `ExecutionRequestedEvent.topic()` without a prefix may not match stored topics. Using the configured prefix would make the test more robust across environments.

backend/tests/e2e/test_admin_events_routes.py (1)
62-70: Avoid hardcoding topic strings in filters.
Optional, but using the event class topic keeps tests resilient to renames.

🔁 Optional refactor
```diff
-    filters=EventFilter(
-        topics=["execution_requested"],
-        aggregate_id=stored_event.aggregate_id,
-    ),
+    filters=EventFilter(
+        topics=[type(stored_event).topic()],
+        aggregate_id=stored_event.aggregate_id,
+    ),
```

Also applies to: 104-109
backend/app/schemas_pydantic/admin_events.py (1)
12-22: Consider consolidating duplicate EventFilter definitions.

There's an `EventFilter` class in both `admin_events.py` (for browsing) and `event_models.py` (for querying) with similar fields. If they serve the same purpose, consider consolidating them to reduce duplication and maintenance overhead.

```bash
# Check both EventFilter definitions for potential consolidation
rg -n "class EventFilter" --type py -A 15
```

backend/app/db/repositories/event_repository.py (1)
64-77: Consider handling duplicate keys in batch inserts.

Unlike `store_event`, `store_events_batch` doesn't handle `DuplicateKeyError`. If any event in the batch has a duplicate `event_id`, the entire batch insert may fail (depending on MongoDB's behavior for `insert_many` with `ordered=True`, which is the default).

Consider using `ordered=False` to allow partial success, or catching `BulkWriteError` to extract successfully inserted event IDs.

♻️ Proposed fix for idempotent batch inserts
```diff
+from pymongo.errors import BulkWriteError, DuplicateKeyError
+
 async def store_events_batch(self, events: list[BaseEvent]) -> list[str]:
     if not events:
         return []
     now = datetime.now(timezone.utc)
     docs = []
     for event in events:
         data = event.model_dump(exclude_none=True)
         data["topic"] = type(event).topic()
         data.setdefault("stored_at", now)
         docs.append(EventDocument(**data))
-    await EventDocument.insert_many(docs)
+    try:
+        await EventDocument.insert_many(docs, ordered=False)
+    except BulkWriteError as e:
+        # Some inserts may have succeeded; log and continue
+        self.logger.debug(f"Batch insert completed with {e.details.get('nInserted', 0)} inserts, some duplicates skipped")
     add_span_attributes(**{"events.batch.count": len(events)})
     self.logger.info(f"Stored {len(events)} events in batch")
     return [event.event_id for event in events]
```
| """Publish a custom event (admin only). Creates a BaseEvent with the provided payload.""" | ||
| base_meta = EventMetadata( | ||
| service_name=settings.SERVICE_NAME, | ||
| service_version=settings.SERVICE_VERSION, | ||
| user_id=admin.user_id, | ||
| ip_address=get_client_ip(request), | ||
| user_agent=request.headers.get("user-agent"), | ||
| correlation_id=event_request.correlation_id or CorrelationContext.get_correlation_id(), | ||
| ) | ||
| # Merge any additional metadata provided in request (extra allowed) | ||
| if event_request.metadata: | ||
| base_meta = base_meta.model_copy(update=event_request.metadata) | ||
|
|
||
| event_id = await event_service.publish_event( | ||
| event_type=event_request.event_type, | ||
| payload=event_request.payload, | ||
| # Create a BaseEvent with the custom payload in metadata | ||
| event = BaseEvent( | ||
| aggregate_id=event_request.aggregate_id, | ||
| correlation_id=event_request.correlation_id, | ||
| metadata=base_meta, | ||
| ) | ||
|
|
||
| event_id = await producer.publish(event=event, key=event_request.aggregate_id) |
🧩 Analysis chain (verification scripts)

```bash
# Find and inspect PublishEventRequest class definition
rg -n -A15 'class PublishEventRequest' -S

# Find and inspect BaseEvent class definition
rg -n -A15 'class BaseEvent' -S

# Check the BaseEvent model configuration for extra fields handling
rg -n -B5 -A20 'class BaseEvent' backend/app/domain/events/typed.py -S

# Read the exact function code around lines 239-258 in events.py
sed -n '230,260p' backend/app/api/routes/events.py

# Check if BaseEvent has any extra="allow" or similar configuration
rg -n 'extra' backend/app/domain/events/typed.py -A2 -B2
```
Custom event payload is dropped and will not be published.
PublishEventRequest includes a required payload field, but it is never passed to the BaseEvent constructor. Since BaseEvent doesn't have extra="allow" configured and lacks a payload field, the custom payload data will be silently discarded due to Pydantic's default behavior of ignoring unknown fields.
💡 Proposed fix (preserve payload explicitly)
- # Create a BaseEvent with the custom payload in metadata
- event = BaseEvent(
- aggregate_id=event_request.aggregate_id,
- metadata=base_meta,
- )
+ payload = event_request.payload or {}
+ event = BaseEvent.model_validate(
+ {**payload, "aggregate_id": event_request.aggregate_id, "metadata": base_meta}
+ )

Alternatively, enable extra field passthrough on BaseEvent with `extra="allow"` (as done in UserSettingsUpdatedEvent).
🤖 Prompt for AI Agents
In `@backend/app/api/routes/events.py` around lines 239 - 258, The
PublishEventRequest.payload is never attached to the BaseEvent, so the custom
payload is dropped when creating event = BaseEvent(...); update the creation to
include the payload (from event_request.payload) into the BaseEvent metadata or
a dedicated payload field so the data is preserved before calling
producer.publish; alternatively change BaseEvent to allow extra fields
(extra="allow") like UserSettingsUpdatedEvent if you prefer passthrough, but the
quick fix is to pass event_request.payload into the BaseEvent constructor (or
into EventMetadata) so producer.publish receives the payload.
```diff
     Topic field stores the event type name (e.g., 'execution_requested').
     """

     event_id: Indexed(str, unique=True) = Field(default_factory=lambda: str(uuid4()))  # type: ignore[valid-type]
-    event_type: EventType  # Indexed via Settings.indexes
+    topic: str  # Event topic name (snake_case class name without 'Event' suffix)
     event_version: str = "1.0"
```
Plan a backfill/migration for the new required topic field.
Existing events that only have event_type will fail validation or be invisible to the new topic-based queries/indexes (including archives). Please ensure a migration/backfill (and index rebuild) is in place before rollout, or add a temporary compatibility path.
Also applies to: 39-63, 81-83, 101-101
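A rough backfill sketch along those lines, assuming the legacy `event_type` values already match the new snake_case topic names (otherwise substitute an explicit mapping); the collection names here are illustrative:

```python
from motor.motor_asyncio import AsyncIOMotorDatabase


async def backfill_event_topics(db: AsyncIOMotorDatabase) -> None:
    """Copy event_type into topic for documents written before this PR."""
    for name in ("events", "events_archive"):  # illustrative collection names
        # Pipeline-style update (MongoDB 4.2+): set topic from the legacy field.
        await db[name].update_many(
            {"topic": {"$exists": False}},
            [{"$set": {"topic": "$event_type"}}],
        )
        # Rebuild the index used by topic-based queries.
        await db[name].create_index("topic")
```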
🤖 Prompt for AI Agents
In `@backend/app/db/docs/event.py` around lines 17 - 22, The new required Model
field topic on the Event doc (declared alongside event_id and event_version)
will break existing documents that only have event_type; implement a
migration/backfill that reads existing documents with event_type, computes topic
(snake_case class name without 'Event' suffix or map via existing event_type),
writes topic into each document, and rebuilds the affected indexes (including
archive indexes), or add a temporary compatibility layer in the Event model that
returns topic fallback = event_type when topic is missing; update any query
paths that rely on topic to tolerate the fallback during rollout and mark the
migration to run before removing the fallback.
```diff
+import re
 from datetime import datetime, timezone
-from typing import Annotated, Literal
 from uuid import uuid4

-from pydantic import ConfigDict, Discriminator, Field, TypeAdapter
-from pydantic_avro.to_avro.base import AvroBase
+from pydantic import BaseModel, ConfigDict, Field

 from app.domain.enums.auth import LoginMethod
 from app.domain.enums.common import Environment
-from app.domain.enums.events import EventType
 from app.domain.enums.execution import QueuePriority
 from app.domain.enums.notification import NotificationChannel, NotificationSeverity
 from app.domain.enums.storage import ExecutionErrorType, StorageType


-class ResourceUsageDomain(AvroBase):
+def _to_snake_case(name: str) -> str:
+    """Convert class name to snake_case topic name.
+
+    ExecutionRequestedEvent -> execution_requested
+    PodCreatedEvent -> pod_created
+    """
+    if name.endswith("Event"):
+        name = name[:-5]
+    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
```
Acronym splitting changes topic names (e.g., DLQ → d_l_q).
The current regex inserts underscores between every capital, so DLQMessageReceivedEvent becomes d_l_q_message_received. If consumers/topics expect dlq_message_received, routing breaks. Consider a conversion that preserves acronym blocks.
🛠️ Suggested snake_case conversion
def _to_snake_case(name: str) -> str:
"""Convert class name to snake_case topic name.
@@
if name.endswith("Event"):
name = name[:-5]
- return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
+ name = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", name)
+ name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
+ return name.lower()

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
import re
from datetime import datetime, timezone
from uuid import uuid4

from pydantic import BaseModel, ConfigDict, Field

from app.domain.enums.auth import LoginMethod
from app.domain.enums.common import Environment
from app.domain.enums.execution import QueuePriority
from app.domain.enums.notification import NotificationChannel, NotificationSeverity
from app.domain.enums.storage import ExecutionErrorType, StorageType


def _to_snake_case(name: str) -> str:
    """Convert class name to snake_case topic name.

    ExecutionRequestedEvent -> execution_requested
    PodCreatedEvent -> pod_created
    """
    if name.endswith("Event"):
        name = name[:-5]
    name = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", name)
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
    return name.lower()
```
🤖 Prompt for AI Agents
In `@backend/app/domain/events/typed.py` around lines 1 - 22, The _to_snake_case
function currently splits every capital letter (turning
"DLQMessageReceivedEvent" into "d_l_q_message_received"); update it to preserve
acronym blocks by first stripping the trailing "Event" then applying a two-step
regex split: 1) insert underscores between a lowercase/number and a
capital-started word (e.g., between "o" and "Request"), and 2) insert
underscores between a capital followed by lowercase sequence boundary (so
acronym sequences remain intact), then lower-case the result; modify
_to_snake_case accordingly so topics like DLQMessageReceivedEvent become
dlq_message_received while still handling normal CamelCase names.
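For a quick sanity check of the proposed two-step conversion, here is a standalone copy of the suggested helper (not the repository code) exercised with the names discussed in this thread:

```python
import re


def to_snake_case(name: str) -> str:
    """Standalone copy of the suggested conversion for verification."""
    if name.endswith("Event"):
        name = name[:-5]
    name = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", name)
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
    return name.lower()


assert to_snake_case("ExecutionRequestedEvent") == "execution_requested"
assert to_snake_case("PodCreatedEvent") == "pod_created"
assert to_snake_case("DLQMessageReceivedEvent") == "dlq_message_received"
```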
```diff
 class BaseEvent(BaseModel):
     """Base fields for all domain events.

+    Topic routing: Each event class maps to its own Kafka topic.
+    Topic name is derived from class name: ExecutionRequestedEvent -> execution_requested
     """

     # Pydantic marks fields with default/default_factory as optional in JSON Schema,
     # which generates optional TypeScript types (e.g., `event_id?: string`).
     # Since stored events always have these fields, we override the schema to mark them required.
     # See: https://github.com/pydantic/pydantic/issues/7209
     # See: https://github.com/pydantic/pydantic/discussions/6073
     model_config = ConfigDict(
         from_attributes=True,
-        json_schema_extra={"required": ["event_id", "event_type", "event_version", "timestamp", "metadata"]},
+        json_schema_extra={"required": ["event_id", "event_version", "timestamp", "metadata"]},
     )

     event_id: str = Field(default_factory=lambda: str(uuid4()))
-    event_type: EventType
     event_version: str = "1.0"
     timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
     aggregate_id: str | None = None
     metadata: EventMetadata
+
+    @classmethod
+    def topic(cls, prefix: str = "") -> str:
+        """Get Kafka topic name for this event class."""
+        return f"{prefix}{_to_snake_case(cls.__name__)}"
```
🧩 Analysis chain (verification scripts)

```bash
find . -type f -name "*.py" | head -20
rg -t py "BaseEvent" --max-count 30 -B 2 -A 2
rg -t py "browse|replay" --max-count 20 -B 2 -A 2
fd -t f -e py "repo" | head -20
rg -t py "model_validate.*BaseEvent" -A 3 -B 3 | head -100
cat backend/app/domain/events/typed.py | head -100
rg -t py "class EventDocument" -A 20
rg -t py "extra=.allow" backend/app/domain/events/ -B 2 -A 2
rg -t py "discriminator|union|Union\[.*Event" backend/app/domain/events/ -B 2 -A 2
rg -t py "get_event|model_validate" backend/app/db/repositories/event_repository.py -B 3 -A 5 | head -150
cat backend/app/db/repositories/event_repository.py | head -200
rg -t py "\.execution_id|\.script|\.pod_name" backend/app/api/routes/admin/events.py -B 2 -A 2
cat backend/app/api/routes/admin/events.py | head -150
rg -t py "type\(.*event\)|isinstance.*Event" backend/app/services/ -B 2 -A 2 | head -80
rg -t py "event\.execution_id|event\.script|event\.pod_name|event\." backend/app/services/admin/admin_events_service.py -B 2 -A 2 | head -80
cat backend/app/services/event_replay/replay_service.py | grep -A 20 "BaseEvent.model_validate"
rg -t py "test.*BaseEvent|def test" backend/tests/e2e/test_events_routes.py | head -20
cat backend/tests/e2e/test_events_routes.py | grep -A 30 "def test_replay_events_dry_run"
rg -t py "model_dump|export.*json|export.*csv" backend/app/services/admin/admin_events_service.py -B 3 -A 3
cat backend/app/db/repositories/admin/admin_events_repository.py | grep -A 30 "async def browse_events"
```
BaseEvent drops event-specific fields during deserialization without extra="allow".
Events are stored in MongoDB with event-specific fields preserved (EventDocument has extra="allow"), but when retrieved and validated as BaseEvent, Pydantic drops any unknown fields because BaseEvent lacks this configuration. This causes silent data loss in browse/replay responses and any code that processes deserialized events. EventDocument stores fields like execution_id, pod_name, script, etc., but they become inaccessible once the model is instantiated as BaseEvent.
Add extra="allow" to BaseEvent's model_config to preserve these fields through the deserialization pipeline, matching the pattern already established by UserSettingsUpdatedEvent:
Suggested fix
model_config = ConfigDict(
from_attributes=True,
+ extra="allow",
json_schema_extra={"required": ["event_id", "event_version", "timestamp", "metadata"]},
)

🤖 Prompt for AI Agents
In `@backend/app/domain/events/typed.py` around lines 50 - 71, BaseEvent's
Pydantic config currently omits unknown-event fields causing them to be dropped
on deserialization; update BaseEvent.model_config to include extra="allow"
(e.g., add extra="allow" into the ConfigDict alongside from_attributes and
json_schema_extra) so that event-specific fields stored by EventDocument remain
available when instantiating BaseEvent (match the approach used by
UserSettingsUpdatedEvent).
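A small illustration of the behavioural difference using plain Pydantic v2 models (not the repository's classes):

```python
from pydantic import BaseModel, ConfigDict


class StrictEvent(BaseModel):
    event_id: str


class LenientEvent(BaseModel):
    model_config = ConfigDict(extra="allow")
    event_id: str


doc = {"event_id": "abc", "execution_id": "exec-1", "pod_name": "pod-1"}

strict = StrictEvent.model_validate(doc)
lenient = LenientEvent.model_validate(doc)

assert "execution_id" not in strict.model_dump()          # extra field silently dropped
assert lenient.model_dump()["execution_id"] == "exec-1"   # extra field preserved
```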
```python
try:
    event = BaseEvent.model_validate(doc)
except ValidationError as e:
    session.failed_events += 1
    self.logger.warning(
        "Skipping event that failed validation",
        extra={"event_id": doc.get("event_id", "unknown"), "error": str(e)},
    )
    continue

batch.append(event)
events_processed += 1
```
BaseEvent validation collapses concrete types; replay will publish to the wrong topic.
BaseEvent.model_validate(doc) yields BaseEvent instances, so EventPublisher.publish() will route everything to base_event. Replay needs concrete event subclasses (or a topic-driven factory) to preserve topic routing and metrics.
🔧 Suggested fix (rehydrate concrete events)
-from app.domain.events.typed import BaseEvent
+from app.domain.events.typed import BaseEvent, DomainEventAdapter
...
- try:
- event = BaseEvent.model_validate(doc)
+ try:
+ event = DomainEventAdapter.validate_python(doc)

Also applies to: 286-287, 301-301
🤖 Prompt for AI Agents
In `@backend/app/services/event_replay/replay_service.py` around lines 229 - 240,
BaseEvent.model_validate(doc) collapses the concrete type into BaseEvent so
replay publishes every event to the base_event topic; instead, determine the
concrete event class from the document (e.g., via a type/discriminator field or
an event-class registry) and call that concrete class's
model_validate/constructor before appending/publishing; update the handlers
around BaseEvent.model_validate(doc) (where batch.append(event) and
EventPublisher.publish(...) are used) to validate/rehydrate into the correct
subclass (or use a topic-driven factory) and keep session.failed_events++ on
validation errors.
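If the discriminated-union adapter is no longer available after this PR, one possible topic-driven factory is sketched below (it assumes every concrete event subclasses `BaseEvent` and is imported before the registry is built; the import path follows the module referenced in this PR):

```python
from app.domain.events.typed import BaseEvent  # path as referenced in this PR


def build_event_registry() -> dict[str, type[BaseEvent]]:
    """Map topic name -> concrete event class for every imported BaseEvent subclass."""
    registry: dict[str, type[BaseEvent]] = {}
    stack = list(BaseEvent.__subclasses__())
    while stack:  # walk nested subclasses as well
        cls = stack.pop()
        registry[cls.topic()] = cls
        stack.extend(cls.__subclasses__())
    return registry


def rehydrate(doc: dict, registry: dict[str, type[BaseEvent]]) -> BaseEvent:
    """Validate a stored document into its concrete class, falling back to BaseEvent."""
    event_cls = registry.get(doc.get("topic", ""), BaseEvent)
    return event_cls.model_validate(doc)
```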
```python
assert type(received).topic() == "dlq_message_received"
assert "execution" in received.original_topic.lower()
```
🧩 Analysis chain (verification scripts)

```bash
# Find the DLQMessageReceivedEvent class definition and check the topic derivation
rg -n "class DLQ.*Event" --type py -A 5 -B 1

# Also check for the topic() method implementation to understand how topics are derived
rg -n "def topic\(" --type py -A 3

# Search for where DLQMessageReceivedEvent is defined/imported in the test file
rg -n "DLQMessageReceivedEvent" backend/tests/e2e/dlq/test_dlq_manager.py -B 2 -A 2

# Find the _to_snake_case implementation
rg -n "_to_snake_case" --type py -B 2 -A 8

# Verify the actual expected topic by checking other usages of DLQMessageReceivedEvent or similar events
rg -n "DLQMessageReceivedEvent.*topic\|ExecutionRequestedEvent.*topic" --type py -B 1 -A 1

# Also check if there's an actual Kafka topic defined with this name
rg -n "dlq_message_received|d_l_q_message_received|execution_requested" --type py -C 1 | head -50
```
Update the assertion to use the correct derived topic string.
The _to_snake_case() function strips the "Event" suffix and converts camelCase to snake_case. For DLQMessageReceivedEvent, this produces d_l_q_message_received, not dlq_message_received or dlq_message_received_event.
Update line 94:
🔧 Correct fix
- assert type(received).topic() == "dlq_message_received"
+ assert type(received).topic() == "d_l_q_message_received"📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
assert type(received).topic() == "d_l_q_message_received"
assert "execution" in received.original_topic.lower()
```
🤖 Prompt for AI Agents
In `@backend/tests/e2e/dlq/test_dlq_manager.py` around lines 94 - 95, The
assertion is using the wrong derived topic string for DLQMessageReceivedEvent;
because _to_snake_case() strips "Event" and converts camelCase to snake_case,
DLQMessageReceivedEvent becomes "d_l_q_message_received", so update the
assertion that checks received.topic() (in the test referencing
DLQMessageReceivedEvent/received) to expect "d_l_q_message_received" instead of
"dlq_message_received".
```python
# Topic is derived from event class via BaseEvent.topic()
topic = type(ev).topic()
await prod.send_to_dlq(ev, original_topic=topic, error=RuntimeError("forced"), retry_count=1)
```
Topic prefix inconsistency between publish and send_to_dlq.
EventPublisher.publish() internally derives the topic using the prefix: type(event).topic(self._prefix). However, send_to_dlq is called with an unprefixed topic from type(ev).topic(). This means original_topic in the DLQ will not match the actual published topic if a prefix is configured.
Consider using the same prefix when deriving the topic for DLQ:
Suggested fix
# Exercise send_to_dlq path — should not raise
# Topic is derived from event class via BaseEvent.topic()
- topic = type(ev).topic()
+ # Note: In production, EventPublisher uses settings.KAFKA_TOPIC_PREFIX
+ # For this test, we use unprefixed topic since we're just exercising the code path
+ topic = type(ev).topic() # Unprefixed for test simplicity
await prod.send_to_dlq(ev, original_topic=topic, error=RuntimeError("forced"), retry_count=1)

Alternatively, retrieve the prefix from settings to match production behavior.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
# Exercise send_to_dlq path — should not raise
# Topic is derived from event class via BaseEvent.topic()
# Note: In production, EventPublisher uses settings.KAFKA_TOPIC_PREFIX
# For this test, we use unprefixed topic since we're just exercising the code path
topic = type(ev).topic()  # Unprefixed for test simplicity
await prod.send_to_dlq(ev, original_topic=topic, error=RuntimeError("forced"), retry_count=1)
```
🤖 Prompt for AI Agents
In `@backend/tests/e2e/events/test_producer_roundtrip.py` around lines 25 - 27,
The DLQ call uses an unprefixed topic (type(ev).topic()) while
EventPublisher.publish() uses a prefixed topic via
type(event).topic(self._prefix), causing mismatch when a prefix is configured;
update the test to derive the DLQ original_topic the same way as publish by
passing the configured prefix (e.g., call type(ev).topic(prod._prefix) or obtain
the prefix from settings) so the original_topic passed to prod.send_to_dlq(...)
matches the actual published topic; reference EventPublisher.publish(),
send_to_dlq, and type(...).topic(...) when making the change.
```diff
 class FailingProducer(FakeEventPublisher):
     async def produce(
-        self, event_to_produce: DomainEvent, key: str | None = None, headers: dict[str, str] | None = None
-    ) -> None:
+        self, event: BaseEvent, key: str | None = None
+    ) -> str:
         raise RuntimeError("Publish failed")
```
FailingProducer overrides the old method, so the failure path isn’t exercised.
PodMonitor calls publish, but the test override uses produce.
Suggested fix
- async def produce(
+ async def publish(
self, event: BaseEvent, key: str | None = None
) -> str:
raise RuntimeError("Publish failed")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
class FailingProducer(FakeEventPublisher):
    async def publish(
        self, event: BaseEvent, key: str | None = None
    ) -> str:
        raise RuntimeError("Publish failed")
```
🤖 Prompt for AI Agents
In `@backend/tests/unit/services/pod_monitor/test_monitor.py` around lines 370 -
374, The test's FailingProducer currently overrides produce so PodMonitor's
failure path isn't hit; change the override to implement async def publish(self,
event: BaseEvent, key: str | None = None) -> str in class FailingProducer (which
extends FakeEventPublisher) and raise RuntimeError("Publish failed") there so
PodMonitor.publish calls exercise the error path; keep the same signature and
return type to match FakeEventPublisher/PodMonitor expectations.
```diff
 from faststream.kafka import KafkaBroker


 async def run_replay_service(settings: Settings) -> None:
     """Run the event replay service with DI-managed cleanup scheduler."""
     tmp_logger = setup_logger(settings.LOG_LEVEL)
-    schema_registry = SchemaRegistryManager(settings, tmp_logger)
-    broker = create_broker(settings, schema_registry, tmp_logger)
+    broker = KafkaBroker(settings.KAFKA_BOOTSTRAP_SERVERS, logger=tmp_logger)

     container = create_event_replay_container(settings, broker)
```
🧩 Analysis chain (verification scripts)

```bash
# Inspect event replay container wiring and broker lifecycle
echo "=== Searching for create_event_replay_container ==="
rg -n "create_event_replay_container" backend/app/core/container.py -A 20
echo -e "\n=== Searching for EventReplay references ==="
rg -n "EventReplay" backend/app/core/container.py -C 3
echo -e "\n=== Searching for KafkaBroker/broker handling ==="
rg -n "KafkaBroker|broker" backend/app/core/container.py -C 3
echo -e "\n=== Check if container.py exists and its size ==="
ls -lh backend/app/core/container.py 2>/dev/null || echo "File not found at that path"
echo -e "\n=== Search more broadly for container definitions ==="
find . -name "container.py" -type f 2>/dev/null | head -5

# Inspect the full run_event_replay.py file
echo "=== Full run_event_replay.py file ==="
cat -n backend/workers/run_event_replay.py
echo -e "\n=== Check EventReplayWorkerProvider for lifecycle management ==="
rg -n "EventReplayWorkerProvider" backend/app/core/providers.py -A 30

# Check if broker is closed anywhere in event replay worker
echo "=== Check for broker.close() in run_event_replay.py ==="
rg -n "broker\.|close()" backend/workers/run_event_replay.py
# Check how other workers handle broker cleanup
echo -e "\n=== Check other workers for broker cleanup patterns ==="
find backend/workers -name "*.py" -type f | head -5
for file in $(find backend/workers -name "run_*.py" -type f | head -3); do
  echo -e "\n=== File: $file ==="
  grep -n "broker\|close()" "$file" | head -10
done
# Check if dishka container.close() manages context objects
echo -e "\n=== Look for MessagingProvider to see how brokers are typically managed ==="
rg -n "MessagingProvider|KafkaBroker" backend/app/core/providers.py -A 15 | head -40

# Check FastStream documentation and how it manages broker lifecycle
echo "=== Check if broker needs explicit close in event replay context ==="
rg -n "FastStream" backend/workers/run_event_replay.py
echo -e "\n=== Compare all worker shutdown patterns ==="
for file in backend/workers/run_*.py; do
  echo -e "\n--- File: $file ---"
  grep -n "finally\|await.*close\|FastStream" "$file"
done
# Check if dishka's container.close() handles context-injected objects
echo -e "\n=== Search for dishka/asynccontainer usage patterns ==="
rg -n "await container.close()" backend/workers/ -B 5 | head -40

# Check if dishka's from_context manages lifecycle
echo "=== Check dishka documentation patterns for context objects ==="
rg -n "from_context\|AsyncIterator" backend/app/core/providers.py | head -20
# Verify if broker.close() is a valid method on KafkaBroker
echo -e "\n=== Check if there are any broker.close() calls elsewhere ==="
rg -n "broker\.close\|broker\.stop" backend/ -g "*.py"
# Look at how other services manage KafkaBroker cleanup
echo -e "\n=== Check if FastStream manages broker cleanup ==="
rg -n "class FastStream|def.*shutdown" backend/workers/ -A 3 | head -30

# Check if setup_dishka is called in event replay worker
echo "=== Check setup_dishka usage in event replay ==="
rg -n "setup_dishka" backend/workers/run_event_replay.py
# Check what setup_dishka does
echo -e "\n=== Find setup_dishka definition ==="
rg -n "def setup_dishka|setup_dishka" backend/app/ -A 5
# Check if there's explicit broker cleanup in any provider
echo -e "\n=== Check for AsyncIterator providers that manage broker ==="
rg -n "AsyncIterator\[KafkaBroker\]" backend/app/core/providers.py -B 3 -A 10
# Look at how MessagingProvider receives broker
echo -e "\n=== Full MessagingProvider implementation ==="
rg -n "class MessagingProvider" backend/app/core/providers.py -A 25

# Check FastStream/KafkaBroker documentation for lifecycle requirements
echo "=== Check if KafkaBroker has close method ==="
find . -name "*.py" -type f -exec grep -l "KafkaBroker" {} \; | head -5 | xargs grep -n "\.close\|\.stop" | grep -i kafka
# Check the actual setup_dishka_faststream to see if it manages broker
echo -e "\n=== Check if setup_dishka_faststream is used in event replay ==="
rg -n "setup_dishka" backend/workers/run_event_replay.py -B 5 -A 5
# Compare main app setup vs event replay setup
echo -e "\n=== Check main app setup pattern ==="
sed -n '70,90p' backend/app/main.py
# Check if dishka container.close() handles context objects
echo -e "\n=== Check if there's any cleanup code after container creation in event replay ==="
sed -n '13,40p' backend/workers/run_event_replay.py

# Check dishka's handling of context objects and whether container.close() manages them
echo "=== Check if dishka container closes context-injected objects ==="
rg -n "context=" backend/app/core/container.py | head -10
# Look for any explicit broker cleanup elsewhere
echo -e "\n=== Check if broker cleanup happens in EventReplayService or related classes ==="
rg -n "broker\.|KafkaBroker" backend/app/services/event_replay/ -g "*.py"
# Check the difference: setup_dishka_faststream usage
echo -e "\n=== See main.py full setup ==="
sed -n '1,90p' backend/app/main.py | grep -n "setup_dishka\|broker\|lifespan"
# Check if from_context creates AsyncIterator context managers
echo -e "\n=== Check if MessagingProvider or any other provider manages broker lifecycle ==="
rg -n "class.*Provider.*:" backend/app/core/providers.py | head -20

# Check the lifespan function to see how broker is managed in main app
echo "=== Check dishka_lifespan.py ==="
cat backend/app/core/dishka_lifespan.py
# Verify that dishka's from_context doesn't manage lifecycle
echo -e "\n=== Check dishka documentation patterns in codebase for managed objects ==="
rg -n "AsyncIterator" backend/app/core/providers.py -B 2 -A 5 | head -30
```
Add explicit Kafka broker lifecycle management.
The broker is created and passed to the container via from_context, but it is never started or stopped. Unlike the main application (which calls await broker.start() and await broker.stop() in the lifespan handler), the event replay worker lacks these essential lifecycle calls. The container.close() call only manages objects created by providers, not context-injected objects.
Wrap broker initialization in an AsyncIterator provider or add explicit calls:
await broker.start()
# ... service runs ...
await broker.stop()

Alternatively, refactor to use a managed provider pattern similar to the Redis and Database clients in the providers module.
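A sketch of the managed-provider alternative using dishka's generator-style providers (the `Settings` import path is assumed; `broker.start()`/`broker.stop()` mirror the lifespan calls described above):

```python
from collections.abc import AsyncIterator

from dishka import Provider, Scope, provide
from faststream.kafka import KafkaBroker

from app.settings import Settings  # import path assumed


class BrokerProvider(Provider):
    """Owns the KafkaBroker lifecycle so container.close() also tears the broker down."""

    @provide(scope=Scope.APP)
    async def broker(self, settings: Settings) -> AsyncIterator[KafkaBroker]:
        broker = KafkaBroker(settings.KAFKA_BOOTSTRAP_SERVERS)
        await broker.start()
        try:
            yield broker
        finally:
            await broker.stop()  # mirrors the main app's lifespan teardown
```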
🤖 Prompt for AI Agents
In `@backend/workers/run_event_replay.py` around lines 10 - 18, The KafkaBroker
created in run_replay_service is never started or stopped; update
run_replay_service to explicitly manage the broker lifecycle by calling await
broker.start() before using create_event_replay_container(...) and ensuring
await broker.stop() runs in a finally block (or convert the broker to an
AsyncIterator provider like the Redis/Database providers), so the
context-injected KafkaBroker is started for the service and reliably stopped
when container.close() completes.




Summary by cubic
Switched event transport from Avro/Schema Registry to FastStream’s native Pydantic JSON with 1 topic per event class, removed the EventStore/KafkaEventService, and replaced event_type with topic across API and DB. UnifiedProducer is now EventPublisher that persists idempotently; DLQ processes typed events and metadata via headers. CI pre-pulls the Python runtime image for reliable startup.
Performance
Migration
Written for commit 20288b5. Summary will update on new commits.
Summary by CodeRabbit
Refactor
New Features
Bug Fixes
Tests
Chores