Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions backend/app/dlq/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,9 @@ def __init__(
]

self._dlq_events_topic = f"{settings.KAFKA_TOPIC_PREFIX}{KafkaTopic.DLQ_EVENTS}"
self._event_metadata = EventMetadata(service_name="dlq-manager", service_version="1.0.0")

def _filter_test_events(self, message: DLQMessage) -> bool:
event_id = message.event.event_id or ""
return not event_id.startswith("test-")
return not message.event.event_id.startswith("test-")

def _filter_old_messages(self, message: DLQMessage) -> bool:
max_age_days = 7
Expand Down Expand Up @@ -99,7 +97,11 @@ async def handle_message(self, message: DLQMessage) -> None:
retry_count=message.retry_count,
producer_id=message.producer_id,
failed_at=message.failed_at,
metadata=self._event_metadata,
metadata=EventMetadata(
service_name="dlq-manager",
service_version="1.0.0",
correlation_id=message.event.metadata.correlation_id,
),
),
topic=self._dlq_events_topic,
)
Expand All @@ -126,17 +128,14 @@ async def retry_message(self, message: DLQMessage) -> None:
"""
hdrs: dict[str, str] = {
"event_type": message.event.event_type,
"dlq_retry_count": str(message.retry_count + 1),
"dlq_original_error": message.error,
"dlq_retry_timestamp": datetime.now(timezone.utc).isoformat(),
}
hdrs = inject_trace_context(hdrs)

# Publish directly to original topic - FastStream serializes Pydantic to JSON
await self._broker.publish(
message=message.event,
topic=message.original_topic,
key=message.event.event_id.encode() if message.event.event_id else None,
key=message.event.event_id.encode(),
headers=hdrs,
)

Expand All @@ -159,7 +158,11 @@ async def retry_message(self, message: DLQMessage) -> None:
original_event_type=str(message.event.event_type),
retry_count=new_retry_count,
retry_topic=message.original_topic,
metadata=self._event_metadata,
metadata=EventMetadata(
service_name="dlq-manager",
service_version="1.0.0",
correlation_id=message.event.metadata.correlation_id,
),
),
topic=self._dlq_events_topic,
)
Expand All @@ -185,7 +188,11 @@ async def discard_message(self, message: DLQMessage, reason: str) -> None:
original_event_type=str(message.event.event_type),
reason=reason,
retry_count=message.retry_count,
metadata=self._event_metadata,
metadata=EventMetadata(
service_name="dlq-manager",
service_version="1.0.0",
correlation_id=message.event.metadata.correlation_id,
),
),
topic=self._dlq_events_topic,
)
Expand Down
4 changes: 1 addition & 3 deletions backend/app/events/core/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ async def produce(self, event_to_produce: DomainEvent, key: str) -> None:
try:
headers = inject_trace_context({
"event_type": event_to_produce.event_type,
"correlation_id": event_to_produce.metadata.correlation_id or "",
"service": event_to_produce.metadata.service_name,
})

# FastStream handles Pydantic → JSON serialization natively
Expand Down Expand Up @@ -89,7 +87,7 @@ async def send_to_dlq(
await self._broker.publish(
message=original_event,
topic=dlq_topic,
key=original_event.event_id.encode() if original_event.event_id else None,
key=original_event.event_id.encode(),
headers=headers,
)

Expand Down
111 changes: 83 additions & 28 deletions backend/app/events/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from dishka.integrations.faststream import FromDishka
from faststream import AckPolicy, StreamMessage
from faststream.kafka import KafkaBroker
from faststream.kafka import KafkaBroker, KafkaMessage
from opentelemetry.trace import SpanKind

from app.core.tracing import EventAttributes
Expand Down Expand Up @@ -36,6 +36,32 @@
from app.settings import Settings


def _extract_headers(msg: StreamMessage[Any]) -> dict[str, str]:
"""Decode raw Kafka headers into a string dict for OTel extraction."""
return {k: v.decode() if isinstance(v, bytes) else v for k, v in (msg.raw_message.headers or [])}


async def _with_trace(
msg: StreamMessage[Any],
span_name: str,
body: DomainEvent,
handler: Callable[[], Awaitable[None]],
) -> None:
"""Run handler inside an OTel consumer span linked to the producer's trace context."""
headers = _extract_headers(msg)
ctx = extract_trace_context(headers)
with get_tracer().start_as_current_span(
name=span_name,
context=ctx,
kind=SpanKind.CONSUMER,
attributes={
EventAttributes.EVENT_TYPE: body.event_type,
EventAttributes.EVENT_ID: body.event_id,
},
):
await handler()


async def with_idempotency(
event: DomainEvent,
handler: Callable[..., Awaitable[None]],
Expand Down Expand Up @@ -78,46 +104,50 @@ def register_coordinator_subscriber(broker: KafkaBroker, settings: Settings) ->
@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_REQUESTED)
async def on_execution_requested(
body: ExecutionRequestedEvent,
msg: KafkaMessage,
coordinator: FromDishka[ExecutionCoordinator],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(
await _with_trace(msg, "coordinator.execution_requested", body, lambda: with_idempotency(
body, coordinator.handle_execution_requested, idem, KeyStrategy.EVENT_BASED, 7200, logger,
)
))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_COMPLETED)
async def on_execution_completed(
body: ExecutionCompletedEvent,
msg: KafkaMessage,
coordinator: FromDishka[ExecutionCoordinator],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(
await _with_trace(msg, "coordinator.execution_completed", body, lambda: with_idempotency(
body, coordinator.handle_execution_completed, idem, KeyStrategy.EVENT_BASED, 7200, logger,
)
))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_FAILED)
async def on_execution_failed(
body: ExecutionFailedEvent,
msg: KafkaMessage,
coordinator: FromDishka[ExecutionCoordinator],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(
await _with_trace(msg, "coordinator.execution_failed", body, lambda: with_idempotency(
body, coordinator.handle_execution_failed, idem, KeyStrategy.EVENT_BASED, 7200, logger,
)
))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_CANCELLED)
async def on_execution_cancelled(
body: ExecutionCancelledEvent,
msg: KafkaMessage,
coordinator: FromDishka[ExecutionCoordinator],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(
await _with_trace(msg, "coordinator.execution_cancelled", body, lambda: with_idempotency(
body, coordinator.handle_execution_cancelled, idem, KeyStrategy.EVENT_BASED, 7200, logger,
)
))

@sub
async def on_unhandled(body: DomainEvent) -> None:
Expand All @@ -134,20 +164,26 @@ def register_k8s_worker_subscriber(broker: KafkaBroker, settings: Settings) -> N
@sub(filter=lambda msg: msg.headers["event_type"] == EventType.CREATE_POD_COMMAND)
async def on_create_pod(
body: CreatePodCommandEvent,
msg: KafkaMessage,
worker: FromDishka[KubernetesWorker],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(body, worker.handle_create_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger)
await _with_trace(msg, "k8s_worker.create_pod", body, lambda: with_idempotency(
body, worker.handle_create_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger,
))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.DELETE_POD_COMMAND)
async def on_delete_pod(
body: DeletePodCommandEvent,
msg: KafkaMessage,
worker: FromDishka[KubernetesWorker],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(body, worker.handle_delete_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger)
await _with_trace(msg, "k8s_worker.delete_pod", body, lambda: with_idempotency(
body, worker.handle_delete_pod_command, idem, KeyStrategy.CONTENT_HASH, 3600, logger,
))

@sub
async def on_unhandled(body: DomainEvent) -> None:
Expand All @@ -166,29 +202,38 @@ def register_result_processor_subscriber(broker: KafkaBroker, settings: Settings
@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_COMPLETED)
async def on_execution_completed(
body: ExecutionCompletedEvent,
msg: KafkaMessage,
processor: FromDishka[ResultProcessor],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(body, processor.handle_execution_completed, idem, KeyStrategy.CONTENT_HASH, 7200, logger)
await _with_trace(msg, "result_processor.execution_completed", body, lambda: with_idempotency(
body, processor.handle_execution_completed, idem, KeyStrategy.CONTENT_HASH, 7200, logger,
))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_FAILED)
async def on_execution_failed(
body: ExecutionFailedEvent,
msg: KafkaMessage,
processor: FromDishka[ResultProcessor],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(body, processor.handle_execution_failed, idem, KeyStrategy.CONTENT_HASH, 7200, logger)
await _with_trace(msg, "result_processor.execution_failed", body, lambda: with_idempotency(
body, processor.handle_execution_failed, idem, KeyStrategy.CONTENT_HASH, 7200, logger,
))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_TIMEOUT)
async def on_execution_timeout(
body: ExecutionTimeoutEvent,
msg: KafkaMessage,
processor: FromDishka[ResultProcessor],
idem: FromDishka[IdempotencyManager],
logger: FromDishka[logging.Logger],
) -> None:
await with_idempotency(body, processor.handle_execution_timeout, idem, KeyStrategy.CONTENT_HASH, 7200, logger)
await _with_trace(msg, "result_processor.execution_timeout", body, lambda: with_idempotency(
body, processor.handle_execution_timeout, idem, KeyStrategy.CONTENT_HASH, 7200, logger,
))

@sub
async def on_unhandled(body: DomainEvent) -> None:
Expand All @@ -205,30 +250,34 @@ def register_saga_subscriber(broker: KafkaBroker, settings: Settings) -> None:
@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_REQUESTED)
async def on_execution_requested(
body: ExecutionRequestedEvent,
msg: KafkaMessage,
orchestrator: FromDishka[SagaOrchestrator],
) -> None:
await orchestrator.handle_execution_requested(body)
await _with_trace(msg, "saga.execution_requested", body, lambda: orchestrator.handle_execution_requested(body))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_COMPLETED)
async def on_execution_completed(
body: ExecutionCompletedEvent,
msg: KafkaMessage,
orchestrator: FromDishka[SagaOrchestrator],
) -> None:
await orchestrator.handle_execution_completed(body)
await _with_trace(msg, "saga.execution_completed", body, lambda: orchestrator.handle_execution_completed(body))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_FAILED)
async def on_execution_failed(
body: ExecutionFailedEvent,
msg: KafkaMessage,
orchestrator: FromDishka[SagaOrchestrator],
) -> None:
await orchestrator.handle_execution_failed(body)
await _with_trace(msg, "saga.execution_failed", body, lambda: orchestrator.handle_execution_failed(body))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_TIMEOUT)
async def on_execution_timeout(
body: ExecutionTimeoutEvent,
msg: KafkaMessage,
orchestrator: FromDishka[SagaOrchestrator],
) -> None:
await orchestrator.handle_execution_timeout(body)
await _with_trace(msg, "saga.execution_timeout", body, lambda: orchestrator.handle_execution_timeout(body))

@sub
async def on_unhandled(body: DomainEvent) -> None:
Expand Down Expand Up @@ -264,23 +313,28 @@ def register_notification_subscriber(broker: KafkaBroker, settings: Settings) ->
@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_COMPLETED)
async def on_execution_completed(
body: ExecutionCompletedEvent,
msg: KafkaMessage,
service: FromDishka[NotificationService],
) -> None:
await service.handle_execution_completed(body)
await _with_trace(
msg, "notification.execution_completed", body, lambda: service.handle_execution_completed(body),
)

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_FAILED)
async def on_execution_failed(
body: ExecutionFailedEvent,
msg: KafkaMessage,
service: FromDishka[NotificationService],
) -> None:
await service.handle_execution_failed(body)
await _with_trace(msg, "notification.execution_failed", body, lambda: service.handle_execution_failed(body))

@sub(filter=lambda msg: msg.headers["event_type"] == EventType.EXECUTION_TIMEOUT)
async def on_execution_timeout(
body: ExecutionTimeoutEvent,
msg: KafkaMessage,
service: FromDishka[NotificationService],
) -> None:
await service.handle_execution_timeout(body)
await _with_trace(msg, "notification.execution_timeout", body, lambda: service.handle_execution_timeout(body))

@sub
async def on_unhandled(body: DomainEvent) -> None:
Expand All @@ -303,22 +357,23 @@ def register_dlq_subscriber(broker: KafkaBroker, settings: Settings) -> None:
)
async def on_dlq_message(
body: DomainEvent,
msg: StreamMessage[Any],
msg: KafkaMessage,
manager: FromDishka[DLQManager],
logger: FromDishka[logging.Logger],
) -> None:
start = asyncio.get_running_loop().time()
headers = _extract_headers(msg)
raw = msg.raw_message
headers = {k: v.decode() for k, v in (raw.headers or [])}
assert not isinstance(raw, tuple) # single-message consumer, never batch

dlq_msg = DLQMessage(
event=body,
original_topic=headers.get("original_topic", ""),
error=headers.get("error", "Unknown error"),
retry_count=int(headers.get("retry_count", "0")),
original_topic=headers["original_topic"],
error=headers["error"],
retry_count=int(headers["retry_count"]),
failed_at=datetime.fromisoformat(headers["failed_at"]),
status=DLQMessageStatus(headers.get("status", "pending")),
producer_id=headers.get("producer_id", "unknown"),
status=DLQMessageStatus(headers["status"]),
producer_id=headers["producer_id"],
dlq_offset=raw.offset,
dlq_partition=raw.partition,
headers=headers,
Expand Down
3 changes: 0 additions & 3 deletions backend/app/services/pod_monitor/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,6 @@ async def _process_pod_event(self, event: PodEvent) -> None:
async def _publish_event(self, event: DomainEvent, pod: k8s_client.V1Pod) -> None:
"""Publish event to Kafka and store in events collection."""
try:
if pod.metadata and pod.metadata.labels:
event.metadata.correlation_id = pod.metadata.labels.get("execution-id") or ""

execution_id = getattr(event, "execution_id", None) or event.aggregate_id
key = str(execution_id or (pod.metadata.name if pod.metadata else "unknown"))

Expand Down
Loading
Loading