diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 40c1fa0635..7900870940 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -600,7 +600,10 @@ def _set_output_data( ) -def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "Any": +def _sentry_patched_create_sync(f: "Any", *args: "Any", **kwargs: "Any") -> "Any": + """ + Creates and manages an AI Client Span for both non-streaming and streaming calls. + """ integration = kwargs.pop("integration") if integration is None: return f(*args, **kwargs) @@ -624,7 +627,13 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A _set_create_input_data(span, kwargs, integration) - result = yield f, args, kwargs + try: + result = f(*args, **kwargs) + except Exception as exc: + exc_info = sys.exc_info() + with capture_internal_exceptions(): + _capture_exception(exc) + reraise(*exc_info) if isinstance(result, Stream): result._span = span @@ -638,6 +647,81 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A return result + with capture_internal_exceptions(): + if hasattr(result, "content"): + ( + input_tokens, + output_tokens, + cache_read_input_tokens, + cache_write_input_tokens, + ) = _get_token_usage(result) + + content_blocks = [] + for content_block in result.content: + if hasattr(content_block, "to_dict"): + content_blocks.append(content_block.to_dict()) + elif hasattr(content_block, "model_dump"): + content_blocks.append(content_block.model_dump()) + elif hasattr(content_block, "text"): + content_blocks.append({"type": "text", "text": content_block.text}) + + _set_output_data( + span=span, + integration=integration, + model=getattr(result, "model", None), + input_tokens=input_tokens, + output_tokens=output_tokens, + cache_read_input_tokens=cache_read_input_tokens, + cache_write_input_tokens=cache_write_input_tokens, + content_blocks=content_blocks, + response_id=getattr(result, "id", None), + finish_reason=getattr(result, "stop_reason", None), + ) + span.__exit__(None, None, None) + else: + span.set_data("unknown_response", True) + span.__exit__(None, None, None) + + return result + + +async def _sentry_patched_create_async( + f: "Any", *args: "Any", **kwargs: "Any" +) -> "Any": + """ + Creates and manages an AI Client Span for both non-streaming and streaming calls. + """ + integration = kwargs.pop("integration") + if integration is None: + return await f(*args, **kwargs) + + if "messages" not in kwargs: + return await f(*args, **kwargs) + + try: + iter(kwargs["messages"]) + except TypeError: + return await f(*args, **kwargs) + + model = kwargs.get("model", "") + + span = get_start_span_function()( + op=OP.GEN_AI_CHAT, + name=f"chat {model}".strip(), + origin=AnthropicIntegration.origin, + ) + span.__enter__() + + _set_create_input_data(span, kwargs, integration) + + try: + result = await f(*args, **kwargs) + except Exception as exc: + exc_info = sys.exc_info() + with capture_internal_exceptions(): + _capture_exception(exc) + reraise(*exc_info) + if isinstance(result, AsyncStream): result._span = span result._integration = integration @@ -689,41 +773,20 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A def _wrap_message_create(f: "Any") -> "Any": - def _execute_sync(f: "Any", *args: "Any", **kwargs: "Any") -> "Any": - gen = _sentry_patched_create_common(f, *args, **kwargs) - - try: - f, args, kwargs = next(gen) - except StopIteration as e: - return e.value - - try: - try: - result = f(*args, **kwargs) - except Exception as exc: - exc_info = sys.exc_info() - with capture_internal_exceptions(): - _capture_exception(exc) - reraise(*exc_info) - - return gen.send(result) - except StopIteration as e: - return e.value - @wraps(f) - def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any": + def _sentry_wrapped_create_sync(*args: "Any", **kwargs: "Any") -> "Any": integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) kwargs["integration"] = integration try: - return _execute_sync(f, *args, **kwargs) + return _sentry_patched_create_sync(f, *args, **kwargs) finally: span = sentry_sdk.get_current_span() if span is not None and span.status == SPANSTATUS.INTERNAL_ERROR: with capture_internal_exceptions(): span.__exit__(None, None, None) - return _sentry_patched_create_sync + return _sentry_wrapped_create_sync def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]") -> None: @@ -810,41 +873,20 @@ def close(self: "Union[Stream, MessageStream]") -> None: def _wrap_message_create_async(f: "Any") -> "Any": - async def _execute_async(f: "Any", *args: "Any", **kwargs: "Any") -> "Any": - gen = _sentry_patched_create_common(f, *args, **kwargs) - - try: - f, args, kwargs = next(gen) - except StopIteration as e: - return await e.value - - try: - try: - result = await f(*args, **kwargs) - except Exception as exc: - exc_info = sys.exc_info() - with capture_internal_exceptions(): - _capture_exception(exc) - reraise(*exc_info) - - return gen.send(result) - except StopIteration as e: - return e.value - @wraps(f) - async def _sentry_patched_create_async(*args: "Any", **kwargs: "Any") -> "Any": + async def _sentry_wrapped_create_async(*args: "Any", **kwargs: "Any") -> "Any": integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) kwargs["integration"] = integration try: - return await _execute_async(f, *args, **kwargs) + return await _sentry_patched_create_async(f, *args, **kwargs) finally: span = sentry_sdk.get_current_span() if span is not None and span.status == SPANSTATUS.INTERNAL_ERROR: with capture_internal_exceptions(): span.__exit__(None, None, None) - return _sentry_patched_create_async + return _sentry_wrapped_create_async def _wrap_async_close(