drift/core/batch_processor.py (48 changes: 33 additions & 15 deletions)
@@ -63,6 +63,7 @@ def __init__(
         self._condition = threading.Condition(self._lock)
         self._shutdown_event = threading.Event()
         self._export_thread: threading.Thread | None = None
+        self._thread_loop: asyncio.AbstractEventLoop | None = None
         self._started = False
         self._dropped_spans = 0

@@ -158,16 +159,23 @@ def add_span(self, span: CleanSpanData) -> bool:

     def _export_loop(self) -> None:
         """Background thread that periodically exports spans."""
-        while not self._shutdown_event.is_set():
-            # Wait for either: batch size reached, scheduled delay, or shutdown
-            with self._condition:
-                # Wait until batch is ready or timeout
-                self._condition.wait(timeout=self._config.scheduled_delay_seconds)
+        # Create a single long-lived event loop for this thread
+        self._thread_loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(self._thread_loop)
+
+        try:
+            while not self._shutdown_event.is_set():
+                # Wait for either: batch size reached, scheduled delay, or shutdown
+                with self._condition:
+                    self._condition.wait(timeout=self._config.scheduled_delay_seconds)

-            if self._shutdown_event.is_set():
-                break
+                if self._shutdown_event.is_set():
+                    break

-            self._export_batch()
+                self._export_batch()
+        finally:
+            self._thread_loop.close()
+            self._thread_loop = None

     def _export_batch(self) -> None:
         """Export a batch of spans from the queue."""
@@ -188,14 +196,24 @@ def _export_batch(self) -> None:
         for adapter in adapters:
             start_time = time.monotonic()
             try:
-                # Handle async adapters (create new event loop for this thread)
+                # Handle async adapters
                 if asyncio.iscoroutinefunction(adapter.export_spans):
-                    loop = asyncio.new_event_loop()
-                    asyncio.set_event_loop(loop)
-                    try:
-                        loop.run_until_complete(adapter.export_spans(batch))
-                    finally:
-                        loop.close()
+                    # Only reuse the thread's event loop if we're on the export thread.
+                    # Using it from another thread (e.g., force_flush after join timeout)
+                    # would cause RuntimeError since event loops are not thread-safe.
+                    is_export_thread = threading.current_thread() is self._export_thread
+                    can_reuse_loop = (
+                        is_export_thread and self._thread_loop is not None and not self._thread_loop.is_closed()
+                    )
+                    if can_reuse_loop and self._thread_loop is not None:
+                        self._thread_loop.run_until_complete(adapter.export_spans(batch))
+                    else:
+                        loop = asyncio.new_event_loop()
+                        asyncio.set_event_loop(loop)
+                        try:
+                            loop.run_until_complete(adapter.export_spans(batch))
+                        finally:
+                            loop.close()
                 else:
                     adapter.export_spans(batch)

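The new branch in _export_batch reduces to a simple rule: drive the coroutine on the export thread's long-lived loop only when the caller is the export thread and that loop is still open; in every other case (for example, force_flush running on the caller's thread after stop() has timed out), fall back to the previous behaviour of a short-lived private loop. A rough standalone sketch of that rule (the helper name and signature are invented for illustration and are not drift API):

import asyncio
import threading
from collections.abc import Callable, Coroutine
from typing import Any


def run_coro_for_export(
    make_coro: Callable[[], Coroutine[Any, Any, None]],
    owner_thread: threading.Thread | None,
    owner_loop: asyncio.AbstractEventLoop | None,
) -> None:
    # Reuse the owner's loop only from the owning thread; event loops are not
    # thread-safe, so driving one from another thread risks RuntimeError.
    on_owner_thread = threading.current_thread() is owner_thread
    if on_owner_thread and owner_loop is not None and not owner_loop.is_closed():
        owner_loop.run_until_complete(make_coro())
        return
    # Any other caller gets a throwaway loop that lives for this one call only.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(make_coro())
    finally:
        loop.close()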
tests/unit/test_batch_processor.py (204 changes: 204 additions & 0 deletions)
@@ -0,0 +1,204 @@
"""Tests for BatchSpanProcessor event loop management."""

from __future__ import annotations

import asyncio
import time
from typing import Any

from drift.core.batch_processor import BatchSpanProcessor, BatchSpanProcessorConfig
from tests.utils.test_helpers import create_test_span


class MockExporter:
"""Mock exporter that returns configurable adapters.

Implements the minimal interface needed by BatchSpanProcessor.
"""

def __init__(self, adapters: list[Any]) -> None:
self._adapters = adapters

def get_adapters(self) -> list[Any]:
return self._adapters


def _create_processor(exporter: MockExporter, config: BatchSpanProcessorConfig) -> BatchSpanProcessor:
"""Create BatchSpanProcessor with mock exporter (type-safe wrapper)."""
# MockExporter implements get_adapters() which is all BatchSpanProcessor needs
return BatchSpanProcessor(exporter, config) # type: ignore[arg-type]


class TestBatchSpanProcessorEventLoop:
"""Test event loop reuse in BatchSpanProcessor."""

def test_reuses_event_loop_across_exports(self):
"""Verify the same event loop is reused for multiple exports."""
loops_used = []

class TrackingAdapter:
name = "tracking"

async def export_spans(self, spans):
loops_used.append(asyncio.get_event_loop())

adapter = TrackingAdapter()
exporter = MockExporter([adapter])

config = BatchSpanProcessorConfig(
scheduled_delay_seconds=0.1,
max_export_batch_size=1,
)
processor = _create_processor(exporter, config)
processor.start()

# Add spans to trigger multiple exports
processor.add_span(create_test_span(name="span-1"))
processor.add_span(create_test_span(name="span-2"))

time.sleep(0.3) # Allow exports to happen

processor.stop()

# All exports should use the same event loop
assert len(loops_used) >= 2, f"Expected at least 2 exports, got {len(loops_used)}"
assert all(loop is loops_used[0] for loop in loops_used), "Different event loops were used"

def test_force_flush_works_after_thread_shutdown(self):
"""Verify force_flush creates temporary loop when thread loop is closed."""
exported_spans = []

class CollectingAdapter:
name = "collecting"

async def export_spans(self, spans):
exported_spans.extend(spans)

adapter = CollectingAdapter()
exporter = MockExporter([adapter])

# Long delay so spans won't export before stop()
config = BatchSpanProcessorConfig(
scheduled_delay_seconds=10.0,
max_export_batch_size=100,
)
processor = _create_processor(exporter, config)
processor.start()

# Add spans
for i in range(5):
processor.add_span(create_test_span(name=f"span-{i}"))

# Stop immediately - force_flush should handle export
processor.stop()

assert len(exported_spans) == 5, f"Expected 5 spans, got {len(exported_spans)}"

def test_event_loop_closed_after_stop(self):
"""Verify the thread's event loop is properly cleaned up after stop."""

class NoOpAdapter:
name = "noop"

async def export_spans(self, spans):
pass

adapter = NoOpAdapter()
exporter = MockExporter([adapter])

config = BatchSpanProcessorConfig(scheduled_delay_seconds=0.1)
processor = _create_processor(exporter, config)

processor.start()
time.sleep(0.05) # Let thread start

# Thread loop should exist while running
assert processor._thread_loop is not None

processor.stop()

# Thread loop should be cleaned up after stop
assert processor._thread_loop is None

def test_sync_adapter_does_not_use_event_loop(self):
"""Verify sync adapters work without event loop involvement."""
exported_spans = []

class SyncAdapter:
name = "sync"

def export_spans(self, spans):
# Sync method - not a coroutine
exported_spans.extend(spans)

adapter = SyncAdapter()
exporter = MockExporter([adapter])

config = BatchSpanProcessorConfig(
scheduled_delay_seconds=0.1,
max_export_batch_size=5,
)
processor = _create_processor(exporter, config)
processor.start()

for i in range(3):
processor.add_span(create_test_span(name=f"span-{i}"))

time.sleep(0.2)
processor.stop()

assert len(exported_spans) == 3

def test_force_flush_from_different_thread_uses_temporary_loop(self):
"""Verify force_flush doesn't reuse export thread's loop from a different thread.

This tests the scenario where stop() times out and force_flush runs while
the export thread is still alive. Using the export thread's loop from
another thread would cause RuntimeError.
"""
loops_used = []
export_thread_loop = None

class TrackingAdapter:
name = "tracking"

async def export_spans(self, spans):
loops_used.append(asyncio.get_event_loop())

adapter = TrackingAdapter()

# Use a slow adapter that blocks the export thread
class SlowAdapter:
name = "slow"

async def export_spans(self, spans):
nonlocal export_thread_loop
export_thread_loop = asyncio.get_event_loop()
# Block for a while to simulate slow export
await asyncio.sleep(0.5)

slow_adapter = SlowAdapter()
exporter_with_slow = MockExporter([slow_adapter, adapter])

config = BatchSpanProcessorConfig(
scheduled_delay_seconds=0.05,
max_export_batch_size=1,
)
processor = _create_processor(exporter_with_slow, config)
processor.start()

# Add a span to trigger export
processor.add_span(create_test_span(name="span-1"))
time.sleep(0.1) # Let export start

# Stop with very short timeout - export thread will still be running
processor.stop(timeout=0.01)

# Add more spans and force flush from main thread
processor._queue.append(create_test_span(name="span-2"))
processor._force_flush()

# Verify: loops used during force_flush should NOT be the export thread's loop
# (because force_flush runs on the main thread)
main_thread_loops = [loop for loop in loops_used if loop is not export_thread_loop]
assert len(main_thread_loops) > 0, "force_flush should have used a different loop"
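
Assuming the repository runs its unit tests under pytest (the tests/unit layout suggests it, though the runner is not shown in this diff), the new suite can be exercised on its own with something like: pytest tests/unit/test_batch_processor.py -q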