
Fix/passing msgs in full no hdrs #142

Merged
HardMax71 merged 3 commits into main from fix/passing-msgs-in-full-no-hdrs
Feb 6, 2026

Conversation


HardMax71 (Owner) commented Feb 6, 2026


Summary by cubic

Stop relying on Kafka headers for business data and pass correlation_id through EventMetadata end to end, with new consumer OTel spans for accurate trace propagation. Also updates cert setup (CA filename change) and bumps a few dependencies.

  • Bug Fixes

    • Add _with_trace wrapper to consumers to create CONSUMER spans, extract W3C trace headers, and tag event_type/event_id.
    • Producer stops adding custom headers (correlation_id, service); only trace context is injected (a rough sketch follows this summary).
    • DLQ manager emits DLQ events with correlation_id in EventMetadata and no retry headers; uses raw offset/partition.
    • Result processor, saga orchestrator, and execution saga propagate correlation_id in all emitted events; pod monitor no longer overrides it from k8s labels.
    • Notification and worker/coordinator handlers run inside tracing spans for consistent linkage.
  • Migration

    • Dev/proxy CA path changed to /shared_ca/ca.pem. Rebuild and restart cert-generator and frontend: docker compose build cert-generator && docker compose up -d cert-generator frontend.

Written for commit 01b1c5c. Summary will update on new commits.
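
As a rough illustration of that header change (not the project's actual producer code; EventMetadata below is a hypothetical stand-in for the real model), the idea is that correlation_id rides inside the event payload while the Kafka headers carry only the trace context injected by OpenTelemetry:

from dataclasses import dataclass, field
from uuid import uuid4

from opentelemetry import propagate


@dataclass
class EventMetadata:  # hypothetical stand-in for the project's metadata model
    service_name: str
    service_version: str
    correlation_id: str = field(default_factory=lambda: str(uuid4()))


def build_kafka_headers() -> list[tuple[str, bytes]]:
    # Only trace context goes into the headers; no correlation_id/service keys.
    carrier: dict[str, str] = {}
    propagate.inject(carrier)  # injects W3C trace context (e.g. 'traceparent') via the configured propagator
    return [(k, v.encode()) for k, v in carrier.items()]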

Summary by CodeRabbit

  • Chores
    • Updated Python and frontend dependencies for stability and security.
    • Bumped Kubernetes tooling and simplified local certificate setup; development CA path updated.
  • Bug Fixes / Reliability
    • Improved message retry and dead-letter handling for more consistent delivery keys and metadata propagation.
  • Observability
    • Expanded tracing and correlation across backend services for better end-to-end request visibility.


coderabbitai bot commented Feb 6, 2026

📝 Walkthrough

Refactors correlation_id propagation across services, adds OpenTelemetry consumer-span tracing to event handlers, changes DLQ and producer header/key behavior to use per-message EventMetadata, and updates infrastructure: kubectl bump, certificate filename change (mkcert-ca.pem → ca.pem), and dependency version bumps.

Changes

• DLQ & Producer (backend/app/dlq/manager.py, backend/app/events/core/producer.py): DLQ manager now constructs per-message EventMetadata using the message's correlation_id; the shared _event_metadata was removed. Producer headers no longer include correlation_id or service_name; DLQ keys are always set to event_id.encode().
• Event Handler Tracing (backend/app/events/handlers.py): Adds _extract_headers and _with_trace to extract headers and create OpenTelemetry consumer spans; wraps many subscriber handlers to preserve idempotency while propagating trace context and event attributes (see the sketch after this list).
• Correlation ID Propagation (backend/app/services/result_processor/processor.py, backend/app/services/saga/execution_saga.py, backend/app/services/saga/saga_orchestrator.py): Propagates correlation_id through result publishing, saga commands, and compensation flows, and stores correlation_id in SagaContext and published event metadata. Method signatures for result publishing were updated.
• Pod Monitor (backend/app/services/pod_monitor/monitor.py): Removed the setting of event.metadata.correlation_id from pod label extraction in _publish_event; the execution_id/key fallback is unchanged.
• Certificates & Dev Config (cert-generator/Dockerfile, cert-generator/setup-certs.sh, docker-compose.yaml, frontend/rollup.config.js, docs/architecture/frontend-build.md): Bumped KUBECTL_VERSION to v1.35.0; removed the mkcert setup; renamed the shared CA file from mkcert-ca.pem to ca.pem across scripts, compose, config, and docs; added setup scripts to the Dockerfile.
• Dependency Updates (backend/pyproject.toml, frontend/package.json): Bumped multiple Python and JS dependencies (click, multidict, propcache, sse-starlette, iniconfig, CodeMirror, @lucide/svelte, @hey-api/openapi-ts, @typescript-eslint/parser).
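
To make the Event Handler Tracing entry concrete, here is a minimal sketch of a _with_trace-style wrapper; the wrapper signature, handler shape, and attribute keys are assumptions for illustration (the real helpers in handlers.py operate on StreamMessage objects and the project's EventAttributes constants), while the OpenTelemetry calls themselves are standard API:

from collections.abc import Awaitable, Callable
from typing import Any

from opentelemetry import propagate, trace
from opentelemetry.trace import SpanKind


def _extract_headers(raw_headers: list[tuple[str, bytes]] | None) -> dict[str, str]:
    # Kafka header values arrive as bytes (or None); normalize everything to str for OTel extraction.
    return {k: v.decode() if isinstance(v, bytes) else (v or "") for k, v in (raw_headers or [])}


def _with_trace(topic: str, handler: Callable[[Any], Awaitable[None]]) -> Callable[..., Awaitable[None]]:
    async def wrapper(event: Any, raw_headers: list[tuple[str, bytes]] | None = None) -> None:
        # Continue the producer's trace by extracting W3C context from the headers,
        # then run the wrapped handler inside a CONSUMER span tagged with event info.
        ctx = propagate.extract(_extract_headers(raw_headers))
        with trace.get_tracer(__name__).start_as_current_span(
            name=f"{topic}.consume",
            context=ctx,
            kind=SpanKind.CONSUMER,
            attributes={"event.type": str(event.event_type), "event.id": str(event.event_id)},
        ):
            await handler(event)

    return wrapper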

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Producer
    participant Kafka
    participant Consumer
    participant Handler
    participant DLQ
    Producer->>Kafka: produce(event, headers with event_type)
    Kafka->>Consumer: deliver(message, headers)
    Consumer->>Handler: handle_message(msg) with _with_trace (extract headers, create span)
    alt handler succeeds
        Handler->>Producer: produce(result_event)  -- includes per-message EventMetadata.correlation_id
    else handler fails
        Handler->>DLQ: send_to_dlq(msg, headers including event_type, key=event_id)
        DLQ->>Kafka: publish(dlq-message)
    end
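For the failure branch above, a minimal sketch of routing a failed message to the DLQ with the delivery key pinned to the event id might look like this; the producer interface and event fields are assumptions, not the project's actual DLQ manager API:

from typing import Any


async def send_to_dlq(producer: Any, dlq_topic: str, event: Any, error: str) -> None:
    # Key on event_id so the DLQ record for a given event always lands in the same partition.
    await producer.send(
        dlq_topic,
        key=event.event_id.encode(),
        value=event.json().encode(),  # serialization method is assumed, not the project's
        headers=[
            ("event_type", str(event.event_type).encode()),
            ("error", error.encode()),
        ],
    )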

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes


Poem

🐰 I hopped through headers, traced each thread,

correlation_id tucked snug in bed.
Spans hum softly, events bound tight,
Certificates renamed — all set for flight. ✨

🚥 Pre-merge checks: 1 passed, 2 not passed
❌ Failed checks (1 warning, 1 inconclusive)
  • Docstring Coverage ⚠️ Warning: docstring coverage is 34.69%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
  • Title Check ❓ Inconclusive: the title 'Fix/passing msgs in full no hdrs' is vague and uses cryptic abbreviations that do not clearly convey the main changes. Resolution: consider a more descriptive title, such as 'Propagate correlation_id through EventMetadata instead of Kafka headers' or 'Add distributed tracing with correlation_id propagation'.
✅ Passed checks (1 passed)
  • Description Check ✅ Passed: check skipped because CodeRabbit's high-level summary is enabled.





coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@frontend/package.json`:
- Line 62: The dependency entry for "@hey-api/openapi-ts" in package.json changed from an exact pin to a caret range, which can cause generator output diffs. Revert it to an exact version to preserve deterministic codegen (e.g., change "@hey-api/openapi-ts": "^0.90.10" back to the exact version you intend to lock, such as "0.90.4" or "0.90.10"), update package.json accordingly, and run a lockfile update (npm/yarn/pnpm) so the locked dependency matches the exact version.
🧹 Nitpick comments (5)
frontend/package.json (1)

72-73: @typescript-eslint/parser and @typescript-eslint/eslint-plugin version ranges are now slightly misaligned.

parser is bumped to ^8.53.1 while eslint-plugin remains at ^8.53.0. The typescript-eslint project recommends keeping these in lockstep. Consider bumping the plugin to ^8.53.1 as well.

Proposed fix
-    "@typescript-eslint/eslint-plugin": "^8.53.0",
+    "@typescript-eslint/eslint-plugin": "^8.53.1",
     "@typescript-eslint/parser": "^8.53.1",
backend/app/services/saga/execution_saga.py (1)

170-176: Compensation correctly retrieves correlation_id from context.

The fallback to "" on Line 175 would override EventMetadata's default_factory (which generates a UUID) with an empty string if the key is missing. In practice this path is guarded by the pod_creation_triggered check on Line 162, so it should always be populated. Still, using a None sentinel with explicit handling (or omitting the kwarg so the default factory generates a UUID) would be slightly safer.

backend/app/services/saga/saga_orchestrator.py (1)

314-321: Empty-string fallback for correlation_id suppresses the UUID default.

Same pattern as in execution_saga.py: if context_data lacks a "correlation_id" key (e.g., sagas persisted before this change is deployed), the fallback "" overrides EventMetadata's default_factory that would otherwise generate a valid UUID. Consider using None as the sentinel and only passing correlation_id when it's truthy, or letting the default factory handle it:

♻️ Suggested approach
-            correlation_id = saga_instance.context_data.get("correlation_id", "") if saga_instance.context_data else ""
+            correlation_id = (saga_instance.context_data.get("correlation_id") or "") if saga_instance.context_data else ""

Even better, omit the kwarg entirely when empty so the default factory kicks in:

correlation_id = saga_instance.context_data.get("correlation_id") if saga_instance.context_data else None
metadata_kwargs = {
    "service_name": "saga-orchestrator",
    "service_version": "1.0.0",
    "user_id": cancelled_by or "system",
}
if correlation_id:
    metadata_kwargs["correlation_id"] = correlation_id
metadata = EventMetadata(**metadata_kwargs)
backend/app/events/handlers.py (2)

39-41: _extract_headers: potential AttributeError if a header value is None.

If a Kafka header has a None value (permitted by the protocol), the isinstance(v, bytes) check passes through None as-is, which is fine for the dict. However, downstream callers like extract_trace_context or .get()-based lookups expect str values. Consider guarding:

♻️ Defensive tweak
 def _extract_headers(msg: StreamMessage[Any]) -> dict[str, str]:
     """Decode raw Kafka headers into a string dict for OTel extraction."""
-    return {k: v.decode() if isinstance(v, bytes) else v for k, v in (msg.raw_message.headers or [])}
+    return {
+        k: v.decode() if isinstance(v, bytes) else (v or "")
+        for k, v in (msg.raw_message.headers or [])
+    }

364-398: DLQ subscriber: headers["failed_at"] (Line 372) will raise a KeyError if the header is missing.

Other header fields use .get() with defaults, but failed_at is accessed with direct indexing. If a DLQ message arrives without this header (e.g., from a legacy producer or a bug), the subscriber will crash and the message won't be ACKed.

If this is intentionally required (i.e., a missing failed_at is a programming error that should surface loudly), this is acceptable. Otherwise, consider a default:

♻️ Optional: safe fallback
-            failed_at=datetime.fromisoformat(headers["failed_at"]),
+            failed_at=datetime.fromisoformat(headers.get("failed_at", datetime.now(timezone.utc).isoformat())),


cubic-dev-ai bot left a comment


1 issue found across 16 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/services/saga/saga_orchestrator.py">

<violation number="1" location="backend/app/services/saga/saga_orchestrator.py:316">
P2: Avoid overriding EventMetadata’s default correlation_id with an empty string; it can publish events with a blank correlation_id and break trace correlation. Generate a fallback ID (or only pass when present).</violation>
</file>
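
A hedged sketch of the "generate a fallback ID" option mentioned above, which also guards against context_data being None (assumes uuid4 can be imported in saga_orchestrator.py; the EventMetadata kwargs mirror the snippet quoted earlier):

from uuid import uuid4

correlation_id = (saga_instance.context_data or {}).get("correlation_id") or str(uuid4())
metadata = EventMetadata(
    service_name="saga-orchestrator",
    service_version="1.0.0",
    user_id=cancelled_by or "system",
    correlation_id=correlation_id,
)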

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.


sonarqubecloud bot commented Feb 6, 2026


cubic-dev-ai bot left a comment


2 issues found across 4 files (changes from recent commits).

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/events/handlers.py">

<violation number="1" location="backend/app/events/handlers.py:371">
P2: Directly indexing DLQ headers can raise KeyError and crash the consumer when a DLQ message lacks one of these headers (e.g., older messages). Restore safe defaults or guard missing headers to keep DLQ processing resilient.</violation>
</file>

<file name="backend/app/services/saga/saga_orchestrator.py">

<violation number="1" location="backend/app/services/saga/saga_orchestrator.py:315">
P2: Guard against saga_instance.context_data being None before calling .get; otherwise saga cancellation can fail with AttributeError for sagas without context_data.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.


coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/events/handlers.py (1)

369-380: ⚠️ Potential issue | 🟠 Major

DLQ header access uses []; a KeyError is raised if any expected header is missing.

Lines 371–376 access headers["original_topic"], headers["error"], etc. with direct dict subscript. If a message reaches the DLQ with a missing header (e.g., from a different producer version, or header corruption), this will raise KeyError and the message will be NACKed/retried endlessly (or poison the consumer).

Since this is the DLQ — the place broken messages land — defensive access is especially important here.

Proposed fix: validate required headers up front
         headers = _extract_headers(msg)
         raw = msg.raw_message
-        assert not isinstance(raw, tuple)  # single-message consumer, never batch
+        if isinstance(raw, tuple):
+            raise TypeError("Expected a single Kafka message, got a batch")
+
+        required = ("original_topic", "error", "retry_count", "failed_at", "status", "producer_id")
+        missing = [h for h in required if h not in headers]
+        if missing:
+            logger.error("DLQ message missing required headers", extra={"missing": missing})
+            return  # or send to a "poison pill" topic
 
         dlq_msg = DLQMessage(
             event=body,
🤖 Fix all issues with AI agents
In `@backend/app/events/handlers.py`:
- Around line 39-41: The _extract_headers function currently can include None
values in its returned dict, violating the annotated dict[str, str] and causing
downstream failures (e.g., DLQ subscriber / DLQMessage parsing that expects
strings). Update _extract_headers to either (a) filter out headers with None
values so they are not included in the returned dict, or (b) coerce None to a
safe string (e.g., "") before returning; implement this by changing the dict
comprehension in _extract_headers to skip entries where v is None or to convert
None -> "" and ensure the return type is preserved as dict[str, str].
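
A minimal sketch of option (a) from the prompt above, keeping the comprehension shape of the current _extract_headers and simply skipping None values (a sketch, not the final implementation):

def _extract_headers(msg: StreamMessage[Any]) -> dict[str, str]:
    """Decode raw Kafka headers into a string dict, skipping headers whose value is None."""
    return {
        k: v.decode() if isinstance(v, bytes) else v
        for k, v in (msg.raw_message.headers or [])
        if v is not None
    }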
🧹 Nitpick comments (3)
backend/app/dlq/manager.py (1)

100-104: Extract repeated EventMetadata construction to reduce duplication.

The same service_name="dlq-manager", service_version="1.0.0" triple appears in handle_message, retry_message, and discard_message. A small helper (or class-level constants) would keep this DRY and make version bumps a single-line change.

♻️ Proposed refactor

Add constants or a helper method on DLQManager:

 class DLQManager:
+    SERVICE_NAME = "dlq-manager"
+    SERVICE_VERSION = "1.0.0"
+
+    def _make_metadata(self, correlation_id: str) -> EventMetadata:
+        return EventMetadata(
+            service_name=self.SERVICE_NAME,
+            service_version=self.SERVICE_VERSION,
+            correlation_id=correlation_id,
+        )

Then replace each inline construction, e.g.:

-                metadata=EventMetadata(
-                    service_name="dlq-manager",
-                    service_version="1.0.0",
-                    correlation_id=message.event.metadata.correlation_id,
-                ),
+                metadata=self._make_metadata(message.event.metadata.correlation_id),

Also applies to: 161-165, 191-195

backend/app/events/handlers.py (2)

367-367: assert in production code can be stripped by python -O.

If the process runs with the -O flag, this assert is silently removed. Consider replacing with an explicit check and raise.

Proposed fix
-        assert not isinstance(raw, tuple)  # single-message consumer, never batch
+        if isinstance(raw, tuple):
+            raise TypeError("Expected a single Kafka message, got a batch")

395-401: Metrics are only recorded on successful processing — silent loss on exceptions.

If manager.handle_message(dlq_msg) raises, lines 395–401 are skipped. This means failed DLQ processing won't be counted in record_dlq_message_received or record_dlq_processing_duration. Consider wrapping in try/finally so at least the duration and receipt are always recorded.

Proposed fix (the with-block re-indented under try, and the metrics moved into finally so they run even when handle_message raises):

        try:
            with get_tracer().start_as_current_span(
                name="dlq.consume",
                context=ctx,
                kind=SpanKind.CONSUMER,
                attributes={
                    EventAttributes.KAFKA_TOPIC: str(manager.dlq_topic),
                    EventAttributes.EVENT_TYPE: body.event_type,
                    EventAttributes.EVENT_ID: body.event_id,
                },
            ):
                await manager.handle_message(dlq_msg)
        finally:
            manager.metrics.record_dlq_message_received(dlq_msg.original_topic, body.event_type)
            manager.metrics.record_dlq_message_age(
                (datetime.now(timezone.utc) - dlq_msg.failed_at).total_seconds()
            )
            manager.metrics.record_dlq_processing_duration(
                asyncio.get_running_loop().time() - start, "process"
            )

HardMax71 merged commit f8ab845 into main Feb 6, 2026
18 checks passed
HardMax71 deleted the fix/passing-msgs-in-full-no-hdrs branch February 6, 2026 20:29