Changes from all commits
45 commits
988fa6e
Add sync streaming support for Anthropic instrumentation
vasantteja Feb 1, 2026
ea0bd94
Add changelog entry for sync streaming support
vasantteja Feb 1, 2026
504d0df
Fix type checking errors with type: ignore comments
vasantteja Feb 1, 2026
8df752a
Refactor Anthropic instrumentation to improve usage tracking and erro…
vasantteja Feb 9, 2026
1b09377
Refactor utility functions and test cases for improved readability an…
vasantteja Feb 9, 2026
e6c83ac
Refactor argument handling in assert_span_attributes function
vasantteja Feb 9, 2026
1ed3c6b
Enhance tests for streaming message handling in Anthropic instrumenta…
vasantteja Feb 9, 2026
a011520
Merge branch 'main' into anthropic-sync-streaming
vasantteja Feb 9, 2026
2851e4a
Update test_sync_messages.py to disable pylint warning for too-many-l…
vasantteja Feb 9, 2026
3e5cbda
Merge branch 'anthropic-sync-streaming' of https://github.com/vasantt…
vasantteja Feb 9, 2026
38d4429
Enhance StreamWrapper and MessageStreamManagerWrapper for idempotent …
vasantteja Feb 10, 2026
685e161
Merge branch 'main' into anthropic-sync-streaming
vasantteja Feb 10, 2026
0f481c1
Enhance Anthropic instrumentation to support content capture
vasantteja Feb 11, 2026
75fcb3b
Enhance tests for sync message creation in Anthropic instrumentation
vasantteja Feb 11, 2026
8b5b20f
Remove sensitive 'anthropic-organization-id' headers from test casset…
vasantteja Feb 11, 2026
d48c7e3
Refactor tests for sync message handling in Anthropic instrumentation
vasantteja Feb 11, 2026
ee173da
Refactor utils.py for improved type safety and clarity
vasantteja Feb 12, 2026
ed46be8
Enhance Anthropic instrumentation tests for EVENT_ONLY content capture
vasantteja Feb 12, 2026
d1af778
Refactor assertion in sync messages test for clarity
vasantteja Feb 12, 2026
5093cbe
Merge branch 'main' into anthropic-sync-streaming
vasantteja Feb 13, 2026
401e1b1
Refactor content capture logic and enhance streaming tests for Anthro…
vasantteja Feb 13, 2026
15d1b05
unsetting the model.
vasantteja Feb 13, 2026
44b97a8
Merge branch 'main' into anthropic-sync-streaming
vasantteja Feb 16, 2026
7800a0e
Remove instrumentation for Messages.stream() and refactor related cod…
vasantteja Feb 16, 2026
df9911e
Refactor Anthropic instrumentation: reorganize imports, enhance utili…
vasantteja Feb 17, 2026
2590274
Add message extractors for Anthropic instrumentation.
vasantteja Feb 17, 2026
b4adeec
Refactor message extractors in Anthropic instrumentation: reorganize …
vasantteja Feb 17, 2026
e9c235a
Update test cassettes for Anthropic instrumentation: streamline reque…
vasantteja Feb 18, 2026
da275d0
Enhance Anthropic instrumentation: update MessageWrapper and StreamWr…
vasantteja Feb 19, 2026
2c2a780
Update test cassettes for Anthropic instrumentation: modify message I…
vasantteja Feb 19, 2026
cdd95b1
Merge branch 'main' into anthropic-sync-streaming
vasantteja Feb 19, 2026
1d1b7f5
Rename StreamWrapper to MessagesStreamWrapper and update references i…
vasantteja Feb 24, 2026
0793c46
Merge branch 'main' into anthropic-sync-streaming
vasantteja Feb 24, 2026
5effa69
Refactor type annotations in message extractors and wrappers for impr…
vasantteja Feb 25, 2026
89704d2
Merge branch 'anthropic-sync-streaming' of https://github.com/vasantt…
vasantteja Feb 25, 2026
a835a75
Enhance type annotations in message extractors and patch for improved…
vasantteja Feb 25, 2026
471204c
Enhance type safety and error handling in message processing. Update …
vasantteja Feb 26, 2026
d1bf3fe
Refactor assertions in test_sync_messages.py for improved readability…
vasantteja Feb 26, 2026
138bfa0
Merge branch 'main' into anthropic-sync-streaming
vasantteja Feb 26, 2026
cc0a5b0
Merge branch 'main' into anthropic-sync-streaming
vasantteja Feb 26, 2026
0dc8a9f
enforce strong typing system.
vasantteja Feb 26, 2026
f62dd6d
Update anthropic dependency version to 0.51.0 in pyproject.toml and r…
vasantteja Feb 26, 2026
1bf7695
Refactor usage token extraction to utilize a new UsageTokens dataclas…
vasantteja Feb 27, 2026
3d2bd63
Update anthropic dependency version in uv.lock to 0.51.0 for compatib…
vasantteja Feb 27, 2026
cc8e4d1
Add tests for should_capture_content function in test_events_options.py.
vasantteja Feb 27, 2026
@@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 
+- Add sync streaming support for `Messages.create(stream=True)` and `Messages.stream()`
+  ([#4155](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4155))
+  - `StreamWrapper` for handling `Messages.create(stream=True)` telemetry
+  - `MessageStreamManagerWrapper` for handling `Messages.stream()` telemetry
+  - `MessageWrapper` for non-streaming response telemetry extraction
 - Initial implementation of Anthropic instrumentation
   ([#3978](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3978))
 - Implement sync `Messages.create` instrumentation with GenAI semantic convention attributes
@@ -34,7 +34,7 @@ dependencies = [
 
 [project.optional-dependencies]
 instruments = [
-    "anthropic >= 0.16.0",
+    "anthropic >= 0.51.0",
 ]
 
 [project.entry-points.opentelemetry_instrumentor]
@@ -54,7 +54,9 @@
 )
 
 from opentelemetry.instrumentation.anthropic.package import _instruments
-from opentelemetry.instrumentation.anthropic.patch import messages_create
+from opentelemetry.instrumentation.anthropic.patch import (
+    messages_create,
+)
 from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
 from opentelemetry.instrumentation.utils import unwrap
 from opentelemetry.util.genai.handler import TelemetryHandler
@@ -89,11 +91,12 @@ def _instrument(self, **kwargs: Any) -> None:
         # Get providers from kwargs
         tracer_provider = kwargs.get("tracer_provider")
         meter_provider = kwargs.get("meter_provider")
+        logger_provider = kwargs.get("logger_provider")
 
-        # TODO: Add logger_provider to TelemetryHandler to capture content events.
         handler = TelemetryHandler(
             tracer_provider=tracer_provider,
             meter_provider=meter_provider,
+            logger_provider=logger_provider,
         )
 
         # Patch Messages.create
@@ -0,0 +1,219 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Get/extract helpers for Anthropic Messages instrumentation."""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Sequence

from anthropic.types import MessageDeltaUsage

from opentelemetry.semconv._incubating.attributes import (
    gen_ai_attributes as GenAIAttributes,
)
from opentelemetry.semconv._incubating.attributes import (
    server_attributes as ServerAttributes,
)
from opentelemetry.util.genai.types import (
    InputMessage,
    MessagePart,
    OutputMessage,
)
from opentelemetry.util.types import AttributeValue

from .utils import (
    convert_content_to_parts,
    normalize_finish_reason,
)

if TYPE_CHECKING:
    from collections.abc import Iterable, Mapping

    import httpx
    from anthropic.resources.messages import Messages
    from anthropic.types import (
        Message,
        MessageParam,
        MetadataParam,
        TextBlockParam,
        ThinkingConfigParam,
        ToolChoiceParam,
        ToolUnionParam,
        Usage,
    )


@dataclass
class MessageRequestParams:
    model: str | None = None
    max_tokens: int | None = None
    temperature: float | None = None
    top_k: int | None = None
    top_p: float | None = None
    stop_sequences: Sequence[str] | None = None
    stream: bool | None = None
    messages: Iterable[MessageParam] | None = None
    system: str | Iterable[TextBlockParam] | None = None


GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS = (
    "gen_ai.usage.cache_creation.input_tokens"
)
GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS = "gen_ai.usage.cache_read.input_tokens"


@dataclass
class UsageTokens:
    input_tokens: int | None = None
    output_tokens: int | None = None
    cache_creation_input_tokens: int | None = None
    cache_read_input_tokens: int | None = None


def extract_usage_tokens(
    usage: Usage | MessageDeltaUsage | None,
) -> UsageTokens:
    if usage is None:
        return UsageTokens()

    input_tokens = usage.input_tokens
    output_tokens = usage.output_tokens
    cache_creation_input_tokens = usage.cache_creation_input_tokens
    cache_read_input_tokens = usage.cache_read_input_tokens

    if (
        input_tokens is None
        and cache_creation_input_tokens is None
        and cache_read_input_tokens is None
    ):
        total_input_tokens = None
    else:
        total_input_tokens = (
            (input_tokens or 0)
            + (cache_creation_input_tokens or 0)
            + (cache_read_input_tokens or 0)
        )

    return UsageTokens(
        input_tokens=total_input_tokens,
        output_tokens=output_tokens,
        cache_creation_input_tokens=cache_creation_input_tokens,
        cache_read_input_tokens=cache_read_input_tokens,
    )


def get_input_messages(
    messages: Iterable[MessageParam] | None,
) -> list[InputMessage]:
    if messages is None:
        return []
    result: list[InputMessage] = []
    for message in messages:
        role = message["role"]
        parts = convert_content_to_parts(message["content"])
        result.append(InputMessage(role=role, parts=parts))
    return result
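The loop above maps each request message to a role plus a list of content parts. A standalone sketch of that shape, using plain dicts as stand-ins for the `MessageParam`/`InputMessage` types (the real `convert_content_to_parts` lives in `.utils` and also handles structured content blocks):

```python
def to_input_messages(messages):
    # Dict stand-ins for illustration only: a string "content" becomes a
    # single part; list content is taken as already-split parts.
    return [
        {
            "role": m["role"],
            "parts": [m["content"]]
            if isinstance(m["content"], str)
            else list(m["content"]),
        }
        for m in (messages or [])
    ]


print(to_input_messages([{"role": "user", "content": "Hello"}]))
# → [{'role': 'user', 'parts': ['Hello']}]
print(to_input_messages(None))  # → []
```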


def get_system_instruction(
    system: str | Iterable[TextBlockParam] | None,
) -> list[MessagePart]:
    if system is None:
        return []
    return convert_content_to_parts(system)


def get_output_messages_from_message(
    message: Message | None,
) -> list[OutputMessage]:
    if message is None:
        return []

    parts = convert_content_to_parts(message.content)
    finish_reason = normalize_finish_reason(message.stop_reason)
    return [
        OutputMessage(
            role=message.role,
            parts=parts,
            finish_reason=finish_reason or "",
        )
    ]


def extract_params(  # pylint: disable=too-many-locals
    *,
    max_tokens: int | None = None,
    messages: Iterable[MessageParam] | None = None,
    model: str | None = None,
    metadata: MetadataParam | None = None,
    service_tier: str | None = None,
    stop_sequences: Sequence[str] | None = None,
    stream: bool | None = None,
    system: str | Iterable[TextBlockParam] | None = None,
    temperature: float | None = None,
    thinking: ThinkingConfigParam | None = None,
    tool_choice: ToolChoiceParam | None = None,
    tools: Iterable[ToolUnionParam] | None = None,
    top_k: int | None = None,
    top_p: float | None = None,
    extra_headers: Mapping[str, str] | None = None,
    extra_query: Mapping[str, object] | None = None,
    extra_body: object | None = None,
    timeout: float | httpx.Timeout | None = None,
    **_kwargs: object,
) -> MessageRequestParams:
    return MessageRequestParams(
        model=model,
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        top_k=top_k,
        stop_sequences=stop_sequences,
        stream=stream,
        messages=messages,
        system=system,
    )


def _set_server_address_and_port(
    client_instance: "Messages",
    attributes: dict[str, AttributeValue | None],
) -> None:
    base_url = client_instance._client.base_url
    host = base_url.host
    if host:
        attributes[ServerAttributes.SERVER_ADDRESS] = host

    port = base_url.port
    if port and port != 443 and port > 0:
        attributes[ServerAttributes.SERVER_PORT] = port
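The helper above records `server.address` whenever a host is present, but `server.port` only for an explicit non-default port. A standalone illustration of that rule, using `urllib.parse` instead of the httpx URL object the wrapper actually receives (the `server_attrs` helper name is hypothetical):

```python
from urllib.parse import urlsplit


def server_attrs(base_url: str) -> dict:
    # Record server.address when a host is present, and server.port only
    # when an explicit, non-default (non-443) port appears in the URL.
    parts = urlsplit(base_url)
    attrs = {}
    if parts.hostname:
        attrs["server.address"] = parts.hostname
    port = parts.port
    if port and port != 443 and port > 0:
        attrs["server.port"] = port
    return attrs


print(server_attrs("https://api.anthropic.com"))  # no port recorded
print(server_attrs("http://localhost:8080/v1"))   # port 8080 recorded
```

Suppressing the default HTTPS port keeps span attributes stable whether or not a caller spells out `:443` in the base URL.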


def get_llm_request_attributes(
    params: MessageRequestParams, client_instance: "Messages"
) -> dict[str, AttributeValue]:
    attributes: dict[str, AttributeValue | None] = {
        GenAIAttributes.GEN_AI_OPERATION_NAME: GenAIAttributes.GenAiOperationNameValues.CHAT.value,
        GenAIAttributes.GEN_AI_SYSTEM: GenAIAttributes.GenAiSystemValues.ANTHROPIC.value,  # pyright: ignore[reportDeprecated]
        GenAIAttributes.GEN_AI_REQUEST_MODEL: params.model,
        GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS: params.max_tokens,
        GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE: params.temperature,
        GenAIAttributes.GEN_AI_REQUEST_TOP_P: params.top_p,
        GenAIAttributes.GEN_AI_REQUEST_TOP_K: params.top_k,
        GenAIAttributes.GEN_AI_REQUEST_STOP_SEQUENCES: params.stop_sequences,
    }
    _set_server_address_and_port(client_instance, attributes)
    return {k: v for k, v in attributes.items() if v is not None}
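The final comprehension drops unset attributes so only the request parameters a caller actually supplied are exported. A tiny standalone illustration of that filtering pattern (the attribute values are made up for the example):

```python
# Hypothetical attribute dict; None marks parameters the caller never set.
attributes = {
    "gen_ai.request.model": "claude-example-model",
    "gen_ai.request.max_tokens": 1024,
    "gen_ai.request.temperature": None,
    "gen_ai.request.top_p": None,
}

# Drop only the None entries, matching the comprehension above.
filtered = {k: v for k, v in attributes.items() if v is not None}
print(filtered)
# → {'gen_ai.request.model': 'claude-example-model', 'gen_ai.request.max_tokens': 1024}
```

Because the test is `is not None` rather than truthiness, legitimate falsy values such as `temperature=0.0` or `top_k=0` would still be exported.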