Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 92 additions & 50 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,10 @@ def _set_output_data(
)


def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
def _sentry_patched_create_sync(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
"""
Creates and manages an AI Client Span for both non-streaming and streaming calls.
"""
integration = kwargs.pop("integration")
if integration is None:
return f(*args, **kwargs)
Expand All @@ -624,7 +627,13 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A

_set_create_input_data(span, kwargs, integration)

result = yield f, args, kwargs
try:
result = f(*args, **kwargs)
except Exception as exc:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(exc)
reraise(*exc_info)

if isinstance(result, Stream):
result._span = span
Expand All @@ -638,6 +647,81 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A

return result

with capture_internal_exceptions():
if hasattr(result, "content"):
(
input_tokens,
output_tokens,
cache_read_input_tokens,
cache_write_input_tokens,
) = _get_token_usage(result)

content_blocks = []
for content_block in result.content:
if hasattr(content_block, "to_dict"):
content_blocks.append(content_block.to_dict())
elif hasattr(content_block, "model_dump"):
content_blocks.append(content_block.model_dump())
elif hasattr(content_block, "text"):
content_blocks.append({"type": "text", "text": content_block.text})

_set_output_data(
span=span,
integration=integration,
model=getattr(result, "model", None),
input_tokens=input_tokens,
output_tokens=output_tokens,
cache_read_input_tokens=cache_read_input_tokens,
cache_write_input_tokens=cache_write_input_tokens,
content_blocks=content_blocks,
response_id=getattr(result, "id", None),
finish_reason=getattr(result, "stop_reason", None),
)
span.__exit__(None, None, None)
else:
span.set_data("unknown_response", True)
span.__exit__(None, None, None)

return result


async def _sentry_patched_create_async(
f: "Any", *args: "Any", **kwargs: "Any"
) -> "Any":
"""
Creates and manages an AI Client Span for both non-streaming and streaming calls.
"""
integration = kwargs.pop("integration")
if integration is None:
return await f(*args, **kwargs)

if "messages" not in kwargs:
return await f(*args, **kwargs)

try:
iter(kwargs["messages"])
except TypeError:
return await f(*args, **kwargs)

model = kwargs.get("model", "")

span = get_start_span_function()(
op=OP.GEN_AI_CHAT,
name=f"chat {model}".strip(),
origin=AnthropicIntegration.origin,
)
span.__enter__()

_set_create_input_data(span, kwargs, integration)

try:
result = await f(*args, **kwargs)
except Exception as exc:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(exc)
reraise(*exc_info)

if isinstance(result, AsyncStream):
result._span = span
result._integration = integration
Expand Down Expand Up @@ -689,41 +773,20 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A


def _wrap_message_create(f: "Any") -> "Any":
def _execute_sync(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
gen = _sentry_patched_create_common(f, *args, **kwargs)

try:
f, args, kwargs = next(gen)
except StopIteration as e:
return e.value

try:
try:
result = f(*args, **kwargs)
except Exception as exc:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(exc)
reraise(*exc_info)

return gen.send(result)
except StopIteration as e:
return e.value

@wraps(f)
def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any":
def _sentry_wrapped_create_sync(*args: "Any", **kwargs: "Any") -> "Any":
integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
kwargs["integration"] = integration

try:
return _execute_sync(f, *args, **kwargs)
return _sentry_patched_create_sync(f, *args, **kwargs)
finally:
span = sentry_sdk.get_current_span()
if span is not None and span.status == SPANSTATUS.INTERNAL_ERROR:
with capture_internal_exceptions():
span.__exit__(None, None, None)

return _sentry_patched_create_sync
return _sentry_wrapped_create_sync


def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]") -> None:
Expand Down Expand Up @@ -810,41 +873,20 @@ def close(self: "Union[Stream, MessageStream]") -> None:


def _wrap_message_create_async(f: "Any") -> "Any":
async def _execute_async(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
gen = _sentry_patched_create_common(f, *args, **kwargs)

try:
f, args, kwargs = next(gen)
except StopIteration as e:
return await e.value

try:
try:
result = await f(*args, **kwargs)
except Exception as exc:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(exc)
reraise(*exc_info)

return gen.send(result)
except StopIteration as e:
return e.value

@wraps(f)
async def _sentry_patched_create_async(*args: "Any", **kwargs: "Any") -> "Any":
async def _sentry_wrapped_create_async(*args: "Any", **kwargs: "Any") -> "Any":
integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
kwargs["integration"] = integration

try:
return await _execute_async(f, *args, **kwargs)
return await _sentry_patched_create_async(f, *args, **kwargs)
finally:
span = sentry_sdk.get_current_span()
if span is not None and span.status == SPANSTATUS.INTERNAL_ERROR:
with capture_internal_exceptions():
span.__exit__(None, None, None)

return _sentry_patched_create_async
return _sentry_wrapped_create_async


def _wrap_async_close(
Expand Down
Loading