Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ OTel instrumentation is opt-in, controlled by environment variables:
| Variable | Description | Default |
| --------------------------------- | --------------------------------------------------------------------------------------------------------------------- | --------------- |
| `OTEL_EXPORTER_OTLP_ENDPOINT` | Base OTLP endpoint (e.g. `http://collector:4318`). If unset, no OTel setup occurs. | _(disabled)_ |
| `OTEL_SERVICE_NAME` | The `service.name` resource attribute. | `flagsmith-api` |
| `OTEL_SERVICE_NAME` | The `service.name` resource attribute. Defaults to `flagsmith-task-processor` when running the task processor. | `flagsmith-api` |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

| `OTEL_TRACING_EXCLUDED_URL_PATHS` | Comma-separated URL paths to exclude from tracing (e.g. `health/liveness,health/readiness`). | _(none)_ |

Standard `OTEL_*` env vars (e.g. `OTEL_RESOURCE_ATTRIBUTES`, `OTEL_EXPORTER_OTLP_HEADERS`) are also respected by the OTel SDK.
Expand All @@ -121,6 +121,7 @@ When `OTEL_EXPORTER_OTLP_ENDPOINT` is set, `ensure_cli_env()` sets up:
- **psycopg2** (`Psycopg2Instrumentor`): creates child spans for each SQL query with `db.system`, `db.statement`, and `db.name` attributes. SQL commenter is enabled, adding trace context as SQL comments for database-side correlation.
- **Redis** (`RedisInstrumentor`): creates child spans for each Redis command with `db.system` and `db.statement` attributes.
- **Structured log export**: A structlog processor that emits each log event as both an OTLP log record and a span event (when an active span exists).
- **Task processor trace propagation**: When a task is enqueued via `TaskHandler.delay()`, the current W3C trace context (including baggage) is serialized into the task's `trace_context` field. When the task processor executes the task, the context is extracted and a child span is created, linking the task execution back to the originating request trace. Span attributes (`task_identifier`, `task_type`, `result`) match the Prometheus metric labels for cross-signal correlation.

#### Emitting OTel log events via structlog

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ optional-dependencies = { test-tools = [
"backoff (>=2.2.1,<3.0.0)",
"django (>4,<6)",
"django-health-check",
"opentelemetry-api (>=1.25,<2)",
"prometheus-client (>=0.0.16)",
], flagsmith-schemas = [
"simplejson",
Expand Down
7 changes: 6 additions & 1 deletion src/common/core/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ def ensure_cli_env() -> typing.Generator[None, None, None]:
setup_tracing,
)

service_name = env.str("OTEL_SERVICE_NAME", "flagsmith-api")
default_service_name = (
"flagsmith-task-processor"
if "task-processor" in sys.argv
else "flagsmith-api"
)
Comment on lines +69 to +73
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: Should we just set `OTEL_SERVICE_NAME` in the container settings instead? I'm tempted by the smaller effort this approach allows, but it did raise an eyebrow.

service_name = env.str("OTEL_SERVICE_NAME", default_service_name)
log_provider = build_otel_log_provider(
endpoint=f"{otel_endpoint}/v1/logs",
service_name=service_name,
Expand Down
2 changes: 1 addition & 1 deletion src/common/test_tools/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __call__(
class RunTasksFixture(Protocol):
def __call__(
self,
num_tasks: int,
num_tasks: int = 1,
) -> "list[TaskRun]": ...


Expand Down
6 changes: 5 additions & 1 deletion src/task_processor/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
from django.conf import settings
from django.db.transaction import on_commit
from django.utils import timezone
from opentelemetry import propagate

from task_processor import metrics, task_registry
from task_processor.exceptions import InvalidArgumentsError, TaskQueueFullError
from task_processor.models import RecurringTask, Task, TaskPriority
from task_processor.task_run_method import TaskRunMethod
from task_processor.types import TaskCallable, TaskParameters
from task_processor.types import TaskCallable, TaskParameters, TraceContext
from task_processor.utils import get_task_identifier_from_function

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -92,6 +93,8 @@ def delay(
task_identifier=task_identifier
).inc()
try:
carrier: TraceContext = {}
propagate.inject(carrier)
task = Task.create(
task_identifier=task_identifier,
scheduled_for=delay_until or timezone.now(),
Expand All @@ -100,6 +103,7 @@ def delay(
timeout=self.timeout,
args=args,
kwargs=kwargs,
trace_context=carrier or None,
)
except TaskQueueFullError as e:
logger.warning(e)
Expand Down
23 changes: 23 additions & 0 deletions src/task_processor/migrations/0014_add_trace_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 5.2.12 on 2026-04-10 14:29

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add a nullable ``trace_context`` JSON column to both task models."""

    dependencies = [("task_processor", "0013_add_last_picked_at")]

    # Both models receive the same optional JSON field; a fresh field
    # instance is built for each operation.
    operations = [
        migrations.AddField(
            model_name=model_name,
            name="trace_context",
            field=models.JSONField(blank=True, null=True),
        )
        for model_name in ("recurringtask", "task")
    ]
5 changes: 4 additions & 1 deletion src/task_processor/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from task_processor.exceptions import TaskQueueFullError
from task_processor.managers import RecurringTaskManager, TaskManager
from task_processor.task_registry import get_task, registered_tasks
from task_processor.types import TaskCallable
from task_processor.types import TaskCallable, TraceContext

_django_json_encoder_default = DjangoJSONEncoder().default

Expand All @@ -31,6 +31,7 @@ class AbstractBaseTask(models.Model):
serialized_kwargs = models.TextField(blank=True, null=True)
is_locked = models.BooleanField(default=False)
timeout = models.DurationField(blank=True, null=True)
trace_context = models.JSONField(null=True, blank=True)

class Meta:
abstract = True
Expand Down Expand Up @@ -112,6 +113,7 @@ def create(
args: typing.Tuple[typing.Any, ...] | None = None,
kwargs: typing.Dict[str, typing.Any] | None = None,
timeout: timedelta | None = timedelta(seconds=60),
trace_context: TraceContext | None = None,
) -> "Task":
if queue_size and cls._is_queue_full(task_identifier, queue_size):
raise TaskQueueFullError(
Expand All @@ -125,6 +127,7 @@ def create(
serialized_args=cls.serialize_data(args or tuple()),
serialized_kwargs=cls.serialize_data(kwargs or dict()),
timeout=timeout,
trace_context=trace_context,
)

@classmethod
Expand Down
15 changes: 15 additions & 0 deletions src/task_processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack
from datetime import timedelta
from importlib.metadata import version

from django.conf import settings
from django.utils import timezone
from opentelemetry import context as otel_context
from opentelemetry import propagate, trace

from task_processor import metrics
from task_processor.exceptions import TaskBackoffError
Expand Down Expand Up @@ -130,6 +133,11 @@ def _run_task(
result: str
executor = None

extracted_ctx = propagate.extract(task.trace_context or {})
tracer = trace.get_tracer("task_processor", version("flagsmith-common"))
span = tracer.start_span(task_identifier, context=extracted_ctx)
otel_token = otel_context.attach(trace.set_span_in_context(span, extracted_ctx))

try:
# Use explicit executor management to avoid blocking on shutdown
# when tasks timeout but continue running in worker threads.
Expand All @@ -151,6 +159,9 @@ def _run_task(
# fall back to using repr.
err_msg = str(e) or repr(e)

span.set_status(trace.StatusCode.ERROR, err_msg)
span.record_exception(e)

task.mark_failure()

task_run.result = result = TaskResult.FAILURE.value
Expand Down Expand Up @@ -199,4 +210,8 @@ def _run_task(

metrics.flagsmith_task_processor_finished_tasks_total.labels(**labels).inc()

span.set_attributes(labels)
span.end()
otel_context.detach(otel_token)

return task, task_run
2 changes: 2 additions & 0 deletions src/task_processor/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

TaskCallable: TypeAlias = Callable[TaskParameters, None]

TraceContext: TypeAlias = dict[str, str]


@dataclass
class TaskProcessorConfig:
Expand Down
6 changes: 6 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ def otel_tracing() -> Generator[InMemorySpanExporter, None, None]:
yield exporter


@pytest.fixture()
def span_exporter(otel_tracing: InMemorySpanExporter) -> InMemorySpanExporter:
    """Return the ``otel_tracing`` exporter after clearing its recorded spans."""
    exporter = otel_tracing
    # Drop anything recorded earlier so each test starts from an empty exporter.
    exporter.clear()
    return exporter


@pytest.fixture(scope="session")
def test_metric() -> prometheus_client.Counter:
return prometheus_client.Counter(
Expand Down
6 changes: 0 additions & 6 deletions tests/integration/core/test_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,6 @@ def setup_logging_fixture(
structlog.reset_defaults()


@pytest.fixture()
def span_exporter(otel_tracing: InMemorySpanExporter) -> InMemorySpanExporter:
    """Return the ``otel_tracing`` exporter after clearing its recorded spans."""
    exporter = otel_tracing
    # Drop anything recorded earlier so each test starts from an empty exporter.
    exporter.clear()
    return exporter


def test_structlog_otel_log_record__basic_event__body_event_name_severity_attributes(
log_exporter: InMemoryLogExporter,
) -> None:
Expand Down
67 changes: 67 additions & 0 deletions tests/unit/common/core/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,70 @@ def test_ensure_cli_env__task_processor_in_argv__sets_run_by_processor(
# When / Then
with ensure_cli_env():
assert os.environ.get("RUN_BY_PROCESSOR") == "true"


def test_ensure_cli_env__task_processor__expected_otel_service_name(
    monkeypatch: pytest.MonkeyPatch,
    mocker: MockerFixture,
) -> None:
    """Check the service name handed to the OTel provider builders.

    With ``task-processor`` in ``sys.argv`` and an OTLP endpoint configured,
    both the log and tracer provider builders are expected to be called with
    ``flagsmith-task-processor`` as the service name.
    """
    # Given
    monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://collector:4318")
    monkeypatch.setattr("sys.argv", ["flagsmith", "task-processor"])

    log_provider_builder = mocker.patch(
        "common.core.otel.build_otel_log_provider",
        return_value=mocker.MagicMock(spec=LoggerProvider),
    )
    tracer_provider_builder = mocker.patch(
        "common.core.otel.build_tracer_provider",
        return_value=mocker.MagicMock(spec=TracerProvider),
    )
    mocker.patch("common.core.otel.setup_tracing")

    # When
    with ensure_cli_env():
        pass

    # Then
    expected_endpoints = [
        (log_provider_builder, "http://collector:4318/v1/logs"),
        (tracer_provider_builder, "http://collector:4318/v1/traces"),
    ]
    for builder, endpoint in expected_endpoints:
        builder.assert_called_once_with(
            endpoint=endpoint,
            service_name="flagsmith-task-processor",
        )
Comment on lines +173 to +203
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how the "flagsmith-api" case isn't caught by test coverage.

Please, either:

  1. Fix coverage;
  2. Consider deleting default_service_name, and this test — see here. IMO test_ensure_cli_env__env_service_name__expected_otel_service_name looks sufficient.



def test_ensure_cli_env__env_service_name__expected_otel_service_name(
    monkeypatch: pytest.MonkeyPatch,
    mocker: MockerFixture,
) -> None:
    """Check that ``OTEL_SERVICE_NAME`` is what the provider builders receive.

    Even with ``task-processor`` in ``sys.argv``, the value set in the
    environment (``my-custom``) is expected to be passed to both the log and
    tracer provider builders.
    """
    # Given
    monkeypatch.setattr("sys.argv", ["flagsmith", "task-processor"])
    for variable, value in (
        ("OTEL_EXPORTER_OTLP_ENDPOINT", "http://collector:4318"),
        ("OTEL_SERVICE_NAME", "my-custom"),
    ):
        monkeypatch.setenv(variable, value)

    log_provider_builder = mocker.patch(
        "common.core.otel.build_otel_log_provider",
        return_value=mocker.MagicMock(spec=LoggerProvider),
    )
    tracer_provider_builder = mocker.patch(
        "common.core.otel.build_tracer_provider",
        return_value=mocker.MagicMock(spec=TracerProvider),
    )
    mocker.patch("common.core.otel.setup_tracing")

    # When
    with ensure_cli_env():
        pass

    # Then
    expected_endpoints = [
        (log_provider_builder, "http://collector:4318/v1/logs"),
        (tracer_provider_builder, "http://collector:4318/v1/traces"),
    ]
    for builder, endpoint in expected_endpoints:
        builder.assert_called_once_with(
            endpoint=endpoint,
            service_name="my-custom",
        )
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from unittest.mock import MagicMock

import pytest
from opentelemetry import baggage, context, trace
from pytest_django import DjangoCaptureOnCommitCallbacks
from pytest_django.fixtures import SettingsWrapper
from pytest_mock import MockerFixture
Expand Down Expand Up @@ -252,3 +253,56 @@ def my_function(*args: typing.Any, **kwargs: typing.Any) -> None:
# Then
assert task
assert task.priority == TaskPriority.HIGH


@pytest.mark.django_db
def test_delay__active_trace__persists_trace_context_on_task() -> None:
    """A task enqueued inside an active span stores a ``traceparent`` entry."""

    # Given
    @register_task_handler()
    def my_function() -> None: ...

    # When
    with trace.get_tracer("test").start_as_current_span("test-request"):
        task = my_function.delay()

    # Then
    assert task is not None
    persisted_context = task.trace_context
    assert persisted_context is not None
    assert "traceparent" in persisted_context


@pytest.mark.django_db
def test_delay__no_active_trace__persists_empty_trace_context() -> None:
    """A task enqueued with no active span has no trace context stored.

    "Empty" in the test name means ``None``: the assertion checks that
    ``trace_context`` is ``None``, not an empty dict.
    """

    # Given
    @register_task_handler()
    def my_function() -> None: ...

    # When
    enqueued_task = my_function.delay()

    # Then
    assert enqueued_task is not None
    assert enqueued_task.trace_context is None


@pytest.mark.django_db
def test_delay__baggage__persists_baggage_in_trace_context() -> None:
    """Baggage set on the current context is stored in the task's trace context.

    The baggage-carrying context is attached only for the duration of the
    enqueue call and detached afterwards, so it cannot leak into other tests
    that run later on the same thread.
    """

    # Given
    @register_task_handler()
    def my_function() -> None: ...

    tracer = trace.get_tracer("test")
    ctx = baggage.set_baggage("amplitude.device_id", "device-123")
    token = context.attach(ctx)

    # When
    try:
        with tracer.start_as_current_span("test-request"):
            task = my_function.delay()
    finally:
        # Restore the previous context even if enqueueing raises.
        context.detach(token)

    # Then
    assert task is not None
    assert task.trace_context is not None
    assert "baggage" in task.trace_context
    assert "amplitude.device_id=device-123" in task.trace_context["baggage"]
27 changes: 27 additions & 0 deletions tests/unit/task_processor/test_unit_task_processor_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,30 @@ def test_recurring_task_should_execute__first_run_time_before_midnight__returns_

# When & Then
assert task.should_execute is True


@pytest.mark.parametrize(
    "trace_context",
    [
        {"traceparent": "00-abcdef-123456-01", "baggage": "key=val"},
        None,
    ],
    ids=["with_trace_context", "without_trace_context"],
)
@pytest.mark.django_db
def test_task_create__trace_context__persists_expected(
    trace_context: dict[str, str] | None,
) -> None:
    """``Task.create`` round-trips ``trace_context`` through the database."""
    # Given
    task = Task.create(
        task_identifier="test_task",
        scheduled_for=timezone.now(),
        trace_context=trace_context,
    )

    # When
    task.save()
    # Reload so the assertion checks the stored value, not the in-memory one.
    task.refresh_from_db()

    # Then
    assert task.trace_context == trace_context
Loading
Loading