From 5f7120a6e6b91de4ba477024c46f67824783c2b5 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Sat, 7 Feb 2026 13:36:36 +0100 Subject: [PATCH 1/3] fix: removed asyncio.sleep --- backend/app/api/routes/events.py | 7 +- backend/app/core/adaptive_sampling.py | 105 ++---- .../services/event_replay/replay_service.py | 330 ++++++++++-------- backend/app/services/k8s_worker/worker.py | 14 - backend/app/services/notification_service.py | 102 +++--- .../tests/unit/core/test_adaptive_sampling.py | 101 +++--- 6 files changed, 302 insertions(+), 357 deletions(-) diff --git a/backend/app/api/routes/events.py b/backend/app/api/routes/events.py index 4dbf3ca5..bcd7f9e2 100644 --- a/backend/app/api/routes/events.py +++ b/backend/app/api/routes/events.py @@ -1,4 +1,3 @@ -import asyncio import logging from datetime import datetime, timedelta, timezone from typing import Annotated, Any @@ -340,11 +339,7 @@ async def replay_aggregate_events( replay_correlation_id = f"replay_{CorrelationContext.get_correlation_id()}" replayed_count = 0 - for i, event in enumerate(replay_info.events): - # Rate limiting: pause every 100 events to prevent overwhelming the system - if i > 0 and i % 100 == 0: - await asyncio.sleep(0.1) - + for event in replay_info.events: try: meta = EventMetadata( service_name=settings.SERVICE_NAME, diff --git a/backend/app/core/adaptive_sampling.py b/backend/app/core/adaptive_sampling.py index 90acabed..a3785c2b 100644 --- a/backend/app/core/adaptive_sampling.py +++ b/backend/app/core/adaptive_sampling.py @@ -1,5 +1,4 @@ import logging -import threading import time from collections import deque from collections.abc import Sequence @@ -18,6 +17,9 @@ class AdaptiveSampler(Sampler): - Error rate - Request rate - Resource utilization + + Rate adjustment is lazy: it runs inline during should_sample() + when the adjustment interval has elapsed. """ def __init__( @@ -47,25 +49,12 @@ def __init__( self.high_traffic_threshold = high_traffic_threshold self.adjustment_interval = adjustment_interval - # Current sampling rate self._current_rate = base_rate - - # Metrics tracking - self._request_count = 0 - self._error_count = 0 self._last_adjustment = time.time() - # Sliding window for rate calculation - self._request_window: deque[float] = deque(maxlen=60) # 1 minute window - self._error_window: deque[float] = deque(maxlen=60) # 1 minute window - - # Thread safety - self._lock = threading.Lock() - - # Start background adjustment thread - self._running = True - self._adjustment_thread = threading.Thread(target=self._adjustment_loop, daemon=True) - self._adjustment_thread.start() + # Sliding window for rate calculation (1 minute window) + self._request_window: deque[float] = deque(maxlen=60) + self._error_window: deque[float] = deque(maxlen=60) logging.getLogger("integr8scode").info(f"Adaptive sampler initialized with base rate: {base_rate}") @@ -80,7 +69,6 @@ def should_sample( trace_state: TraceState | None = None, ) -> SamplingResult: """Determine if a span should be sampled""" - # Get parent trace state parent_span_context = get_current_span(parent_context).get_span_context() parent_trace_state = None @@ -96,11 +84,11 @@ def should_sample( return SamplingResult(decision=Decision.RECORD_AND_SAMPLE, attributes=attributes) # Track request - self._track_request() + self._request_window.append(time.time()) # Always sample errors if self._is_error(attributes): - self._track_error() + self._error_window.append(time.time()) if parent_trace_state is not None: return SamplingResult( decision=Decision.RECORD_AND_SAMPLE, attributes=attributes, trace_state=parent_trace_state @@ -108,11 +96,15 @@ def should_sample( else: return SamplingResult(decision=Decision.RECORD_AND_SAMPLE, attributes=attributes) - # Apply current sampling rate using integer arithmetic to avoid precision issues - # Use trace ID for deterministic sampling - max_trace_id = (1 << 64) - 1 # 0xffffffffffffffff + # Lazy adjustment: re-evaluate rate when interval has elapsed + now = time.time() + if now - self._last_adjustment >= self.adjustment_interval: + self._last_adjustment = now + self._adjust_sampling_rate() + + # Apply current sampling rate using trace ID for deterministic sampling + max_trace_id = (1 << 64) - 1 masked_trace_id = trace_id & max_trace_id - # Compute threshold as integer, capping at max_trace_id if rate is 1.0 threshold = int(self._current_rate * max_trace_id) if self._current_rate >= 1.0: threshold = max_trace_id @@ -134,28 +126,14 @@ def get_description(self) -> str: """Return sampler description""" return f"AdaptiveSampler(current_rate={self._current_rate:.2%})" - def _track_request(self) -> None: - """Track a request""" - with self._lock: - self._request_count += 1 - self._request_window.append(time.time()) - - def _track_error(self) -> None: - """Track an error""" - with self._lock: - self._error_count += 1 - self._error_window.append(time.time()) - def _is_error(self, attributes: Attributes | None) -> bool: """Check if span attributes indicate an error""" if not attributes: return False - # Check for error status if attributes.get("error", False): return True - # Check HTTP status code status_code = attributes.get("http.status_code") if status_code and isinstance(status_code, (int, float)): if int(status_code) >= 500: @@ -164,7 +142,6 @@ def _is_error(self, attributes: Attributes | None) -> bool: if int(status_code) >= 500: return True - # Check for exception if attributes.get("exception.type"): return True @@ -172,18 +149,15 @@ def _is_error(self, attributes: Attributes | None) -> bool: def _calculate_metrics(self) -> tuple[float, int]: """Calculate current error rate and request rate""" - now = time.time() - minute_ago = now - 60 + minute_ago = time.time() - 60 - with self._lock: - # Clean old entries - while self._request_window and self._request_window[0] < minute_ago: - self._request_window.popleft() - while self._error_window and self._error_window[0] < minute_ago: - self._error_window.popleft() + while self._request_window and self._request_window[0] < minute_ago: + self._request_window.popleft() + while self._error_window and self._error_window[0] < minute_ago: + self._error_window.popleft() - request_rate = len(self._request_window) - error_rate = len(self._error_window) / max(1, len(self._request_window)) + request_rate = len(self._request_window) + error_rate = len(self._error_window) / max(1, len(self._request_window)) return error_rate, request_rate @@ -193,28 +167,21 @@ def _adjust_sampling_rate(self) -> None: new_rate = self.base_rate - # Increase sampling during high error rates if error_rate > self.error_rate_threshold: - # Scale up based on error rate error_multiplier: float = min(10.0, 1 + (error_rate / self.error_rate_threshold)) new_rate = min(self.max_rate, self.base_rate * error_multiplier) logging.getLogger("integr8scode").warning( f"High error rate detected ({error_rate:.1%}), increasing sampling to {new_rate:.1%}" ) - - # Decrease sampling during high traffic elif request_rate > self.high_traffic_threshold: - # Scale down based on traffic traffic_divisor = request_rate / self.high_traffic_threshold new_rate = max(self.min_rate, self.base_rate / traffic_divisor) logging.getLogger("integr8scode").info( f"High traffic detected ({request_rate} req/min), decreasing sampling to {new_rate:.1%}" ) - # Apply gradual changes if new_rate != self._current_rate: - # Smooth transitions - change_rate = 0.5 # Adjust 50% towards target + change_rate = 0.5 self._current_rate = self._current_rate + (new_rate - self._current_rate) * change_rate logging.getLogger("integr8scode").info( @@ -222,30 +189,14 @@ def _adjust_sampling_rate(self) -> None: f"(error_rate: {error_rate:.1%}, request_rate: {request_rate} req/min)" ) - def _adjustment_loop(self) -> None: - """Background thread for periodic rate adjustment""" - while self._running: - time.sleep(self.adjustment_interval) - - try: - self._adjust_sampling_rate() - except Exception as e: - logging.getLogger("integr8scode").error(f"Error adjusting sampling rate: {e}") - - def shutdown(self) -> None: - """Shutdown the sampler""" - self._running = False - if self._adjustment_thread.is_alive(): - self._adjustment_thread.join(timeout=5.0) - def create_adaptive_sampler(settings: Settings) -> AdaptiveSampler: """Create adaptive sampler with settings""" return AdaptiveSampler( base_rate=settings.TRACING_SAMPLING_RATE, - min_rate=max(0.001, settings.TRACING_SAMPLING_RATE / 100), # 1/100th of base + min_rate=max(0.001, settings.TRACING_SAMPLING_RATE / 100), max_rate=1.0, - error_rate_threshold=0.05, # 5% error rate - high_traffic_threshold=1000, # 1000 requests per minute - adjustment_interval=60, # Adjust every minute + error_rate_threshold=0.05, + high_traffic_threshold=1000, + adjustment_interval=60, ) diff --git a/backend/app/services/event_replay/replay_service.py b/backend/app/services/event_replay/replay_service.py index 0ff77b11..b73d1994 100644 --- a/backend/app/services/event_replay/replay_service.py +++ b/backend/app/services/event_replay/replay_service.py @@ -6,11 +6,11 @@ from uuid import uuid4 import aiofiles -from opentelemetry.trace import SpanKind +import backoff +from apscheduler.schedulers.asyncio import AsyncIOScheduler from pydantic import ValidationError from app.core.metrics import ReplayMetrics -from app.core.tracing.utils import trace_span from app.db.repositories.replay_repository import ReplayRepository from app.domain.admin.replay_updates import ReplaySessionUpdate from app.domain.enums.replay import ReplayStatus, ReplayTarget @@ -36,8 +36,10 @@ def __init__( logger: logging.Logger, ) -> None: self._sessions: dict[str, ReplaySessionState] = {} - self._active_tasks: dict[str, asyncio.Task[None]] = {} - self._resume_events: dict[str, asyncio.Event] = {} + self._schedulers: dict[str, AsyncIOScheduler] = {} + self._batch_iters: dict[str, AsyncIterator[list[DomainEvent]]] = {} + self._event_buffers: dict[str, list[DomainEvent]] = {} + self._buffer_indices: dict[str, int] = {} self._repository = repository self._producer = producer self.logger = logger @@ -63,16 +65,34 @@ async def start_session(self, session_id: str) -> ReplayOperationResult: if session.status != ReplayStatus.CREATED: raise ReplayOperationError(session_id, "start", "Session already started") - resume_event = asyncio.Event() - resume_event.set() - self._resume_events[session_id] = resume_event + total_count = await self._repository.count_events(session.config.filter) + session.total_events = min(total_count, session.config.max_events or total_count) - task = asyncio.create_task(self._run_replay(session)) - self._active_tasks[session_id] = task + batch_iter = self._fetch_event_batches(session) + self._batch_iters[session_id] = batch_iter + + if not await self._load_next_batch(session_id): + session.status = ReplayStatus.COMPLETED + session.completed_at = datetime.now(timezone.utc) + await self._update_session_in_db(session) + return ReplayOperationResult( + session_id=session_id, status=ReplayStatus.COMPLETED, message="No events to replay" + ) + + scheduler = AsyncIOScheduler() + self._schedulers[session_id] = scheduler + scheduler.start() + scheduler.add_job( + self._dispatch_next, + trigger="date", + run_date=datetime.now(timezone.utc), + args=[session], + id=f"dispatch_{session_id}", + misfire_grace_time=None, + ) session.status = ReplayStatus.RUNNING session.started_at = datetime.now(timezone.utc) - self._metrics.increment_active_replays() await self._repository.update_session_status(session_id, ReplayStatus.RUNNING) return ReplayOperationResult( @@ -84,9 +104,9 @@ async def pause_session(self, session_id: str) -> ReplayOperationResult: if session.status != ReplayStatus.RUNNING: raise ReplayOperationError(session_id, "pause", "Session is not running") session.status = ReplayStatus.PAUSED - resume_event = self._resume_events.get(session_id) - if resume_event: - resume_event.clear() + scheduler = self._schedulers.get(session_id) + if scheduler: + scheduler.remove_all_jobs() await self._repository.update_session_status(session_id, ReplayStatus.PAUSED) return ReplayOperationResult( session_id=session_id, status=ReplayStatus.PAUSED, message="Replay session paused" @@ -97,9 +117,17 @@ async def resume_session(self, session_id: str) -> ReplayOperationResult: if session.status != ReplayStatus.PAUSED: raise ReplayOperationError(session_id, "resume", "Session is not paused") session.status = ReplayStatus.RUNNING - resume_event = self._resume_events.get(session_id) - if resume_event: - resume_event.set() + scheduler = self._schedulers.get(session_id) + if scheduler: + scheduler.add_job( + self._dispatch_next, + trigger="date", + run_date=datetime.now(timezone.utc), + args=[session], + id=f"dispatch_{session_id}", + replace_existing=True, + misfire_grace_time=None, + ) await self._repository.update_session_status(session_id, ReplayStatus.RUNNING) return ReplayOperationResult( session_id=session_id, status=ReplayStatus.RUNNING, message="Replay session resumed" @@ -108,15 +136,7 @@ async def resume_session(self, session_id: str) -> ReplayOperationResult: async def cancel_session(self, session_id: str) -> ReplayOperationResult: session = self.get_session(session_id) session.status = ReplayStatus.CANCELLED - - resume_event = self._resume_events.get(session_id) - if resume_event: - resume_event.set() - - task = self._active_tasks.get(session_id) - if task and not task.done(): - task.cancel() - + await self._finalize_session(session, ReplayStatus.CANCELLED) await self._repository.update_session_status(session_id, ReplayStatus.CANCELLED) return ReplayOperationResult( session_id=session_id, status=ReplayStatus.CANCELLED, message="Replay session cancelled" @@ -152,67 +172,110 @@ async def cleanup_old_sessions(self, older_than_hours: int = 24) -> CleanupResul self.logger.info("Cleaned up old replay sessions", extra={"removed_count": total_removed}) return CleanupResult(removed_sessions=total_removed, message=f"Removed {total_removed} old sessions") - async def _run_replay(self, session: ReplaySessionState) -> None: + async def _dispatch_next(self, session: ReplaySessionState) -> None: + if session.status != ReplayStatus.RUNNING: + return + + event = self._pop_next_event(session.session_id) + if event is None: + if not await self._load_next_batch(session.session_id): + await self._finalize_session(session, ReplayStatus.COMPLETED) + return + event = self._pop_next_event(session.session_id) + if event is None: + await self._finalize_session(session, ReplayStatus.COMPLETED) + return + + success = False try: - with trace_span( - name="event_replay.session", - kind=SpanKind.INTERNAL, - attributes={ - "replay.session_id": str(session.session_id), - "replay.type": session.config.replay_type, - "replay.target": session.config.target, - }, - ): - total_count = await self._repository.count_events(session.config.filter) - session.total_events = min(total_count, session.config.max_events or total_count) - - async for batch in self._fetch_event_batches(session): - await self._await_if_paused(session) - if session.status != ReplayStatus.RUNNING: - return - await self._process_batch(session, batch) - - session.status = ReplayStatus.COMPLETED - - except asyncio.CancelledError: - session.status = ReplayStatus.CANCELLED + success = await self._replay_event(session, event) except Exception as e: - if session.status == ReplayStatus.CANCELLED: - return - session.status = ReplayStatus.FAILED + session.failed_events += 1 session.errors.append( - ReplayError(timestamp=datetime.now(timezone.utc), error=str(e), error_type=type(e).__name__) - ) - self._metrics.record_replay_error(type(e).__name__) - self.logger.error( - "Replay session failed", - extra={"session_id": session.session_id, "error": str(e)}, - exc_info=True, + ReplayError(timestamp=datetime.now(timezone.utc), event_id=str(event.event_id), error=str(e)) ) - finally: - session.completed_at = datetime.now(timezone.utc) - if session.status == ReplayStatus.COMPLETED and session.started_at: - duration = (session.completed_at - session.started_at).total_seconds() - self._metrics.record_replay_duration(duration, session.config.replay_type) - self._metrics.decrement_active_replays() - await self._update_session_in_db(session) - self._active_tasks.pop(session.session_id, None) - self._resume_events.pop(session.session_id, None) - self.logger.info( - "Replay session finished", - extra={ - "session_id": session.session_id, - "status": session.status.value if hasattr(session.status, "value") else session.status, - "replayed_events": session.replayed_events, - "failed_events": session.failed_events, - }, + if not session.config.skip_errors: + await self._finalize_session(session, ReplayStatus.FAILED) + return + + if success: + session.replayed_events += 1 + else: + session.failed_events += 1 + self._metrics.record_event_replayed( + session.config.replay_type, event.event_type, "success" if success else "failed" + ) + session.last_event_at = event.timestamp + await self._update_session_in_db(session) + + next_event = self._peek_next_event(session.session_id) + delay = 0.0 + if next_event and session.last_event_at and session.config.speed_multiplier < 100: + time_diff = (next_event.timestamp - session.last_event_at).total_seconds() + delay = max(time_diff / session.config.speed_multiplier, 0) + + scheduler = self._schedulers.get(session.session_id) + if scheduler and scheduler.running and session.status == ReplayStatus.RUNNING: + scheduler.add_job( + self._dispatch_next, + trigger="date", + run_date=datetime.now(timezone.utc) + timedelta(seconds=delay), + args=[session], + id=f"dispatch_{session.session_id}", + replace_existing=True, + misfire_grace_time=None, ) - async def _await_if_paused(self, session: ReplaySessionState) -> None: - if session.status == ReplayStatus.PAUSED: - resume_event = self._resume_events.get(session.session_id) - if resume_event: - await resume_event.wait() + def _pop_next_event(self, session_id: str) -> DomainEvent | None: + idx = self._buffer_indices.get(session_id, 0) + buf = self._event_buffers.get(session_id, []) + if idx < len(buf): + self._buffer_indices[session_id] = idx + 1 + return buf[idx] + return None + + def _peek_next_event(self, session_id: str) -> DomainEvent | None: + idx = self._buffer_indices.get(session_id, 0) + buf = self._event_buffers.get(session_id, []) + if idx < len(buf): + return buf[idx] + return None + + async def _load_next_batch(self, session_id: str) -> bool: + batch_iter = self._batch_iters.get(session_id) + if not batch_iter: + return False + try: + batch = await batch_iter.__anext__() + self._event_buffers[session_id] = batch + self._buffer_indices[session_id] = 0 + return True + except StopAsyncIteration: + return False + + async def _finalize_session(self, session: ReplaySessionState, final_status: ReplayStatus) -> None: + session.status = final_status + session.completed_at = datetime.now(timezone.utc) + if final_status == ReplayStatus.COMPLETED and session.started_at: + duration = (session.completed_at - session.started_at).total_seconds() + self._metrics.record_replay_duration(duration, session.config.replay_type) + self._metrics.decrement_active_replays() + await self._update_session_in_db(session) + scheduler = self._schedulers.pop(session.session_id, None) + if scheduler: + scheduler.shutdown(wait=False) + self._batch_iters.pop(session.session_id, None) + self._event_buffers.pop(session.session_id, None) + self._buffer_indices.pop(session.session_id, None) + self.logger.info( + "Replay session finished", + extra={ + "session_id": session.session_id, + "status": session.status.value if hasattr(session.status, "value") else session.status, + "replayed_events": session.replayed_events, + "failed_events": session.failed_events, + }, + ) async def _fetch_event_batches(self, session: ReplaySessionState) -> AsyncIterator[list[DomainEvent]]: events_processed = 0 @@ -245,80 +308,43 @@ async def _fetch_event_batches(self, session: ReplaySessionState) -> AsyncIterat if max_events and events_processed >= max_events: break - async def _process_batch(self, session: ReplaySessionState, batch: list[DomainEvent]) -> None: - with trace_span( - name="event_replay.process_batch", - kind=SpanKind.INTERNAL, - attributes={ - "replay.session_id": str(session.session_id), - "replay.batch.count": len(batch), - "replay.target": session.config.target, - }, - ): - for event in batch: - await self._await_if_paused(session) - if session.status != ReplayStatus.RUNNING: - return - - if session.last_event_at and session.config.speed_multiplier < 100: - time_diff = (event.timestamp - session.last_event_at).total_seconds() - delay = time_diff / session.config.speed_multiplier - if delay > 0: - await asyncio.sleep(delay) - - try: - success = await self._replay_event(session, event) - except Exception as e: - self.logger.error("Failed to replay event", extra={"event_id": event.event_id, "error": str(e)}) - session.failed_events += 1 - session.errors.append( - ReplayError(timestamp=datetime.now(timezone.utc), event_id=str(event.event_id), error=str(e)) - ) - if not session.config.skip_errors: - raise - continue - - if success: - session.replayed_events += 1 - else: - session.failed_events += 1 - self._metrics.record_event_replayed( - session.config.replay_type, event.event_type, "success" if success else "failed" - ) - session.last_event_at = event.timestamp - await self._update_session_in_db(session) - async def _replay_event(self, session: ReplaySessionState, event: DomainEvent) -> bool: config = session.config - attempts = config.retry_attempts if config.retry_failed else 1 - - for attempt in range(attempts): - try: - match config.target: - case ReplayTarget.KAFKA: - if not config.preserve_timestamps: - event.timestamp = datetime.now(timezone.utc) - await self._producer.produce(event_to_produce=event, key=event.aggregate_id or event.event_id) - case ReplayTarget.FILE: - if not config.target_file_path: - self.logger.error("No target file path specified") - return False - await self._write_event_to_file(event, config.target_file_path) - case ReplayTarget.TEST: - pass - case _: - self.logger.error("Unknown replay target", extra={"target": config.target}) - return False - return True - except Exception as e: - self.logger.error( - "Failed to replay event", - extra={"attempt": attempt + 1, "max_attempts": attempts, "error": str(e)}, - ) - if attempt < attempts - 1: - await asyncio.sleep(min(2**attempt, 10)) - - return False + if config.target == ReplayTarget.FILE and not config.target_file_path: + self.logger.error("No target file path specified") + return False + + max_attempts = config.retry_attempts if config.retry_failed else 1 + + @backoff.on_exception( + backoff.expo, + Exception, + max_tries=max_attempts, + max_value=10, + jitter=None, + on_backoff=lambda details: self.logger.error( + "Failed to replay event", + extra={"attempt": details["tries"], "max_attempts": max_attempts, "error": str(details["exception"])}, + ), + ) + async def _dispatch() -> None: + match config.target: + case ReplayTarget.KAFKA: + if not config.preserve_timestamps: + event.timestamp = datetime.now(timezone.utc) + await self._producer.produce(event_to_produce=event, key=event.aggregate_id or event.event_id) + case ReplayTarget.FILE: + await self._write_event_to_file(event, config.target_file_path) # type: ignore[arg-type] + case ReplayTarget.TEST: + pass + case _: + raise ValueError(f"Unknown replay target: {config.target}") + + try: + await _dispatch() + return True + except Exception: + return False async def _write_event_to_file(self, event: DomainEvent, file_path: str) -> None: if file_path not in self._file_locks: diff --git a/backend/app/services/k8s_worker/worker.py b/backend/app/services/k8s_worker/worker.py index 7cf199d2..3d6e4bda 100644 --- a/backend/app/services/k8s_worker/worker.py +++ b/backend/app/services/k8s_worker/worker.py @@ -236,20 +236,6 @@ async def _publish_pod_creation_failed(self, command: CreatePodCommandEvent, err ) await self.producer.produce(event_to_produce=event, key=command.execution_id) - async def wait_for_active_creations(self, timeout: float = 30.0) -> None: - """Wait for active pod creations to complete (for graceful shutdown).""" - if not self._active_creations: - return - - self.logger.info(f"Waiting for {len(self._active_creations)} active pod creations to complete...") - start_time = time.time() - - while self._active_creations and (time.time() - start_time) < timeout: - await asyncio.sleep(1) - - if self._active_creations: - self.logger.warning(f"Timeout waiting for pod creations, {len(self._active_creations)} still active") - async def ensure_image_pre_puller_daemonset(self) -> None: """Ensure the runtime image pre-puller DaemonSet exists.""" daemonset_name = "runtime-image-pre-puller" diff --git a/backend/app/services/notification_service.py b/backend/app/services/notification_service.py index e0aa7e6a..82dd5f5f 100644 --- a/backend/app/services/notification_service.py +++ b/backend/app/services/notification_service.py @@ -4,6 +4,7 @@ from datetime import UTC, datetime, timedelta from typing import Awaitable, Callable +import backoff import httpx from app.core.metrics import NotificationMetrics @@ -658,57 +659,58 @@ async def _deliver_notification(self, notification: DomainNotification) -> bool: ) return False - last_error: Exception | None = None start_time = asyncio.get_running_loop().time() - for attempt in range(notification.max_retries): - try: - await handler(notification, subscription) - - delivery_time = asyncio.get_running_loop().time() - start_time - await self.repository.update_notification( - notification.notification_id, - notification.user_id, - DomainNotificationUpdate(status=NotificationStatus.DELIVERED, delivered_at=datetime.now(UTC)), - ) - self.logger.info( - f"Delivered notification {notification.notification_id}", - extra={ - "notification_id": str(notification.notification_id), - "channel": notification.channel, - "delivery_time_ms": int(delivery_time * 1000), - "attempt": attempt + 1, - }, - ) - self.metrics.record_notification_sent( - notification.severity, channel=notification.channel, severity=notification.severity - ) - self.metrics.record_notification_delivery_time( - delivery_time, notification.severity, channel=notification.channel - ) - return True - - except Exception as e: - last_error = e - self.logger.warning( - f"Delivery attempt {attempt + 1}/{notification.max_retries} failed " - f"for {notification.notification_id}: {e}", - ) - if attempt + 1 < notification.max_retries: - await asyncio.sleep(min(2 ** attempt, 30)) - - await self.repository.update_notification( - notification.notification_id, - notification.user_id, - DomainNotificationUpdate( - status=NotificationStatus.FAILED, - failed_at=datetime.now(UTC), - error_message=f"Delivery failed via {notification.channel}: {last_error}", - retry_count=notification.max_retries, + @backoff.on_exception( + backoff.expo, + Exception, + max_tries=notification.max_retries, + max_value=30, + jitter=None, + on_backoff=lambda details: self.logger.warning( + f"Delivery attempt {details['tries']}/{notification.max_retries} failed " + f"for {notification.notification_id}: {details['exception']}", ), ) - self.logger.error( - f"All delivery attempts exhausted for {notification.notification_id}: {last_error}", - exc_info=last_error, - ) - return False + async def _attempt() -> None: + await handler(notification, subscription) + + try: + await _attempt() + delivery_time = asyncio.get_running_loop().time() - start_time + await self.repository.update_notification( + notification.notification_id, + notification.user_id, + DomainNotificationUpdate(status=NotificationStatus.DELIVERED, delivered_at=datetime.now(UTC)), + ) + self.logger.info( + f"Delivered notification {notification.notification_id}", + extra={ + "notification_id": str(notification.notification_id), + "channel": notification.channel, + "delivery_time_ms": int(delivery_time * 1000), + }, + ) + self.metrics.record_notification_sent( + notification.severity, channel=notification.channel, severity=notification.severity + ) + self.metrics.record_notification_delivery_time( + delivery_time, notification.severity, channel=notification.channel + ) + return True + except Exception as last_error: + await self.repository.update_notification( + notification.notification_id, + notification.user_id, + DomainNotificationUpdate( + status=NotificationStatus.FAILED, + failed_at=datetime.now(UTC), + error_message=f"Delivery failed via {notification.channel}: {last_error}", + retry_count=notification.max_retries, + ), + ) + self.logger.error( + f"All delivery attempts exhausted for {notification.notification_id}: {last_error}", + exc_info=last_error, + ) + return False diff --git a/backend/tests/unit/core/test_adaptive_sampling.py b/backend/tests/unit/core/test_adaptive_sampling.py index c3c8d65d..9dde2fab 100644 --- a/backend/tests/unit/core/test_adaptive_sampling.py +++ b/backend/tests/unit/core/test_adaptive_sampling.py @@ -1,5 +1,5 @@ import time -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock import pytest from app.core.adaptive_sampling import AdaptiveSampler, create_adaptive_sampler @@ -7,71 +7,56 @@ def test_is_error_variants() -> None: - # Mock the thread start to avoid background thread delays in tests - with patch.object(AdaptiveSampler, '_adjustment_loop', return_value=None): - s = AdaptiveSampler(base_rate=0.5, adjustment_interval=1) - # Plain error flag - assert s._is_error({"error": True}) is True - # HTTP status >= 500 - assert s._is_error({"http.status_code": 500}) is True - assert s._is_error({"http.status_code": "503"}) is True - # Exception present - assert s._is_error({"exception.type": "ValueError"}) is True - # Not error - assert s._is_error({"http.status_code": 200}) is False - s._running = False # Ensure cleanup + s = AdaptiveSampler(base_rate=0.5, adjustment_interval=1) + assert s._is_error({"error": True}) is True + assert s._is_error({"http.status_code": 500}) is True + assert s._is_error({"http.status_code": "503"}) is True + assert s._is_error({"exception.type": "ValueError"}) is True + assert s._is_error({"http.status_code": 200}) is False def test_should_sample_respects_rate() -> None: - with patch('threading.Thread'): # Mock thread to avoid background delays - s = AdaptiveSampler(base_rate=1.0, adjustment_interval=1) - # With current_rate=1.0, all trace_ids sample - res = s.should_sample(None, trace_id=123, name="op") - assert res.decision.value == 2 # RECORD_AND_SAMPLE - # With rate ~0, most should drop; we choose large id to exceed threshold - s._current_rate = 0.0 - res2 = s.should_sample(None, trace_id=(1 << 64) - 1, name="op") - assert res2.decision.value in (0, 1) # DROP or RECORD_ONLY depending impl - s._running = False + s = AdaptiveSampler(base_rate=1.0, adjustment_interval=1) + # With current_rate=1.0, all trace_ids sample + res = s.should_sample(None, trace_id=123, name="op") + assert res.decision.value == 2 # RECORD_AND_SAMPLE + # With rate ~0, most should drop; we choose large id to exceed threshold + s._current_rate = 0.0 + res2 = s.should_sample(None, trace_id=(1 << 64) - 1, name="op") + assert res2.decision.value in (0, 1) # DROP or RECORD_ONLY depending impl def test_adjust_sampling_rate_error_and_traffic() -> None: - with patch('threading.Thread'): # Mock thread to avoid background delays - s = AdaptiveSampler(base_rate=0.1, adjustment_interval=1) - now = time.time() - # Simulate 100 requests in window with 10 errors (> threshold 5%) - s._request_window.clear() - s._error_window.clear() - for _ in range(100): - s._request_window.append(now) - for _ in range(10): - s._error_window.append(now) - old = s._current_rate - s._adjust_sampling_rate() - assert s._current_rate >= old - # Simulate high traffic and low errors -> decrease toward min_rate - s._request_window.clear() - s._error_window.clear() - for _ in range(2000): - s._request_window.append(now) - old2 = s._current_rate - s._adjust_sampling_rate() - assert s._current_rate <= old2 - s._running = False + s = AdaptiveSampler(base_rate=0.1, adjustment_interval=1) + now = time.time() + # Simulate 100 requests in window with 10 errors (> threshold 5%) + s._request_window.clear() + s._error_window.clear() + for _ in range(100): + s._request_window.append(now) + for _ in range(10): + s._error_window.append(now) + old = s._current_rate + s._adjust_sampling_rate() + assert s._current_rate >= old + # Simulate high traffic and low errors -> decrease toward min_rate + s._request_window.clear() + s._error_window.clear() + for _ in range(2000): + s._request_window.append(now) + old2 = s._current_rate + s._adjust_sampling_rate() + assert s._current_rate <= old2 def test_get_description_and_factory(monkeypatch: pytest.MonkeyPatch) -> None: - with patch('threading.Thread'): # Mock thread to avoid background delays - s = AdaptiveSampler(base_rate=0.2, adjustment_interval=1) - desc = s.get_description() - assert "AdaptiveSampler(" in desc - s._running = False + s = AdaptiveSampler(base_rate=0.2, adjustment_interval=1) + desc = s.get_description() + assert "AdaptiveSampler(" in desc - mock_settings = MagicMock(spec=Settings) - mock_settings.TRACING_SAMPLING_RATE = 0.2 - - monkeypatch.setenv("TRACING_SAMPLING_RATE", "0.2") - # create_adaptive_sampler pulls settings via get_settings; just ensure it constructs - sampler = create_adaptive_sampler(mock_settings) - sampler._running = False + mock_settings = MagicMock(spec=Settings) + mock_settings.TRACING_SAMPLING_RATE = 0.2 + monkeypatch.setenv("TRACING_SAMPLING_RATE", "0.2") + sampler = create_adaptive_sampler(mock_settings) + assert sampler._current_rate == 0.2 From b7d0624f659f8b3de19e3fa5e5ac2046f68205f9 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Sat, 7 Feb 2026 13:42:25 +0100 Subject: [PATCH 2/3] fix: failed events calculation fixed in replay service --- backend/app/services/event_replay/replay_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/app/services/event_replay/replay_service.py b/backend/app/services/event_replay/replay_service.py index b73d1994..84468380 100644 --- a/backend/app/services/event_replay/replay_service.py +++ b/backend/app/services/event_replay/replay_service.py @@ -190,11 +190,11 @@ async def _dispatch_next(self, session: ReplaySessionState) -> None: try: success = await self._replay_event(session, event) except Exception as e: - session.failed_events += 1 session.errors.append( ReplayError(timestamp=datetime.now(timezone.utc), event_id=str(event.event_id), error=str(e)) ) if not session.config.skip_errors: + session.failed_events += 1 await self._finalize_session(session, ReplayStatus.FAILED) return From eb869e6e1dfccb5e6f1468837aa1fb71b8e6ea91 Mon Sep 17 00:00:00 2001 From: HardMax71 Date: Sat, 7 Feb 2026 13:46:01 +0100 Subject: [PATCH 3/3] fix: misc --- backend/app/core/adaptive_sampling.py | 6 +++--- backend/app/db/docs/notification.py | 2 +- backend/app/domain/notification/models.py | 2 +- backend/app/schemas_pydantic/notification.py | 2 +- backend/app/services/notification_service.py | 5 +++-- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/backend/app/core/adaptive_sampling.py b/backend/app/core/adaptive_sampling.py index a3785c2b..ccbfe0e3 100644 --- a/backend/app/core/adaptive_sampling.py +++ b/backend/app/core/adaptive_sampling.py @@ -52,9 +52,9 @@ def __init__( self._current_rate = base_rate self._last_adjustment = time.time() - # Sliding window for rate calculation (1 minute window) - self._request_window: deque[float] = deque(maxlen=60) - self._error_window: deque[float] = deque(maxlen=60) + # Sliding window for rate calculation (1 minute window, pruned by _calculate_metrics) + self._request_window: deque[float] = deque() + self._error_window: deque[float] = deque() logging.getLogger("integr8scode").info(f"Adaptive sampler initialized with base rate: {base_rate}") diff --git a/backend/app/db/docs/notification.py b/backend/app/db/docs/notification.py index e44ef4ed..5bc714af 100644 --- a/backend/app/db/docs/notification.py +++ b/backend/app/db/docs/notification.py @@ -42,7 +42,7 @@ class NotificationDocument(Document): # Error handling retry_count: int = 0 - max_retries: int = 3 + max_retries: int = Field(3, ge=1) error_message: str | None = None # Context diff --git a/backend/app/domain/notification/models.py b/backend/app/domain/notification/models.py index 0c849281..45123ab9 100644 --- a/backend/app/domain/notification/models.py +++ b/backend/app/domain/notification/models.py @@ -36,7 +36,7 @@ class DomainNotification(BaseModel): failed_at: datetime | None = None retry_count: int = 0 - max_retries: int = 3 + max_retries: int = Field(3, ge=1) error_message: str | None = None metadata: dict[str, Any] = Field(default_factory=dict) diff --git a/backend/app/schemas_pydantic/notification.py b/backend/app/schemas_pydantic/notification.py index 1cea301f..222b1721 100644 --- a/backend/app/schemas_pydantic/notification.py +++ b/backend/app/schemas_pydantic/notification.py @@ -39,7 +39,7 @@ class Notification(BaseModel): # Error handling retry_count: int = 0 - max_retries: int = 3 + max_retries: int = Field(3, ge=1) error_message: str | None = None # Context diff --git a/backend/app/services/notification_service.py b/backend/app/services/notification_service.py index 82dd5f5f..5fd6f97d 100644 --- a/backend/app/services/notification_service.py +++ b/backend/app/services/notification_service.py @@ -691,11 +691,12 @@ async def _attempt() -> None: "delivery_time_ms": int(delivery_time * 1000), }, ) + notification_type = notification.tags[0] if notification.tags else "unknown" self.metrics.record_notification_sent( - notification.severity, channel=notification.channel, severity=notification.severity + notification_type, channel=notification.channel, severity=notification.severity ) self.metrics.record_notification_delivery_time( - delivery_time, notification.severity, channel=notification.channel + delivery_time, notification_type, channel=notification.channel ) return True except Exception as last_error: