Skip to content
Open
14 changes: 14 additions & 0 deletions langfuse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@
is_known_llm_instrumentor,
is_langfuse_span,
)
from .types import (
MaskOtelSpansFunction,
MaskOtelSpansParams,
MaskOtelSpansResult,
OtelSpanData,
OtelSpanIdentifier,
OtelSpanPatch,
)

Langfuse = _client_module.Langfuse

Expand Down Expand Up @@ -71,6 +79,12 @@
"is_genai_span",
"is_known_llm_instrumentor",
"KNOWN_LLM_INSTRUMENTATION_SCOPE_PREFIXES",
"MaskOtelSpansFunction",
"MaskOtelSpansParams",
"MaskOtelSpansResult",
"OtelSpanData",
"OtelSpanIdentifier",
"OtelSpanPatch",
"experiment",
"api",
]
45 changes: 43 additions & 2 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,13 @@
PromptClient,
TextPromptClient,
)
from langfuse.types import MaskFunction, ScoreDataType, SpanLevel, TraceContext
from langfuse.types import (
MaskFunction,
MaskOtelSpansFunction,
ScoreDataType,
SpanLevel,
TraceContext,
)


class Langfuse:
Expand Down Expand Up @@ -169,7 +175,40 @@ class Langfuse:
release (Optional[str]): Release version/hash of your application. Used for grouping analytics by release.
media_upload_thread_count (Optional[int]): Number of background threads for handling media uploads. Defaults to 1. Can also be set via LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT environment variable.
sample_rate (Optional[float]): Sampling rate for traces (0.0 to 1.0). Defaults to 1.0 (100% of traces are sampled). Can also be set via LANGFUSE_SAMPLE_RATE environment variable.
mask (Optional[MaskFunction]): Function to mask sensitive data in traces before sending to the API.
mask (Optional[MaskFunction]): Function to mask sensitive data synchronously when Langfuse SDK attributes are created. This applies only to data set through Langfuse SDK APIs such as `start_observation()`, `update()`, and `set_trace_io()`.
mask_otel_spans (Optional[MaskOtelSpansFunction]): Synchronous export-stage hook for masking raw OpenTelemetry span attributes before this Langfuse client sends them to Langfuse. Use this for spans created by third-party OpenTelemetry instrumentations, or when you need to inspect final span attributes after export filtering and Langfuse media handling. It does not modify spans already exported through other OpenTelemetry exporters.

The hook receives one OpenTelemetry export batch. A batch is not guaranteed to contain a complete trace, request, or Langfuse observation tree. The hook usually runs on the OpenTelemetry batch span processor worker thread; during `flush()` and shutdown it may run on the caller thread. Keep it synchronous, deterministic, and fast.

Return `None` to leave the batch unchanged. Return `MaskOtelSpansResult` with `OtelSpanPatch` values to delete or replace attributes on selected spans. If the hook raises or returns an invalid batch result, Langfuse drops the whole export batch. If one returned span patch is invalid, Langfuse drops only that span from the Langfuse export.

Example:
```python
from typing import Optional

from langfuse import Langfuse
from langfuse.types import (
MaskOtelSpansParams,
MaskOtelSpansResult,
OtelSpanPatch,
)

def mask_otel_spans(
*, params: MaskOtelSpansParams
) -> Optional[MaskOtelSpansResult]:
patches = {}

for identifier, span in params.spans.items():
if "gen_ai.prompt.0.content" in span.attributes:
patches[identifier] = OtelSpanPatch(
delete_attributes=("gen_ai.prompt.0.content",),
set_attributes={"masking.applied": True},
)

return MaskOtelSpansResult(span_patches=patches)

langfuse = Langfuse(mask_otel_spans=mask_otel_spans)
```
blocked_instrumentation_scopes (Optional[List[str]]): Deprecated. Use `should_export_span` instead. Equivalent behavior:
```python
from langfuse.span_filter import is_default_export_span
Expand Down Expand Up @@ -246,6 +285,7 @@ def __init__(
media_upload_thread_count: Optional[int] = None,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
mask_otel_spans: Optional[MaskOtelSpansFunction] = None,
blocked_instrumentation_scopes: Optional[List[str]] = None,
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
additional_headers: Optional[Dict[str, str]] = None,
Expand Down Expand Up @@ -342,6 +382,7 @@ def __init__(
media_upload_thread_count=media_upload_thread_count,
sample_rate=sample_rate,
mask=mask,
mask_otel_spans=mask_otel_spans,
tracing_enabled=self._tracing_enabled,
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
should_export_span=should_export_span,
Expand Down
1 change: 1 addition & 0 deletions langfuse/_client/get_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def _create_client_from_instance(
media_upload_thread_count=instance.media_upload_thread_count,
sample_rate=instance.sample_rate,
mask=instance.mask,
mask_otel_spans=instance.mask_otel_spans,
blocked_instrumentation_scopes=instance.blocked_instrumentation_scopes,
should_export_span=instance.should_export_span,
additional_headers=instance.additional_headers,
Expand Down
62 changes: 34 additions & 28 deletions langfuse/_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from langfuse._utils.request import LangfuseClient
from langfuse.api import AsyncLangfuseAPI, LangfuseAPI
from langfuse.logger import langfuse_logger
from langfuse.types import MaskFunction
from langfuse.types import MaskFunction, MaskOtelSpansFunction

from .._version import __version__ as langfuse_version

Expand Down Expand Up @@ -94,6 +94,7 @@ def __new__(
media_upload_thread_count: Optional[int] = None,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
mask_otel_spans: Optional[MaskOtelSpansFunction] = None,
tracing_enabled: Optional[bool] = None,
blocked_instrumentation_scopes: Optional[List[str]] = None,
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
Expand Down Expand Up @@ -128,6 +129,7 @@ def __new__(
media_upload_thread_count=media_upload_thread_count,
sample_rate=sample_rate,
mask=mask,
mask_otel_spans=mask_otel_spans,
tracing_enabled=tracing_enabled
if tracing_enabled is not None
else True,
Expand Down Expand Up @@ -157,6 +159,7 @@ def _initialize_instance(
httpx_client: Optional[httpx.Client] = None,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
mask_otel_spans: Optional[MaskOtelSpansFunction] = None,
tracing_enabled: bool = True,
blocked_instrumentation_scopes: Optional[List[str]] = None,
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
Expand All @@ -169,6 +172,7 @@ def _initialize_instance(
self.tracing_enabled = tracing_enabled
self.base_url = base_url
self.mask = mask
self.mask_otel_spans = mask_otel_spans
self.environment = environment

# Store additional client settings for get_client() to use
Expand All @@ -184,33 +188,6 @@ def _initialize_instance(
self.span_exporter = span_exporter
self.tracer_provider: Optional[TracerProvider] = None

# OTEL Tracer
if tracing_enabled:
tracer_provider = tracer_provider or _init_tracer_provider(
environment=environment, release=release, sample_rate=sample_rate
)
self.tracer_provider = tracer_provider

langfuse_processor = LangfuseSpanProcessor(
public_key=self.public_key,
secret_key=secret_key,
base_url=base_url,
timeout=timeout,
flush_at=flush_at,
flush_interval=flush_interval,
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
should_export_span=should_export_span,
additional_headers=additional_headers,
span_exporter=span_exporter,
)
tracer_provider.add_span_processor(langfuse_processor)

self._otel_tracer = tracer_provider.get_tracer(
LANGFUSE_TRACER_NAME,
langfuse_version,
attributes={"public_key": self.public_key},
)

# API Clients

## API clients must be singletons because the underlying HTTPX clients
Expand Down Expand Up @@ -266,6 +243,35 @@ def _initialize_instance(
)
self._media_upload_consumers = []

# OTEL Tracer
if tracing_enabled:
tracer_provider = tracer_provider or _init_tracer_provider(
environment=environment, release=release, sample_rate=sample_rate
)
self.tracer_provider = tracer_provider

langfuse_processor = LangfuseSpanProcessor(
public_key=self.public_key,
secret_key=secret_key,
base_url=base_url,
timeout=timeout,
flush_at=flush_at,
flush_interval=flush_interval,
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
should_export_span=should_export_span,
additional_headers=additional_headers,
span_exporter=span_exporter,
media_manager=self._media_manager,
mask_otel_spans=mask_otel_spans,
)
tracer_provider.add_span_processor(langfuse_processor)

self._otel_tracer = tracer_provider.get_tracer(
LANGFUSE_TRACER_NAME,
langfuse_version,
attributes={"public_key": self.public_key},
)

media_upload_thread_count = media_upload_thread_count or max(
int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1
)
Expand Down
Loading
Loading