Conversation
📝 Walkthrough

This PR removes per-iteration rate limiting from event replay, replaces background threading with lazy, inline rate adjustments in adaptive sampling, refactors the event replay service to use scheduler-based batching and dispatch, removes a Kubernetes worker graceful-shutdown method, and adopts backoff-based retry mechanisms for notification delivery.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as API Client
    participant Service as ReplayService
    participant Scheduler as AsyncIOScheduler
    participant Handler as EventHandler
    participant DB as Database

    Client->>Service: start_replay(session_id, events)
    Service->>Service: _batch_iters[session_id] = batches
    Service->>DB: record session (RUNNING)
    loop Dispatch Loop (_dispatch_next)
        Scheduler->>Service: trigger job
        Service->>Service: _pop_next_event(session_id)
        alt Event Available
            Service->>Service: _load_next_batch (if buffer low)
            Service->>Handler: _replay_event(event)
            Handler->>Handler: backoff.on_exception (with retry)
            Handler->>DB: write/send event
            Service->>Service: _update_session_in_db (metrics)
        else No More Events
            Service->>Service: _finalize_session(session_id)
            Service->>DB: record session (COMPLETED/FAILED)
            Scheduler->>Service: job complete
        end
    end
```
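To make the dispatch flow concrete, here is a minimal sketch of a scheduler-driven dispatch loop in the spirit of the diagram, assuming APScheduler's `AsyncIOScheduler`. Method names mirror the diagram; the bodies are illustrative stand-ins, not the PR's actual implementation.

```python
import asyncio

from apscheduler.schedulers.asyncio import AsyncIOScheduler


class ReplaySketch:
    """Sketch of replacing a sleep-based replay loop with scheduled jobs."""

    def __init__(self) -> None:
        self._scheduler = AsyncIOScheduler()
        self._events: list[dict] = []

    def start_replay(self, session_id: str, events: list[dict]) -> None:
        self._events = list(events)
        # A recurring job replaces the old `while ...: await asyncio.sleep(...)`
        # pattern; pausing a session is just pausing/removing its job.
        self._scheduler.add_job(
            self._dispatch_next,
            "interval",
            seconds=0.1,
            id=f"dispatch_{session_id}",
            args=[session_id],
        )
        self._scheduler.start()

    async def _dispatch_next(self, session_id: str) -> None:
        if not self._events:
            # Nothing left: tear down this session's job.
            self._scheduler.remove_job(f"dispatch_{session_id}")
            return
        event = self._events.pop(0)
        await self._replay_event(event)

    async def _replay_event(self, event: dict) -> None:
        await asyncio.sleep(0)  # stand-in for writing/sending the event
```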
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (1 warning, 1 inconclusive)
1 issue found across 6 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
```xml
<file name="backend/app/services/event_replay/replay_service.py">
  <violation number="1" location="backend/app/services/event_replay/replay_service.py:197">
    P1: Double-counting of `failed_events` when exception is caught and `skip_errors` is True. The except block increments `failed_events` but doesn't return, causing the else branch to increment it again since `success` remains `False`.
  </violation>
</file>
```
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In `@backend/app/core/adaptive_sampling.py`:
- Around line 55-57: The sliding-window deques _request_window and _error_window
were created with maxlen=60 which caps request_rate to 60 and makes the
high-traffic branch in _adjust_sampling_rate effectively dead; remove the maxlen
parameter (or set it to a value >= high_traffic_threshold) so the time-based
pruning in _calculate_metrics can correctly compute request_rate over the
1-minute window and enable high-traffic detection.
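For illustration, a minimal sketch of the suggested fix: the window deques lose their `maxlen` and are pruned purely by timestamp, so the request count can exceed 60 and high-traffic detection becomes reachable. Names follow the prompt; the class body is a sketch, not the file's actual code.

```python
import time
from collections import deque


class AdaptiveSamplerSketch:
    def __init__(self, high_traffic_threshold: int = 1000) -> None:
        self._high_traffic_threshold = high_traffic_threshold
        # No maxlen: the 1-minute window is enforced below by timestamp
        # pruning, so request_rate is no longer artificially capped at 60.
        self._request_window: deque[float] = deque()
        self._error_window: deque[float] = deque()

    def record_request(self, is_error: bool = False) -> None:
        now = time.monotonic()
        self._request_window.append(now)
        if is_error:
            self._error_window.append(now)

    def _calculate_metrics(self) -> tuple[int, int]:
        # Drop entries older than 60 seconds from both windows.
        cutoff = time.monotonic() - 60.0
        for window in (self._request_window, self._error_window):
            while window and window[0] < cutoff:
                window.popleft()
        return len(self._request_window), len(self._error_window)
```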
In `@backend/app/services/event_replay/replay_service.py`:
- Around line 189-204: The bug double-increments session.failed_events when
_replay_event raises and session.config.skip_errors is True; fix by
restructuring the try/except so accounting happens only once: in replay loop
catch exceptions from _replay_event (append ReplayError to session.errors and
increment session.failed_events), and if not session.config.skip_errors call
await self._finalize_session(session, ReplayStatus.FAILED) and return; otherwise
skip the normal success/failed accounting (e.g. use try/except/else or continue)
so that the later block that inspects the success variable (and increments
session.failed_events or session.replayed_events) only runs for successful,
non-exceptional attempts. Ensure references: _replay_event,
session.failed_events, session.errors, session.config.skip_errors,
_finalize_session, ReplayStatus.FAILED.
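A minimal sketch of the suggested restructuring, using `try`/`except`/`else` so each event is counted exactly once. Session fields and `_finalize_session` come from the prompt; the loop shape and the `ReplayError` constructor arguments are assumptions.

```python
for event in events:
    try:
        success = await self._replay_event(event, session)
    except Exception as exc:
        # Exceptional attempts are accounted for here, and only here.
        session.errors.append(ReplayError(event_id=event.id, message=str(exc)))
        session.failed_events += 1
        if not session.config.skip_errors:
            await self._finalize_session(session, ReplayStatus.FAILED)
            return
        continue  # skip the normal accounting below
    else:
        # Only non-exceptional attempts reach the success/failure tally.
        if success:
            session.replayed_events += 1
        else:
            session.failed_events += 1
```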
In `@backend/app/services/notification_service.py`:
- Around line 694-699: The metrics calls in notification_service.py are passing
notification.severity where notification_type is expected; update the calls to
derive notification_type = notification.tags[0] if present else "unknown" and
pass that as the first argument to self.metrics.record_notification_sent(...)
and self.metrics.record_notification_delivery_time(...), keeping
channel=notification.channel and severity=notification.severity for
record_notification_sent and using delivery_time as the duration argument for
record_notification_delivery_time.
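A sketch of the corrected call sites. The metric method names and keyword arguments follow the prompt and are assumptions about the service's API, not verified signatures.

```python
# Derive the metric label from tags instead of misusing severity.
notification_type = notification.tags[0] if notification.tags else "unknown"

self.metrics.record_notification_sent(
    notification_type,
    channel=notification.channel,
    severity=notification.severity,
)
self.metrics.record_notification_delivery_time(
    notification_type,
    delivery_time,  # duration of the delivery attempt
)
```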
- Around line 664-668: The notification.max_retries field must be constrained to
be >=1 to avoid backoff.on_exception getting max_tries=0; update the Pydantic
model that defines max_retries (e.g., the Notification or NotificationConfig
class and its max_retries attribute) to use a constrained field such as Field(3,
ge=1) or conint(ge=1) so validation rejects 0, and ensure any default remains 3;
this prevents passing 0 into backoff.on_exception (referenced where
backoff.on_exception(..., max_tries=notification.max_retries) is used).
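A minimal sketch of the constrained field, assuming a Pydantic model named `Notification` (the exact class is whichever model defines `max_retries`):

```python
from pydantic import BaseModel, Field


class Notification(BaseModel):
    # ge=1 rejects 0 at validation time, so backoff.on_exception can never
    # receive max_tries=0; the default stays 3.
    max_retries: int = Field(3, ge=1)
```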
In `@backend/tests/unit/core/test_adaptive_sampling.py`:
- Around line 29-49: The high-traffic part of
test_adjust_sampling_rate_error_and_traffic never triggers because
AdaptiveSampler._request_window has maxlen=60 so appending 2000 entries still
yields at most 60 requests and won't exceed the default high_traffic_threshold;
fix the test by constructing the sampler with a lower threshold (e.g., use
AdaptiveSampler(base_rate=0.1, adjustment_interval=1,
high_traffic_threshold=50)) or otherwise set s._high_traffic_threshold to a
value <=60 before appending, then re-run the same append/adjust/assert sequence
to actually exercise the high-traffic branch of _adjust_sampling_rate.
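A sketch of the repaired test, following the constructor call suggested in the prompt. The window is assumed to hold monotonic timestamps, and the asserted attribute name (`_current_rate`) is an assumption about the sampler's internals.

```python
import time

from app.core.adaptive_sampling import AdaptiveSampler  # assumed import path


def test_adjust_sampling_rate_high_traffic():
    # Threshold lowered to 50 (<= 60) so the branch is reachable even with
    # the capped window.
    s = AdaptiveSampler(base_rate=0.1, adjustment_interval=1,
                        high_traffic_threshold=50)
    now = time.monotonic()
    for _ in range(2000):
        s._request_window.append(now)
    s._adjust_sampling_rate()
    # Under high traffic the rate should drop below the base rate.
    assert s._current_rate < 0.1
```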
🧹 Nitpick comments (6)
backend/app/services/notification_service.py (1)
664-676: `jitter=None` disables jitter on exponential backoff for external service calls.

For webhook and Slack deliveries, correlated retries without jitter can cause synchronized retry storms against the same external endpoint. Consider using `backoff.full_jitter` (the default) instead of `None`.

Proposed fix:

```diff
 @backoff.on_exception(
     backoff.expo,
     Exception,
     max_tries=notification.max_retries,
     max_value=30,
-    jitter=None,
+    jitter=backoff.full_jitter,
     on_backoff=lambda details: self.logger.warning(
```

backend/app/core/adaptive_sampling.py (1)
99-103: Thread-safety consideration after removing locks.

`should_sample()` is invoked by the OpenTelemetry SDK, potentially from multiple threads. The removed threading lock previously protected shared state. While CPython's GIL makes individual `deque.append`/`popleft` operations atomic, compound operations in `_calculate_metrics` (iterating + popping) interleaved with `append` in `should_sample` could yield approximate results. Given that sampling is inherently probabilistic, this is likely acceptable — but worth documenting the design choice.

backend/tests/unit/core/test_adaptive_sampling.py (1)
backend/tests/unit/core/test_adaptive_sampling.py (1)
60-62:monkeypatch.setenvon Line 60 has no effect here.
create_adaptive_samplerreads from themock_settingsobject (whoseTRACING_SAMPLING_RATEis set to0.2on Line 58), not from environment variables. Thesetenvcall is dead code.Proposed fix
- monkeypatch.setenv("TRACING_SAMPLING_RATE", "0.2") sampler = create_adaptive_sampler(mock_settings)backend/app/services/event_replay/replay_service.py (3)
82-92: One `AsyncIOScheduler` per session is heavyweight.

Each replay session creates its own APScheduler `AsyncIOScheduler` instance. Under load with many concurrent sessions, this could create significant overhead (each scheduler runs its own internal event-loop machinery). Consider using a single shared scheduler with per-session job IDs, or a simpler approach like `loop.call_later`.
248-249: Prefer the `anext()` builtin over `__anext__()`.

`anext(batch_iter)` is the idiomatic Python 3.10+ way to advance an async iterator.

Proposed fix:

```diff
-    batch = await batch_iter.__anext__()
+    batch = await anext(batch_iter)
```
319-324: Same note as notification_service.py: `jitter=None` disables jitter on retry backoff.

For Kafka produce retries, correlated retry storms are less of a concern than with external webhooks, but adding jitter is still a best practice.
Actionable comments posted: 2
Caution: Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/services/event_replay/replay_service.py (1)
136-143: ⚠️ Potential issue | 🟠 Major: Active-replay counter can go negative; redundant DB update.

`cancel_session` has no status guard, so cancelling a `CREATED` or already-completed session calls `_finalize_session` → `decrement_active_replays()` (line 262) without a prior `increment_active_replays()`. This drifts the gauge negative.

Additionally, `_finalize_session` already persists the session via `_update_session_in_db` (line 263), then line 140 calls `update_session_status` again — a redundant DB write.

Proposed fix:

```diff
 async def cancel_session(self, session_id: str) -> ReplayOperationResult:
     session = self.get_session(session_id)
+    if session.status in {ReplayStatus.COMPLETED, ReplayStatus.FAILED, ReplayStatus.CANCELLED}:
+        raise ReplayOperationError(session_id, "cancel", "Session is already finished")
     session.status = ReplayStatus.CANCELLED
     await self._finalize_session(session, ReplayStatus.CANCELLED)
-    await self._repository.update_session_status(session_id, ReplayStatus.CANCELLED)
     return ReplayOperationResult(
         session_id=session_id,
         status=ReplayStatus.CANCELLED,
         message="Replay session cancelled"
     )
```

And in `_finalize_session`, guard the decrement:

```diff
-    self._metrics.decrement_active_replays()
+    if final_status != ReplayStatus.CANCELLED or session.started_at is not None:
+        self._metrics.decrement_active_replays()
```

Or more robustly, only decrement if the session was previously `RUNNING` (track an `_active_session_ids` set).
🤖 Fix all issues with AI agents
In `@backend/app/services/event_replay/replay_service.py`:
- Around line 82-97: The dispatch job can race because the scheduler is started
and the job added while session.status is still CREATED; move the status
transition so that session.status = ReplayStatus.RUNNING, session.started_at =
datetime.now(timezone.utc), and await
self._repository.update_session_status(session_id, ReplayStatus.RUNNING) happen
before creating/starting the AsyncIOScheduler and calling
scheduler.add_job(self._dispatch_next, ...); keep self._schedulers[session_id]
assignment and self._metrics.increment_active_replays() adjacent to the status
update (either before or after the scheduler start) so _dispatch_next will
always observe the RUNNING state when it first runs.
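A minimal sketch of the suggested ordering, as a fragment of the start method. Names are taken from the prompt; the module's own imports are assumed, and `dispatch_interval` is a hypothetical parameter.

```python
# 1) Transition to RUNNING and persist it first...
session.status = ReplayStatus.RUNNING
session.started_at = datetime.now(timezone.utc)
await self._repository.update_session_status(session_id, ReplayStatus.RUNNING)
self._metrics.increment_active_replays()

# 2) ...then create and start the scheduler, so the first run of
# _dispatch_next can only ever observe a RUNNING session.
scheduler = AsyncIOScheduler()
self._schedulers[session_id] = scheduler
scheduler.add_job(self._dispatch_next, "interval", seconds=dispatch_interval,
                  id=f"dispatch_{session_id}", args=[session_id])
scheduler.start()
```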
- Around line 319-347: The backoff decorator on the _dispatch function currently
retries on all Exception types (backoff.on_exception(..., Exception,...)),
causing retries for non-transient errors like the ValueError raised for unknown
ReplayTarget; change the exception tuple to only include transient errors from
the Kafka producer and file I/O (e.g., the specific producer exception class(s)
you use and OSError/IOError) so ValueError and other non-retryable exceptions
(raised in the ReplayTarget._dispatch default case) are not retried, and change
the on_backoff logger call from self.logger.error to self.logger.warning to
lower the log severity for expected retry attempts. Ensure references to
self._producer.produce and _write_event_to_file remain inside _dispatch and that
the ValueError path still raises immediately.
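For illustration, a sketch of the narrower retry decorator, assuming a confluent-kafka producer (so `KafkaException` stands in for whatever producer error class the service actually uses); `self.logger` and `self._send` are hypothetical stand-ins.

```python
import backoff
from confluent_kafka import KafkaException  # assumption: confluent-kafka client


class ReplayServiceSketch:
    async def _replay_event(self, event) -> None:
        @backoff.on_exception(
            backoff.expo,
            (KafkaException, OSError),  # transient produce/file-I/O errors only
            max_tries=3,
            max_value=30,
            on_backoff=lambda details: self.logger.warning(  # warning, not error
                "Retrying dispatch, attempt %s", details["tries"]
            ),
        )
        async def _dispatch() -> None:
            # A ValueError for an unknown ReplayTarget is not in the tuple
            # above, so it propagates immediately instead of being retried.
            await self._send(event)

        await _dispatch()
```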
🧹 Nitpick comments (4)
backend/app/services/event_replay/replay_service.py (4)
38-42: Consider consolidating per-session state into a single dataclass.

Five parallel dictionaries (`_sessions`, `_schedulers`, `_batch_iters`, `_event_buffers`, `_buffer_indices`) keyed by `session_id` are easy to get out of sync. A single `dict[str, SessionRuntime]`, where `SessionRuntime` bundles the scheduler, iterator, buffer, and index, would make cleanup less error-prone and easier to reason about; see the sketch below.
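A minimal sketch of the suggested consolidation (`SessionRuntime` is the name proposed above; the field types are assumptions):

```python
from dataclasses import dataclass, field
from typing import Any, AsyncIterator

from apscheduler.schedulers.asyncio import AsyncIOScheduler


@dataclass
class SessionRuntime:
    """Bundles per-session replay state so it is created and torn down together."""

    scheduler: AsyncIOScheduler
    batch_iter: AsyncIterator[list[Any]]
    event_buffer: list[Any] = field(default_factory=list)
    buffer_index: int = 0


# One mapping replaces the five parallel dictionaries:
# self._runtimes: dict[str, SessionRuntime] = {}
```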
248-249: Prefer the `anext()` builtin over `.__anext__()`.

Python 3.10+ provides `anext()` as the async counterpart to `next()`. It's more idiomatic and reads more clearly.

Proposed fix:

```diff
-    batch = await batch_iter.__anext__()
+    batch = await anext(batch_iter)
```
358-369: DB update failures are silently swallowed — in-memory state will diverge.

`_update_session_in_db` logs the error but doesn't propagate it. If the DB is intermittently unreachable, the in-memory session state (event counts, status) will drift from the persisted state with no indication to the caller. Consider at minimum recording a metric for DB update failures so they're observable.
82-84: One `AsyncIOScheduler` per session is heavyweight at scale.

Each `AsyncIOScheduler` instance spins up its own thread pool. With many concurrent replay sessions, this linearly scales thread count. A single shared scheduler (class-level) with per-session job IDs (already namespaced as `dispatch_{session_id}`) would be more resource-efficient and easier to manage; see the sketch below.
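A sketch of the shared-scheduler alternative, keeping the existing `dispatch_{session_id}` job-ID scheme (a sketch under the review's assumptions, not the service's actual code):

```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler


class EventReplayServiceSketch:
    # One class-level scheduler shared by every replay session.
    _scheduler = AsyncIOScheduler()

    def _start_dispatch(self, session_id: str, interval: float) -> None:
        if not self._scheduler.running:
            self._scheduler.start()
        self._scheduler.add_job(
            self._dispatch_next,
            "interval",
            seconds=interval,
            id=f"dispatch_{session_id}",  # per-session job ID
            args=[session_id],
        )

    def _stop_dispatch(self, session_id: str) -> None:
        # Ends dispatch for this session only; other sessions keep
        # running on the shared scheduler.
        self._scheduler.remove_job(f"dispatch_{session_id}")

    async def _dispatch_next(self, session_id: str) -> None:
        ...
```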
Summary by cubic
Removed ad-hoc `asyncio.sleep` usage across the codebase and replaced it with scheduler- and backoff-based flows to improve reliability, throughput, and non-blocking behavior. Event replay is now time-accurate, and pause/resume takes effect immediately without busy waiting; sampling and notifications are cleaner and more predictable.
Refactors
Dependencies
Written for commit eb869e6. Summary will update on new commits.
Summary by CodeRabbit
New Features
Performance Improvements
Bug Fixes