Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions backend/app/api/routes/events.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from typing import Annotated, Any
@@ -340,11 +339,7 @@ async def replay_aggregate_events(
replay_correlation_id = f"replay_{CorrelationContext.get_correlation_id()}"
replayed_count = 0

for i, event in enumerate(replay_info.events):
# Rate limiting: pause every 100 events to prevent overwhelming the system
if i > 0 and i % 100 == 0:
await asyncio.sleep(0.1)

for event in replay_info.events:
try:
meta = EventMetadata(
service_name=settings.SERVICE_NAME,
Expand Down
105 changes: 28 additions & 77 deletions backend/app/core/adaptive_sampling.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import threading
import time
from collections import deque
from collections.abc import Sequence
@@ -18,6 +17,9 @@ class AdaptiveSampler(Sampler):
- Error rate
- Request rate
- Resource utilization

Rate adjustment is lazy: it runs inline during should_sample()
when the adjustment interval has elapsed.
"""

def __init__(
@@ -47,25 +49,12 @@ def __init__(
self.high_traffic_threshold = high_traffic_threshold
self.adjustment_interval = adjustment_interval

# Current sampling rate
self._current_rate = base_rate

# Metrics tracking
self._request_count = 0
self._error_count = 0
self._last_adjustment = time.time()

# Sliding window for rate calculation
self._request_window: deque[float] = deque(maxlen=60) # 1 minute window
self._error_window: deque[float] = deque(maxlen=60) # 1 minute window

# Thread safety
self._lock = threading.Lock()

# Start background adjustment thread
self._running = True
self._adjustment_thread = threading.Thread(target=self._adjustment_loop, daemon=True)
self._adjustment_thread.start()
# Sliding window for rate calculation (1 minute window, pruned by _calculate_metrics)
self._request_window: deque[float] = deque()
self._error_window: deque[float] = deque()

logging.getLogger("integr8scode").info(f"Adaptive sampler initialized with base rate: {base_rate}")

@@ -80,7 +69,6 @@ def should_sample(
trace_state: TraceState | None = None,
) -> SamplingResult:
"""Determine if a span should be sampled"""
# Get parent trace state
parent_span_context = get_current_span(parent_context).get_span_context()
parent_trace_state = None

@@ -96,23 +84,27 @@ def should_sample(
return SamplingResult(decision=Decision.RECORD_AND_SAMPLE, attributes=attributes)

# Track request
self._track_request()
self._request_window.append(time.time())

# Always sample errors
if self._is_error(attributes):
self._track_error()
self._error_window.append(time.time())
if parent_trace_state is not None:
return SamplingResult(
decision=Decision.RECORD_AND_SAMPLE, attributes=attributes, trace_state=parent_trace_state
)
else:
return SamplingResult(decision=Decision.RECORD_AND_SAMPLE, attributes=attributes)

# Apply current sampling rate using integer arithmetic to avoid precision issues
# Use trace ID for deterministic sampling
max_trace_id = (1 << 64) - 1 # 0xffffffffffffffff
# Lazy adjustment: re-evaluate rate when interval has elapsed
now = time.time()
if now - self._last_adjustment >= self.adjustment_interval:
self._last_adjustment = now
self._adjust_sampling_rate()

# Apply current sampling rate using trace ID for deterministic sampling
max_trace_id = (1 << 64) - 1
masked_trace_id = trace_id & max_trace_id
# Compute threshold as integer, capping at max_trace_id if rate is 1.0
threshold = int(self._current_rate * max_trace_id)
if self._current_rate >= 1.0:
threshold = max_trace_id
@@ -134,28 +126,14 @@ def get_description(self) -> str:
"""Return sampler description"""
return f"AdaptiveSampler(current_rate={self._current_rate:.2%})"

def _track_request(self) -> None:
"""Track a request"""
with self._lock:
self._request_count += 1
self._request_window.append(time.time())

def _track_error(self) -> None:
"""Track an error"""
with self._lock:
self._error_count += 1
self._error_window.append(time.time())

def _is_error(self, attributes: Attributes | None) -> bool:
"""Check if span attributes indicate an error"""
if not attributes:
return False

# Check for error status
if attributes.get("error", False):
return True

# Check HTTP status code
status_code = attributes.get("http.status_code")
if status_code and isinstance(status_code, (int, float)):
if int(status_code) >= 500:
@@ -164,26 +142,22 @@ def _is_error(self, attributes: Attributes | None) -> bool:
if int(status_code) >= 500:
return True

# Check for exception
if attributes.get("exception.type"):
return True

return False

def _calculate_metrics(self) -> tuple[float, int]:
"""Calculate current error rate and request rate"""
now = time.time()
minute_ago = now - 60
minute_ago = time.time() - 60

with self._lock:
# Clean old entries
while self._request_window and self._request_window[0] < minute_ago:
self._request_window.popleft()
while self._error_window and self._error_window[0] < minute_ago:
self._error_window.popleft()
while self._request_window and self._request_window[0] < minute_ago:
self._request_window.popleft()
while self._error_window and self._error_window[0] < minute_ago:
self._error_window.popleft()

request_rate = len(self._request_window)
error_rate = len(self._error_window) / max(1, len(self._request_window))
request_rate = len(self._request_window)
error_rate = len(self._error_window) / max(1, len(self._request_window))

return error_rate, request_rate

@@ -193,59 +167,36 @@ def _adjust_sampling_rate(self) -> None:

new_rate = self.base_rate

# Increase sampling during high error rates
if error_rate > self.error_rate_threshold:
# Scale up based on error rate
error_multiplier: float = min(10.0, 1 + (error_rate / self.error_rate_threshold))
new_rate = min(self.max_rate, self.base_rate * error_multiplier)
logging.getLogger("integr8scode").warning(
f"High error rate detected ({error_rate:.1%}), increasing sampling to {new_rate:.1%}"
)

# Decrease sampling during high traffic
elif request_rate > self.high_traffic_threshold:
# Scale down based on traffic
traffic_divisor = request_rate / self.high_traffic_threshold
new_rate = max(self.min_rate, self.base_rate / traffic_divisor)
logging.getLogger("integr8scode").info(
f"High traffic detected ({request_rate} req/min), decreasing sampling to {new_rate:.1%}"
)

# Apply gradual changes
if new_rate != self._current_rate:
# Smooth transitions
change_rate = 0.5 # Adjust 50% towards target
change_rate = 0.5
self._current_rate = self._current_rate + (new_rate - self._current_rate) * change_rate

logging.getLogger("integr8scode").info(
f"Adjusted sampling rate to {self._current_rate:.1%} "
f"(error_rate: {error_rate:.1%}, request_rate: {request_rate} req/min)"
)

def _adjustment_loop(self) -> None:
"""Background thread for periodic rate adjustment"""
while self._running:
time.sleep(self.adjustment_interval)

try:
self._adjust_sampling_rate()
except Exception as e:
logging.getLogger("integr8scode").error(f"Error adjusting sampling rate: {e}")

def shutdown(self) -> None:
"""Shutdown the sampler"""
self._running = False
if self._adjustment_thread.is_alive():
self._adjustment_thread.join(timeout=5.0)


def create_adaptive_sampler(settings: Settings) -> AdaptiveSampler:
"""Create adaptive sampler with settings"""
return AdaptiveSampler(
base_rate=settings.TRACING_SAMPLING_RATE,
min_rate=max(0.001, settings.TRACING_SAMPLING_RATE / 100), # 1/100th of base
min_rate=max(0.001, settings.TRACING_SAMPLING_RATE / 100),
max_rate=1.0,
error_rate_threshold=0.05, # 5% error rate
high_traffic_threshold=1000, # 1000 requests per minute
adjustment_interval=60, # Adjust every minute
error_rate_threshold=0.05,
high_traffic_threshold=1000,
adjustment_interval=60,
)
2 changes: 1 addition & 1 deletion backend/app/db/docs/notification.py
Original file line number Diff line number Diff line change
@@ -42,7 +42,7 @@ class NotificationDocument(Document):

# Error handling
retry_count: int = 0
max_retries: int = 3
max_retries: int = Field(3, ge=1)
error_message: str | None = None

# Context
Expand Down
2 changes: 1 addition & 1 deletion backend/app/domain/notification/models.py
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ class DomainNotification(BaseModel):
failed_at: datetime | None = None

retry_count: int = 0
max_retries: int = 3
max_retries: int = Field(3, ge=1)
error_message: str | None = None

metadata: dict[str, Any] = Field(default_factory=dict)
Expand Down
2 changes: 1 addition & 1 deletion backend/app/schemas_pydantic/notification.py
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ class Notification(BaseModel):

# Error handling
retry_count: int = 0
max_retries: int = 3
max_retries: int = Field(3, ge=1)
error_message: str | None = None

# Context
Expand Down
Loading
Loading