
fix: removed asyncio.sleep #144

Merged

HardMax71 merged 3 commits into main from fix/asyncio-sleep on Feb 7, 2026

Conversation

@HardMax71 (Owner) commented Feb 7, 2026


Summary by cubic

Removed ad-hoc asyncio.sleep usage across the codebase and replaced it with scheduler- and backoff-based flows to improve reliability, throughput, and non-blocking behavior. Event replay is now time-accurate and pause/resume is immediate without busy waiting; sampling and notifications are cleaner and more predictable.

  • Refactors

    • Event replay: use AsyncIOScheduler to dispatch events by timestamp with speed_multiplier; added batch iterators and in-memory buffers; pause/resume via scheduler jobs; centralized finalize/cleanup; retries via backoff.expo; fixed failed events accounting.
    • Notifications: replaced manual retry loops and sleeps with backoff.expo; validated max_retries ≥ 1; records delivery metrics on success/failure (see the sketch after this list).
    • Adaptive sampler: removed background thread and locks; lazy, inline rate adjustment on interval; simplified sliding windows; updated tests.
    • API replay endpoint: removed per-100 event sleep throttle.
    • K8s worker: removed sleep-based wait_for_active_creations.
  • Dependencies

    • Added backoff and APScheduler.
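
To make the notification change concrete, here is a minimal sketch, not the PR's actual code: the Notification fields, the transport object, and the log format are assumptions; only the backoff.on_exception/backoff.expo usage and the max_retries ≥ 1 constraint come from the summary above.

import backoff
from pydantic import BaseModel, Field


class Notification(BaseModel):
    channel: str
    message: str
    # ge=1 ensures backoff.on_exception never receives max_tries=0
    max_retries: int = Field(default=3, ge=1)


class NotificationService:
    def __init__(self, transport, logger) -> None:
        self.transport = transport  # hypothetical sender (webhook/Slack client)
        self.logger = logger

    async def _deliver_notification(self, notification: Notification) -> None:
        @backoff.on_exception(
            backoff.expo,                        # exponential wait between attempts
            Exception,
            max_tries=notification.max_retries,  # per-notification retry budget
            max_value=30,                        # cap a single wait at 30 seconds
            on_backoff=lambda details: self.logger.warning(
                "delivery retry %(tries)d, waiting %(wait).1fs", details
            ),
        )
        async def _deliver() -> None:
            await self.transport.send(notification.channel, notification.message)

        await _deliver()
        # delivery metrics would be recorded here on success (and on final failure)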

Written for commit eb869e6. Summary will update on new commits.

Summary by CodeRabbit

  • New Features

    • Scheduler-driven event replay with buffered batch processing.
    • Notification delivery now uses an exponential-backoff retry policy.
  • Performance Improvements

    • Removed artificial per-iteration delays during replay for faster processing.
    • Adaptive sampling now updates rates lazily, reducing background work.
  • Bug Fixes

    • More robust replay finalization and error tracking.
    • Changed worker shutdown behavior related to active creation waits.

@coderabbitai bot commented Feb 7, 2026

Warning

Rate limit exceeded

@HardMax71 has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 19 minutes and 1 second before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📝 Walkthrough

This PR removes per-iteration rate limiting from event replay, replaces background threading with lazy, inline rate adjustments in adaptive sampling, refactors event replay service to use scheduler-based batching and dispatch, removes a Kubernetes worker graceful shutdown method, and adopts backoff-based retry mechanisms for notification delivery.

Changes

  • Event Replay Loop (backend/app/api/routes/events.py): Removed per-iteration rate limiting (asyncio.sleep every 100 events) from the replay_aggregate_events loop; simplified iteration and removed the unused asyncio import.
  • Adaptive Sampling (backend/app/core/adaptive_sampling.py, backend/tests/unit/core/test_adaptive_sampling.py): Replaced background threading and explicit counters with lazy, inline rate adjustments in should_sample() triggered by elapsed time; introduced 1-minute sliding windows using deques for request/error tracking; updated tests to remove thread mocking and directly instantiate AdaptiveSampler (see the sketch after this list).
  • Event Replay Service (backend/app/services/event_replay/replay_service.py): Refactored session-based event replay from direct task tracking to scheduler-based dispatch with batch loading, buffering, and dynamic event popping; added a _dispatch_next() loop, buffer management (_batch_iters, _event_buffers), and backoff-based retry for event dispatch; improved error handling via ReplayError and centralized finalization; added file-based target handling with locking.
  • Kubernetes Worker (backend/app/services/k8s_worker/worker.py): Removed the wait_for_active_creations() async helper that provided a graceful-shutdown wait for active pod creations.
  • Notification Service (backend/app/services/notification_service.py): Replaced the inline manual retry loop with a backoff-based retry decorator (@backoff.on_exception) in _deliver_notification; consolidated success/failure logging and removed per-attempt state tracking.
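
A minimal sketch of the lazy, inline adjustment described in the Adaptive Sampling entry. The names should_sample and AdaptiveSampler come from the summary; the thresholds, the rate formula, and record_request are assumptions, and the deques are left unbounded with time-based pruning, in line with the review feedback further down.

import time
from collections import deque


class AdaptiveSampler:
    def __init__(self, base_rate: float = 0.1, adjustment_interval: float = 60.0,
                 high_traffic_threshold: int = 1000) -> None:
        self._rate = base_rate
        self._base_rate = base_rate
        self._adjustment_interval = adjustment_interval
        self._high_traffic_threshold = high_traffic_threshold
        self._last_adjustment = time.monotonic()
        # unbounded deques of timestamps; old entries are pruned on adjustment
        self._request_window = deque()
        self._error_window = deque()

    def record_request(self, is_error: bool = False) -> None:
        now = time.monotonic()
        self._request_window.append(now)
        if is_error:
            self._error_window.append(now)

    def should_sample(self, trace_id: int) -> bool:
        now = time.monotonic()
        # lazy, inline adjustment: no background thread, no lock
        if now - self._last_adjustment >= self._adjustment_interval:
            self._adjust_sampling_rate(now)
            self._last_adjustment = now
        return (trace_id % 10_000) < int(self._rate * 10_000)

    def _adjust_sampling_rate(self, now: float) -> None:
        # time-based pruning keeps only the last minute of activity
        for window in (self._request_window, self._error_window):
            while window and now - window[0] > 60.0:
                window.popleft()
        requests_per_minute = len(self._request_window)
        error_rate = len(self._error_window) / max(requests_per_minute, 1)
        if error_rate > 0.05:                            # sample more when errors spike
            self._rate = min(1.0, self._base_rate * 2)
        elif requests_per_minute > self._high_traffic_threshold:
            self._rate = max(0.01, self._base_rate / 2)  # sample less under heavy load
        else:
            self._rate = self._base_rate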

Sequence Diagram(s)

sequenceDiagram
    participant Client as API Client
    participant Service as ReplayService
    participant Scheduler as AsyncIOScheduler
    participant Handler as EventHandler
    participant DB as Database

    Client->>Service: start_replay(session_id, events)
    Service->>Service: _batch_iters[session_id] = batches
    Service->>DB: record session (RUNNING)
    
    loop Dispatch Loop (_dispatch_next)
        Scheduler->>Service: trigger job
        Service->>Service: _pop_next_event(session_id)
        alt Event Available
            Service->>Service: _load_next_batch (if buffer low)
            Service->>Handler: _replay_event(event)
            Handler->>Handler: backoff.on_exception (with retry)
            Handler->>DB: write/send event
            Service->>Service: _update_session_in_db (metrics)
        else No More Events
            Service->>Service: _finalize_session(session_id)
            Service->>DB: record session (COMPLETED/FAILED)
            Scheduler->>Service: job complete
        end
    end
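A condensed, self-contained sketch of the scheduler-driven loop shown in the diagram, assuming APScheduler's AsyncIOScheduler; the event list, gap handling, and class shape are simplified stand-ins for the real _dispatch_next / _pop_next_event / _finalize_session machinery.

import asyncio
from datetime import datetime, timedelta, timezone

from apscheduler.schedulers.asyncio import AsyncIOScheduler


class ReplaySketch:
    """Replays (gap_seconds, payload) pairs via scheduler jobs instead of asyncio.sleep."""

    def __init__(self, events: list[tuple[float, str]], speed_multiplier: float = 1.0) -> None:
        self._events = list(events)
        self._speed = speed_multiplier
        self._scheduler = AsyncIOScheduler()
        self._done = asyncio.Event()

    async def run(self) -> None:
        self._scheduler.start()
        # first dispatch fires immediately; each dispatch schedules the next one
        self._scheduler.add_job(self._dispatch_next, id="dispatch")
        await self._done.wait()
        self._scheduler.shutdown(wait=False)

    async def _dispatch_next(self) -> None:
        if not self._events:
            self._done.set()                    # finalize: nothing left to replay
            return
        gap, payload = self._events.pop(0)
        print(f"replaying {payload!r}")         # stands in for the real event dispatch
        run_at = datetime.now(timezone.utc) + timedelta(seconds=gap / self._speed)
        self._scheduler.add_job(self._dispatch_next, "date", run_date=run_at,
                                id="dispatch", replace_existing=True)


# usage: asyncio.run(ReplaySketch([(0.5, "a"), (1.0, "b")], speed_multiplier=2.0).run())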

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Possibly related PRs

  • fix: event replay service #119 — Overlapping changes to backend/app/services/event_replay/replay_service.py with modifications to event replay loop, session lifecycle, and batch scheduling logic.
  • feat: async k8s lib #114 — Related changes to backend/app/services/k8s_worker/worker.py and KubernetesWorker lifecycle, including modifications to graceful shutdown behavior.
  • chore: type fixes #74 — Concurrent modifications to backend/app/api/routes/events.py in the replay_aggregate_events function affecting event payload handling.

Poem

🐰 Threads we've shed, now lazy and keen,
Batches dispatched by scheduler's routine,
Retries back off with graceful delay,
Five service flows now streamlined their way! ✨

🚥 Pre-merge checks: 1 passed, 2 failed

❌ Failed checks (1 warning, 1 inconclusive)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 27.59%, below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them.
  • Title check (❓ Inconclusive): The title 'fix: removed asyncio.sleep' is only partially related to the changeset. While the asyncio.sleep removal is one change in events.py, the PR makes substantial architectural changes across multiple files, including rate-limiting refactoring, a replay service redesign, notification retry updates, and Kubernetes worker modifications. Consider a more comprehensive title, such as 'refactor: optimize rate limiting and async retry mechanisms' or 'refactor: remove background threading and consolidate async patterns', that better captures the scope of the changes.

✅ Passed checks (1 passed)
  • Description Check (✅ Passed): Check skipped because CodeRabbit's high-level summary is enabled.



@cubic-dev-ai bot left a comment

1 issue found across 6 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/services/event_replay/replay_service.py">

<violation number="1" location="backend/app/services/event_replay/replay_service.py:197">
P1: Double-counting of `failed_events` when exception is caught and `skip_errors` is True. The except block increments `failed_events` but doesn't return, causing the else branch to increment it again since `success` remains `False`.</violation>
</file>
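
One way to restructure the accounting so a failure is counted exactly once, as the violation above suggests; SessionStub and the function signature are hypothetical scaffolding, not the service's real types.

from dataclasses import dataclass, field


@dataclass
class SessionStub:                          # hypothetical stand-in for the replay session
    skip_errors: bool = True
    replayed_events: int = 0
    failed_events: int = 0
    errors: list = field(default_factory=list)


async def replay_all(session: SessionStub, events, replay_event, finalize) -> None:
    for event in events:
        try:
            await replay_event(event)
        except Exception as exc:
            session.failed_events += 1      # counted exactly once, here
            session.errors.append(str(exc))
            if not session.skip_errors:
                await finalize(session, "FAILED")
                return
            continue                        # skip the success accounting below
        session.replayed_events += 1        # only reached when no exception was raised
    await finalize(session, "COMPLETED")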

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

@coderabbitai bot left a comment

Actionable comments posted: 5

🤖 Fix all issues with AI agents
In `@backend/app/core/adaptive_sampling.py`:
- Around line 55-57: The sliding-window deques _request_window and _error_window
were created with maxlen=60 which caps request_rate to 60 and makes the
high-traffic branch in _adjust_sampling_rate effectively dead; remove the maxlen
parameter (or set it to a value >= high_traffic_threshold) so the time-based
pruning in _calculate_metrics can correctly compute request_rate over the
1-minute window and enable high-traffic detection.

In `@backend/app/services/event_replay/replay_service.py`:
- Around line 189-204: The bug double-increments session.failed_events when
_replay_event raises and session.config.skip_errors is True; fix by
restructuring the try/except so accounting happens only once: in replay loop
catch exceptions from _replay_event (append ReplayError to session.errors and
increment session.failed_events), and if not session.config.skip_errors call
await self._finalize_session(session, ReplayStatus.FAILED) and return; otherwise
skip the normal success/failed accounting (e.g. use try/except/else or continue)
so that the later block that inspects the success variable (and increments
session.failed_events or session.replayed_events) only runs for successful,
non-exceptional attempts. Ensure references: _replay_event,
session.failed_events, session.errors, session.config.skip_errors,
_finalize_session, ReplayStatus.FAILED.

In `@backend/app/services/notification_service.py`:
- Around line 694-699: The metrics calls in notification_service.py are passing
notification.severity where notification_type is expected; update the calls to
derive notification_type = notification.tags[0] if present else "unknown" and
pass that as the first argument to self.metrics.record_notification_sent(...)
and self.metrics.record_notification_delivery_time(...), keeping
channel=notification.channel and severity=notification.severity for
record_notification_sent and using delivery_time as the duration argument for
record_notification_delivery_time (see the sketch after this list).
- Around line 664-668: The notification.max_retries field must be constrained to
be >=1 to avoid backoff.on_exception getting max_tries=0; update the Pydantic
model that defines max_retries (e.g., the Notification or NotificationConfig
class and its max_retries attribute) to use a constrained field such as Field(3,
ge=1) or conint(ge=1) so validation rejects 0, and ensure any default remains 3;
this prevents passing 0 into backoff.on_exception (referenced where
backoff.on_exception(..., max_tries=notification.max_retries) is used).

In `@backend/tests/unit/core/test_adaptive_sampling.py`:
- Around line 29-49: The high-traffic part of
test_adjust_sampling_rate_error_and_traffic never triggers because
AdaptiveSampler._request_window has maxlen=60 so appending 2000 entries still
yields at most 60 requests and won't exceed the default high_traffic_threshold;
fix the test by constructing the sampler with a lower threshold (e.g., use
AdaptiveSampler(base_rate=0.1, adjustment_interval=1,
high_traffic_threshold=50)) or otherwise set s._high_traffic_threshold to a
value <=60 before appending, then re-run the same append/adjust/assert sequence
to actually exercise the high-traffic branch of _adjust_sampling_rate.
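
A small sketch of the metrics fix described for notification_service.py above; the record_notification_sent / record_notification_delivery_time signatures are taken from the review text, and the helper wrapper itself is hypothetical.

def record_delivery_metrics(metrics, notification, delivery_time: float) -> None:
    # derive the notification type from tags instead of reusing severity
    notification_type = notification.tags[0] if notification.tags else "unknown"
    metrics.record_notification_sent(
        notification_type,
        channel=notification.channel,
        severity=notification.severity,
    )
    metrics.record_notification_delivery_time(notification_type, delivery_time)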
🧹 Nitpick comments (6)
backend/app/services/notification_service.py (1)

664-676: jitter=None disables jitter on exponential backoff for external service calls.

For webhook and Slack deliveries, correlated retries without jitter can cause synchronized retry storms against the same external endpoint. Consider using backoff.full_jitter (the default) instead of None.

Proposed fix
         @backoff.on_exception(
             backoff.expo,
             Exception,
             max_tries=notification.max_retries,
             max_value=30,
-            jitter=None,
+            jitter=backoff.full_jitter,
             on_backoff=lambda details: self.logger.warning(
backend/app/core/adaptive_sampling.py (1)

99-103: Thread-safety consideration after removing locks.

should_sample() is invoked by the OpenTelemetry SDK, potentially from multiple threads. The removed threading lock previously protected shared state. While CPython's GIL makes individual deque.append/popleft operations atomic, compound operations in _calculate_metrics (iterating + popping) interleaved with append in should_sample could yield approximate results.

Given that sampling is inherently probabilistic, this is likely acceptable — but worth documenting the design choice.

backend/tests/unit/core/test_adaptive_sampling.py (1)

60-62: monkeypatch.setenv on Line 60 has no effect here.

create_adaptive_sampler reads from the mock_settings object (whose TRACING_SAMPLING_RATE is set to 0.2 on Line 58), not from environment variables. The setenv call is dead code.

Proposed fix
-    monkeypatch.setenv("TRACING_SAMPLING_RATE", "0.2")
     sampler = create_adaptive_sampler(mock_settings)
backend/app/services/event_replay/replay_service.py (3)

82-92: One AsyncIOScheduler per session is heavyweight.

Each replay session creates its own APScheduler AsyncIOScheduler instance. Under load with many concurrent sessions, this could create significant overhead (each scheduler runs its own internal event loop machinery). Consider using a single shared scheduler with per-session job IDs, or a simpler approach like asyncio.call_later / loop.call_later.


248-249: Prefer anext() builtin over __anext__().

anext(batch_iter) is the idiomatic Python 3.10+ way to advance an async iterator.

Proposed fix
-            batch = await batch_iter.__anext__()
+            batch = await anext(batch_iter)

319-324: Same note as notification_service.py: jitter=None disables jitter on retry backoff.

For Kafka produce retries, correlated retry storms are less of a concern than with external webhooks, but adding jitter is still a best practice.

@coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/services/event_replay/replay_service.py (1)

136-143: ⚠️ Potential issue | 🟠 Major

Active-replay counter can go negative; redundant DB update.

cancel_session has no status guard, so cancelling a CREATED or already-completed session calls _finalize_session, which calls decrement_active_replays() (line 262) without a prior increment_active_replays(). This drifts the gauge negative.

Additionally, _finalize_session already persists the session via _update_session_in_db (line 263), then line 140 calls update_session_status again — a redundant DB write.

Proposed fix
 async def cancel_session(self, session_id: str) -> ReplayOperationResult:
     session = self.get_session(session_id)
+    if session.status in {ReplayStatus.COMPLETED, ReplayStatus.FAILED, ReplayStatus.CANCELLED}:
+        raise ReplayOperationError(session_id, "cancel", "Session is already finished")
     session.status = ReplayStatus.CANCELLED
     await self._finalize_session(session, ReplayStatus.CANCELLED)
-    await self._repository.update_session_status(session_id, ReplayStatus.CANCELLED)
     return ReplayOperationResult(
         session_id=session_id, status=ReplayStatus.CANCELLED, message="Replay session cancelled"
     )

And in _finalize_session, guard the decrement:

-    self._metrics.decrement_active_replays()
+    if final_status != ReplayStatus.CANCELLED or session.started_at is not None:
+        self._metrics.decrement_active_replays()

Or more robustly, only decrement if the session was previously RUNNING (track an _active_session_ids set).

🤖 Fix all issues with AI agents
In `@backend/app/services/event_replay/replay_service.py`:
- Around line 82-97: The dispatch job can race because the scheduler is started
and the job added while session.status is still CREATED; move the status
transition so that session.status = ReplayStatus.RUNNING, session.started_at =
datetime.now(timezone.utc), and await
self._repository.update_session_status(session_id, ReplayStatus.RUNNING) happen
before creating/starting the AsyncIOScheduler and calling
scheduler.add_job(self._dispatch_next, ...); keep self._schedulers[session_id]
assignment and self._metrics.increment_active_replays() adjacent to the status
update (either before or after the scheduler start) so _dispatch_next will
always observe the RUNNING state when it first runs.
- Around line 319-347: The backoff decorator on the _dispatch function currently
retries on all Exception types (backoff.on_exception(..., Exception,...)),
causing retries for non-transient errors like the ValueError raised for unknown
ReplayTarget; change the exception tuple to only include transient errors from
the Kafka producer and file I/O (e.g., the specific producer exception class(s)
you use and OSError/IOError) so ValueError and other non-retryable exceptions
(raised in the ReplayTarget._dispatch default case) are not retried, and change
the on_backoff logger call from self.logger.error to self.logger.warning to
lower the log severity for expected retry attempts. Ensure references to
self._producer.produce and _write_event_to_file remain inside _dispatch and that
the ValueError path still raises immediately.
🧹 Nitpick comments (4)
backend/app/services/event_replay/replay_service.py (4)

38-42: Consider consolidating per-session state into a single dataclass.

Five parallel dictionaries (_sessions, _schedulers, _batch_iters, _event_buffers, _buffer_indices) keyed by session_id are easy to get out of sync. A single dict[str, SessionRuntime] (where SessionRuntime bundles the scheduler, iterator, buffer, and index) would make cleanup less error-prone and easier to reason about (see the sketch after these comments).


248-249: Prefer anext() builtin over .__anext__().

Python 3.10+ provides anext() as the async counterpart to next(). It's more idiomatic and reads more clearly.

Proposed fix
-            batch = await batch_iter.__anext__()
+            batch = await anext(batch_iter)

358-369: DB update failures are silently swallowed — in-memory state will diverge.

_update_session_in_db logs the error but doesn't propagate it. If the DB is intermittently unreachable, the in-memory session state (event counts, status) will drift from the persisted state with no indication to the caller. Consider at minimum recording a metric for DB update failures so they're observable.


82-84: One AsyncIOScheduler per session is heavyweight at scale.

Each AsyncIOScheduler instance spins up its own thread pool. With many concurrent replay sessions, this linearly scales thread count. A single shared scheduler (class-level) with per-session job IDs (already namespaced as dispatch_{session_id}) would be more resource-efficient and easier to manage.

@sonarqubecloud bot commented Feb 7, 2026

HardMax71 merged commit cc1564e into main on Feb 7, 2026
15 checks passed
HardMax71 deleted the fix/asyncio-sleep branch on February 7, 2026 at 12:55