diff --git a/sentry_sdk/integrations/langchain.py b/sentry_sdk/integrations/langchain.py index d19d9bbdd5..acd05805e5 100644 --- a/sentry_sdk/integrations/langchain.py +++ b/sentry_sdk/integrations/langchain.py @@ -23,6 +23,13 @@ from sentry_sdk.tracing_utils import _get_value, set_span_errored from sentry_sdk.utils import capture_internal_exceptions, logger +CURRENT_LANGCHAIN_AGENT_NAME = contextvars.ContextVar("CURRENT_LANGCHAIN_AGENT_NAME", default=None) + + +def _get_current_langchain_agent_name() -> "Optional[str]": + return CURRENT_LANGCHAIN_AGENT_NAME.get(None) + + if TYPE_CHECKING: from typing import ( Any, @@ -154,43 +161,6 @@ def _transform_langchain_message_content(content: "Any") -> "Any": # Contextvar to track agent names in a stack for re-entrant agent support -_agent_stack: "contextvars.ContextVar[Optional[List[Optional[str]]]]" = ( - contextvars.ContextVar("langchain_agent_stack", default=None) -) - - -def _push_agent(agent_name: "Optional[str]") -> None: - """Push an agent name onto the stack.""" - stack = _agent_stack.get() - if stack is None: - stack = [] - else: - # Copy the list to maintain contextvar isolation across async contexts - stack = stack.copy() - stack.append(agent_name) - _agent_stack.set(stack) - - -def _pop_agent() -> "Optional[str]": - """Pop an agent name from the stack and return it.""" - stack = _agent_stack.get() - if stack: - # Copy the list to maintain contextvar isolation across async contexts - stack = stack.copy() - agent_name = stack.pop() - _agent_stack.set(stack) - return agent_name - return None - - -def _get_current_agent() -> "Optional[str]": - """Get the current agent name (top of stack) without removing it.""" - stack = _agent_stack.get() - if stack: - return stack[-1] - return None - - def _get_system_instructions(messages: "List[List[BaseMessage]]") -> "List[str]": system_instructions = [] @@ -327,6 +297,11 @@ def _create_span( watched_span = WatchedSpan(sentry_sdk.start_span(**kwargs)) watched_span.span.__enter__() + + agent_name = _get_current_langchain_agent_name() + if agent_name: + watched_span.span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name) + self.span_map[run_id] = watched_span self.gc_span_map() return watched_span @@ -455,10 +430,6 @@ def on_chat_model_start( elif "openai" in ai_type: span.set_data(SPANDATA.GEN_AI_SYSTEM, "openai") - agent_name = _get_current_agent() - if agent_name: - span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name) - for key, attribute in DATA_FIELDS.items(): if key in all_params and all_params[key] is not None: set_data_normalized(span, attribute, all_params[key], unpack=False) @@ -655,10 +626,6 @@ def on_tool_start( if tool_description is not None: span.set_data(SPANDATA.GEN_AI_TOOL_DESCRIPTION, tool_description) - agent_name = _get_current_agent() - if agent_name: - span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name) - if should_send_default_pii() and self.include_prompts: set_data_normalized( span, @@ -978,57 +945,60 @@ def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": return f(self, *args, **kwargs) agent_name, tools = _get_request_data(self, args, kwargs) + token = CURRENT_LANGCHAIN_AGENT_NAME.set(agent_name) start_span_function = get_start_span_function() - with start_span_function( - op=OP.GEN_AI_INVOKE_AGENT, - name=f"invoke_agent {agent_name}" if agent_name else "invoke_agent", - origin=LangchainIntegration.origin, - ) as span: - _push_agent(agent_name) - try: - if agent_name: - span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name) - - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") - span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, False) - - _set_tools_on_span(span, tools) - - # Run the agent - result = f(self, *args, **kwargs) - - input = result.get("input") - if ( - input is not None - and should_send_default_pii() - and integration.include_prompts - ): - normalized_messages = normalize_message_roles([input]) - scope = sentry_sdk.get_current_scope() - messages_data = truncate_and_annotate_messages( - normalized_messages, span, scope - ) - if messages_data is not None: - set_data_normalized( - span, - SPANDATA.GEN_AI_REQUEST_MESSAGES, - messages_data, - unpack=False, + try: + with start_span_function( + op=OP.GEN_AI_INVOKE_AGENT, + name=f"invoke_agent {agent_name}" if agent_name else "invoke_agent", + origin=LangchainIntegration.origin, + ) as span: + try: + if agent_name: + span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name) + + span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") + span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, False) + + _set_tools_on_span(span, tools) + + # Run the agent + result = f(self, *args, **kwargs) + + input = result.get("input") + if ( + input is not None + and should_send_default_pii() + and integration.include_prompts + ): + normalized_messages = normalize_message_roles([input]) + scope = sentry_sdk.get_current_scope() + messages_data = truncate_and_annotate_messages( + normalized_messages, span, scope ) + if messages_data is not None: + set_data_normalized( + span, + SPANDATA.GEN_AI_REQUEST_MESSAGES, + messages_data, + unpack=False, + ) - output = result.get("output") - if ( - output is not None - and should_send_default_pii() - and integration.include_prompts - ): - set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output) - - return result - finally: - # Ensure agent is popped even if an exception occurs - _pop_agent() + output = result.get("output") + if ( + output is not None + and should_send_default_pii() + and integration.include_prompts + ): + set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output) + + return result + finally: + # Ensure agent is popped even if an exception occurs + pass + finally: + CURRENT_LANGCHAIN_AGENT_NAME.reset(token) return new_invoke @@ -1041,6 +1011,7 @@ def new_stream(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": return f(self, *args, **kwargs) agent_name, tools = _get_request_data(self, args, kwargs) + token = CURRENT_LANGCHAIN_AGENT_NAME.set(agent_name) start_span_function = get_start_span_function() span = start_span_function( @@ -1050,8 +1021,6 @@ def new_stream(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": ) span.__enter__() - _push_agent(agent_name) - if agent_name: span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name) @@ -1107,8 +1076,8 @@ def new_iterator() -> "Iterator[Any]": raise finally: # Ensure cleanup happens even if iterator is abandoned or fails - _pop_agent() span.__exit__(*exc_info) + CURRENT_LANGCHAIN_AGENT_NAME.reset(token) async def new_iterator_async() -> "AsyncIterator[Any]": exc_info: "tuple[Any, Any, Any]" = (None, None, None) @@ -1133,8 +1102,8 @@ async def new_iterator_async() -> "AsyncIterator[Any]": raise finally: # Ensure cleanup happens even if iterator is abandoned or fails - _pop_agent() span.__exit__(*exc_info) + CURRENT_LANGCHAIN_AGENT_NAME.reset(token) if str(type(result)) == "": result = new_iterator_async() diff --git a/tests/integrations/langchain/test_langchain.py b/tests/integrations/langchain/test_langchain.py index 132da0a9a0..1e73ef9634 100644 --- a/tests/integrations/langchain/test_langchain.py +++ b/tests/integrations/langchain/test_langchain.py @@ -313,6 +313,46 @@ def test_langchain_agent( ) +def test_langchain_agent_name_propagation(sentry_init, capture_events, monkeypatch): + sentry_init( + integrations=[LangchainIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + "You are very powerful assistant, but don't know current events", + ), + ("user", "{input}"), + MessagesPlaceholder(variable_name="agent_scratchpad"), + ] + ) + + llm = MockOpenAI(model_name="gpt-3.5-turbo", temperature=0, openai_api_key="badkey") + agent = create_openai_tools_agent(llm, [get_word_length], prompt) + agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) + + # Mock _get_request_data to return a test agent name + def mock_get_request_data(self, args, kwargs): + return "test_agent_name", [get_word_length] + + monkeypatch.setattr( + "sentry_sdk.integrations.langchain._get_request_data", mock_get_request_data + ) + + with start_transaction(): + list(agent_executor.stream({"input": "How many letters in the word eudca"})) + + tx = events[0] + assert tx["type"] == "transaction" + for span in tx["spans"]: + assert span["data"].get(SPANDATA.GEN_AI_AGENT_NAME) == "test_agent_name" + + def test_langchain_error(sentry_init, capture_events): sentry_init( integrations=[LangchainIntegration(include_prompts=True)],