From bd3954ea8328bba9028387674cc9bddcb75e5355 Mon Sep 17 00:00:00 2001 From: michi-okahata Date: Thu, 3 Jul 2025 17:37:13 -0700 Subject: [PATCH 01/10] fix mcp docs --- docs/v2/usage/mcp-server.mdx | 5 ----- 1 file changed, 5 deletions(-) diff --git a/docs/v2/usage/mcp-server.mdx b/docs/v2/usage/mcp-server.mdx index ad599eb6d..02e659264 100644 --- a/docs/v2/usage/mcp-server.mdx +++ b/docs/v2/usage/mcp-server.mdx @@ -60,11 +60,6 @@ Authorize using an AgentOps project API key. - **Parameters**: `api_key` (string) - Your AgentOps project API key - **Usage**: The server will automatically prompt for authentication when needed -#### `get_project` -Get details about the current project. -- **Parameters**: None -- **Returns**: Project information including ID, name, and environment - #### `get_trace` Get trace information by ID. - **Parameters**: `trace_id` (string) - The trace identifier From 544dbbf583d7b8d2d3153da4fc8c707365d7e2a7 Mon Sep 17 00:00:00 2001 From: michi-okahata Date: Mon, 14 Jul 2025 11:54:45 -0700 Subject: [PATCH 02/10] semconv changes --- agentops/semconv/span_kinds.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/agentops/semconv/span_kinds.py b/agentops/semconv/span_kinds.py index 75b3c6b97..781393fad 100644 --- a/agentops/semconv/span_kinds.py +++ b/agentops/semconv/span_kinds.py @@ -13,7 +13,6 @@ class AgentOpsSpanKindValues(Enum): AGENT = "agent" TOOL = "tool" LLM = "llm" - TEAM = "team" CHAIN = "chain" TEXT = "text" GUARDRAIL = "guardrail" @@ -41,7 +40,6 @@ class SpanKind: AGENT = AgentOpsSpanKindValues.AGENT.value TOOL = AgentOpsSpanKindValues.TOOL.value LLM = AgentOpsSpanKindValues.LLM.value - TEAM = AgentOpsSpanKindValues.TEAM.value UNKNOWN = AgentOpsSpanKindValues.UNKNOWN.value CHAIN = AgentOpsSpanKindValues.CHAIN.value TEXT = AgentOpsSpanKindValues.TEXT.value From 01c9044fd0f5f64854e1ebd2911d395715a14243 Mon Sep 17 00:00:00 2001 From: michi-okahata Date: Mon, 14 Jul 2025 14:53:54 -0700 Subject: [PATCH 03/10] first pass --- .../integration/callbacks/dspy/callback.py | 451 ++++++++++++++++++ 1 file changed, 451 insertions(+) create mode 100644 agentops/integration/callbacks/dspy/callback.py diff --git a/agentops/integration/callbacks/dspy/callback.py b/agentops/integration/callbacks/dspy/callback.py new file mode 100644 index 000000000..ba566ce94 --- /dev/null +++ b/agentops/integration/callbacks/dspy/callback.py @@ -0,0 +1,451 @@ +from typing import Any, Dict, List, Optional, Union + +from agents.agent import Agent +from opentelemetry import trace +from opentelemetry.context import attach, detach +from opentelemetry.trace import Span, SpanContext, set_span_in_context + +from agentops.helpers.serialization import safe_serialize +from agentops.logging import logger +from agentops.sdk.core import tracer +from agentops.semconv import AgentOpsSpanKindValues, SpanAttributes, CoreAttributes, agent +from agentops.integration.callbacks.langchain.utils import get_model_info + +from dspy.utils.callback import BaseCallback +import dspy + +# adding inputs as a dict, align with semconv? +# grabbing attributes so annoying + +# farm everything from dspy.module +# instance: Module + +# check dwij thing for how to test/debug the callbacks for langchain/dspy +# no kwargs except WITHIN input dict from dspy + +class DSPyCallbackHandler(BaseCallback): + """ + AgentOps callback handler for DSPy. + """ + + def __init__( + self, + api_key: Optional[str] = None, + tags: Optional[List[str]] = None, + auto_session: bool = True, + ): + self.active_spans = {} + self.api_key = api_key + self.tags = tags or [] + self.trace = None # 'trace' replaces 'session' + self.trace_token = None + self.context_tokens = {} + self.token_counts = {} + + self.active_spans.pop("test").add_event + + if auto_session: + self._initialize_agentops() + + # not entirely sure if this works + def _initialize_agentops(self): + """Initialize AgentOps""" + import agentops + + if not tracer.initialized: + init_kwargs = { + "auto_start_session": False, + "instrument_llm_calls": True, + "api_key": Optional[str] # ? fix + } + + if self.api_key: + init_kwargs["api_key"] = self.api_key + + agentops.init(**init_kwargs) + logger.debug("AgentOps initialized from DSPy callback handler") + + if not tracer.initialized: + logger.warning("AgentOps not initialized, session span will not be created") + return + + otel_tracer = tracer.get_tracer() + + span_name = f"session.{AgentOpsSpanKindValues.SESSION.value}" + + attributes = { + SpanAttributes.AGENTOPS_SPAN_KIND: AgentOpsSpanKindValues.SESSION.value, + "session.tags": self.tags, + "agentops.operation.name": "session", + "span.kind": AgentOpsSpanKindValues.SESSION.value, + } + + # Create a root session span + self.trace = otel_tracer.start_span(span_name, attributes=attributes) + + # Attach session span to current context + self.trace_token = attach(set_span_in_context(self.trace)) + + logger.debug("Created trace as root span for DSPy") + + # def utility for determining module/etc type and mapping it to agentops semconv equivalent + def _create_span( + self, + operation_name: str, + span_kind: str, + run_id: Any = None, + attributes: dict[str, Any] | None = None, + parent_run_id: str | None = None, # any to str + inputs: dict[str, Any] | None = None, + ): + if not tracer.initialized: + logger.warning("AgentOps not initialized, spans will not be created") + return trace.NonRecordingSpan(SpanContext.INVALID) # type: ignore # doesn't exist in otel?? + + otel_tracer = tracer.get_tracer() + + span_name = f"{operation_name}.{span_kind}" + + if attributes is None: + attributes = {} + + if inputs is None: + inputs = {} + + attributes = {**attributes, **inputs} # combine inputs and attributes + attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = span_kind + attributes["agentops.operation.name"] = operation_name # make a span attribute in semconv? + + if run_id is None: + run_id = id(attributes) # not sure if this applies to call_id or is the fallback? + + parent_span = None + if parent_run_id is not None and parent_run_id is self.active_spans: + # Get parent span from active spans + parent_span = self.active_spans[parent_run_id] + # Create context with parent span + parent_ctx = set_span_in_context(parent_span) + # Start span with parent context + span = otel_tracer.start_span(span_name, context=parent_ctx, attributes=attributes) + logger.debug(f"Start span: {span_name} with parent: {parent_run_id}") + else: + parent_ctx = set_span_in_context(self.trace) # assign span earlier, should be fine, ensure types # type: ignore + span = otel_tracer.start_span(span_name, context=parent_ctx, attributes=attributes) + logger.debug(f"Started span: {span_name} with session as parent") + + # Check what is available/needed and abstract out to utillities? + # Think it is callback/framework/etc specific since they surface different data + + # Store span in active_spans + self.active_spans[run_id] = span + + # Store token to detach later + token = attach(set_span_in_context(span)) + self.context_tokens[run_id] = token + + return span + + def _end_span(self, run_id: Any): + if run_id not in self.active_spans: + logger.warning(f"No span found for call {run_id}") + return + + span = self.active_spans.pop(run_id) + token = self.context_tokens.pop(run_id, None) + + if token is not None: + detach(token) + + try: + span.end() + logger.debug(f"Ended span: {span.name}") + except Exception as e: + logger.warning(f"Error ending span: {e}") + + # Clean up token counts if present + if run_id in self.token_counts: + del self.token_counts[run_id] + + # modules, adapters, evaluate, tool, lm + # what are adapters? + + def _get_span_kind(self, instance: dspy.Module) -> str: + if isinstance(instance, (dspy.ReAct, dspy.ProgramOfThought)): + return AgentOpsSpanKindValues.AGENT.value + elif isinstance(instance, ( + dspy.ChainOfThought, + dspy.MultiChainComparison, + dspy.BestOfN, + dspy.Refine + )): + return AgentOpsSpanKindValues.WORKFLOW.value + elif isinstance(instance, dspy.Predict): + return AgentOpsSpanKindValues.CHAIN.value + else: + logger.warning(f"Instance's span type not found: {instance}") + return AgentOpsSpanKindValues.UNKNOWN.value + + def _get_span_attributes(self, instance: dspy.Module) -> dict: # add self + attributes = {} + attributes = {**attributes, **instance.__dict__} # append dict, should probably delete some, organize/name + return attributes + + def on_module_start( + self, + call_id: str, + instance: Any, + inputs: Dict[str, Any], + ): + """A handler triggered when forward() method of a module (subclass of dspy.Module) is called. + + Args: + call_id: A unique identifier for the call. Can be used to connect start/end handlers. + instance: The Module instance. + inputs: The inputs to the module's forward() method. Each arguments is stored as + a key-value pair in a dictionary. + """ + + span_kind = self._get_span_kind(instance) # make it check for various types + span_attributes = self._get_span_attributes(instance) + + # unpack instance for more data + # how does mlflow deal with parent? implementaiton is brief + # deals with parent as current active span which is also how we do it here + + # if isinstance(instance, dspy.Module): + # instance.__class__.__name__ + + self._create_span( + operation_name=f"{instance.__class__.__name__}", + span_kind=span_kind, + run_id=call_id, + inputs=inputs, + attributes=span_attributes + ) + + def on_module_end( + self, + call_id: str, + outputs: Any | None, + exception: Exception | None = None, + ): + """A handler triggered after forward() method of a module (subclass of dspy.Module) is executed. + + Args: + call_id: A unique identifier for the call. Can be used to connect start/end handlers. + outputs: The outputs of the module's forward() method. If the method is interrupted by + an exception, this will be None. + exception: If an exception is raised during the execution, it will be stored here. + """ + + # build in some way to add on end span, either modify the function + # OR add here, depending on whether it is applicable in other + # callback functions + self._end_span(call_id) + + def on_lm_start( + self, + call_id: str, + instance: Any, + inputs: Dict[str, Any], + ): + """A handler triggered when __call__ method of dspy.LM instance is called. + + Args: + call_id: A unique identifier for the call. Can be used to connect start/end handlers. + instance: The LM instance. + inputs: The inputs to the LM's __call__ method. Each arguments is stored as + a key-value pair in a dictionary. + """ + # use import types to exhaust all the data we can find + # take notes on it, probably gets quite messy + span_kind = self._get_span_kind(instance) + span_attributes = self._get_span_attributes(instance) + + self._create_span( + operation_name=f"{instance.__class__.__name__}", + span_kind=span_kind, + run_id=call_id, + inputs=inputs, + attributes=span_attributes + ) + + def on_lm_end( + self, + call_id: str, + outputs: Dict[str, Any] | None, + exception: Exception | None = None, + ): + """A handler triggered after __call__ method of dspy.LM instance is executed. + + Args: + call_id: A unique identifier for the call. Can be used to connect start/end handlers. + outputs: The outputs of the LM's __call__ method. If the method is interrupted by + an exception, this will be None. + exception: If an exception is raised during the execution, it will be stored here. + """ + self._end_span(call_id) + + def on_adapter_format_start( + self, + call_id: str, + instance: Any, + inputs: Dict[str, Any], + ): + """A handler triggered when format() method of an adapter (subclass of dspy.Adapter) is called. + + Args: + call_id: A unique identifier for the call. Can be used to connect start/end handlers. + instance: The Adapter instance. + inputs: The inputs to the Adapter's format() method. Each arguments is stored as + a key-value pair in a dictionary. + """ + span_kind = "lm_call" + span_attributes = {"lm_instance": instance.__class__.__name__} + + self._create_span( + operation_name=f"lm_call_{instance.__class__.__name__}", + span_kind=span_kind, + run_id=call_id, + inputs=inputs, + attributes=span_attributes + ) + + def on_adapter_format_end( + self, + call_id: str, + outputs: Dict[str, Any] | None, + exception: Exception | None = None, + ): + """A handler triggered after format() method of an adapter (subclass of dspy.Adapter) is called.. + + Args: + call_id: A unique identifier for the call. Can be used to connect start/end handlers. + outputs: The outputs of the Adapter's format() method. If the method is interrupted + by an exception, this will be None. + exception: If an exception is raised during the execution, it will be stored here. + """ + self._end_span(call_id) + + def on_adapter_parse_start( + self, + call_id: str, + instance: Any, + inputs: Dict[str, Any], + ): + """A handler triggered when parse() method of an adapter (subclass of dspy.Adapter) is called. + + Args: + call_id: A unique identifier for the call. Can be used to connect start/end handlers. + instance: The Adapter instance. + inputs: The inputs to the Adapter's parse() method. Each arguments is stored as + a key-value pair in a dictionary. + """ + span_kind = "adapter_format" + span_attributes = {"adapter": instance.__class__.__name__} + + self._create_span( + operation_name=f"adapter_format_{instance.__class__.__name__}", + span_kind=span_kind, + run_id=call_id, + inputs=inputs, + attributes=span_attributes + ) + + def on_adapter_parse_end( + self, + call_id: str, + outputs: Dict[str, Any] | None, + exception: Exception | None = None, + ): + """A handler triggered after parse() method of an adapter (subclass of dspy.Adapter) is called. + + Args: + call_id: A unique identifier for the call. Can be used to connect start/end handlers. + outputs: The outputs of the Adapter's parse() method. If the method is interrupted + by an exception, this will be None. + exception: If an exception is raised during the execution, it will be stored here. + """ + self._end_span(call_id) + + def on_tool_start( + self, + call_id: str, + instance: Any, + inputs: Dict[str, Any], + ): + """A handler triggered when a tool is called. + + Args: + call_id: A unique identifier for the call. Can be used to connect start/end handlers. + instance: The Tool instance. + inputs: The inputs to the Tool's __call__ method. Each arguments is stored as + a key-value pair in a dictionary. + """ + span_kind = "adapter_parse" + span_attributes = {"adapter": instance.__class__.__name__} + + self._create_span( + operation_name=f"adapter_parse_{instance.__class__.__name__}", + span_kind=span_kind, + run_id=call_id, + inputs=inputs, + attributes=span_attributes + ) + + def on_tool_end( + self, + call_id: str, + outputs: Dict[str, Any] | None, + exception: Exception | None = None, + ): + """A handler triggered after a tool is executed. + + Args: + call_id: A unique identifier for the call. Can be used to connect start/end handlers. + outputs: The outputs of the Tool's __call__ method. If the method is interrupted by + an exception, this will be None. + exception: If an exception is raised during the execution, it will be stored here. + """ + self._end_span(call_id) + + def on_evaluate_start( + self, + call_id: str, + instance: Any, + inputs: Dict[str, Any], + ): + """A handler triggered when evaluation is started. + + Args: + call_id: A unique identifier for the call. Can be used to connect start/end handlers. + instance: The Evaluate instance. + inputs: The inputs to the Evaluate's __call__ method. Each arguments is stored as + a key-value pair in a dictionary. + """ + span_kind = "tool_call" + span_attributes = {"tool": instance.__class__.__name__} + + self._create_span( + operation_name=f"tool_call_{instance.__class__.__name__}", + span_kind=span_kind, + run_id=call_id, + inputs=inputs, + attributes=span_attributes + ) + + def on_evaluate_end( + self, + call_id: str, + outputs: Any | None, + exception: Exception | None = None, + ): + """A handler triggered after evaluation is executed. + + Args: + call_id: A unique identifier for the call. Can be used to connect start/end handlers. + outputs: The outputs of the Evaluate's __call__ method. If the method is interrupted by + an exception, this will be None. + exception: If an exception is raised during the execution, it will be stored here. + """ + self._end_span(call_id) From 914b4243d404cbdd19bc32370c59e5ae3f8f5fa1 Mon Sep 17 00:00:00 2001 From: michi-okahata Date: Thu, 17 Jul 2025 14:32:40 -0700 Subject: [PATCH 04/10] lol --- .../integration/callbacks/dspy/__init__.py | 11 ++ .../integration/callbacks/dspy/callback.py | 167 +++++++++++++----- 2 files changed, 132 insertions(+), 46 deletions(-) create mode 100644 agentops/integration/callbacks/dspy/__init__.py diff --git a/agentops/integration/callbacks/dspy/__init__.py b/agentops/integration/callbacks/dspy/__init__.py new file mode 100644 index 000000000..71736746e --- /dev/null +++ b/agentops/integration/callbacks/dspy/__init__.py @@ -0,0 +1,11 @@ +""" +DSPy integration for AgentOps. + +This module provides the AgentOps DSPy integration, including callbacks and utilities. +""" + +from agentops.integration.callbacks.dspy.callback import DSPyCallbackHandler + +__all__ = [ + "DSPyCallbackHandler", +] diff --git a/agentops/integration/callbacks/dspy/callback.py b/agentops/integration/callbacks/dspy/callback.py index ba566ce94..0b67cf8b6 100644 --- a/agentops/integration/callbacks/dspy/callback.py +++ b/agentops/integration/callbacks/dspy/callback.py @@ -1,28 +1,49 @@ -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional -from agents.agent import Agent -from opentelemetry import trace from opentelemetry.context import attach, detach -from opentelemetry.trace import Span, SpanContext, set_span_in_context +from opentelemetry.trace import SpanContext, set_span_in_context +from opentelemetry.sdk.trace import Span as SDKSpan -from agentops.helpers.serialization import safe_serialize from agentops.logging import logger from agentops.sdk.core import tracer -from agentops.semconv import AgentOpsSpanKindValues, SpanAttributes, CoreAttributes, agent -from agentops.integration.callbacks.langchain.utils import get_model_info +from agentops.semconv import AgentOpsSpanKindValues, SpanAttributes from dspy.utils.callback import BaseCallback import dspy -# adding inputs as a dict, align with semconv? -# grabbing attributes so annoying - # farm everything from dspy.module # instance: Module - -# check dwij thing for how to test/debug the callbacks for langchain/dspy # no kwargs except WITHIN input dict from dspy +# TODO +# terminal logs are broken +# dspy cache turned off -> logging reasons +# any way to capture/recycle traces? +# +# logging works +# trace not ending +# logs not getting sent to the front end +# not ending the spans correctly? +# circle back to this! +# +# try/except on everything +# figure out the type that spans are HERE not generally (agentops specific?) +# +# gave up on semconv shift, fix later + +# messing around with types +otel_tracer = tracer.get_tracer() +""" +test_span: Span = otel_tracer.start_span(name="test") +test_span_a: Span = otel_tracer.start_span(name="test") +""" +test_span = otel_tracer.start_span(name="test") + +# only getting generic gen ai attributes + +# relation between otel tracer's span AND session spans? +# might just be a dict of all spans to easily access? + class DSPyCallbackHandler(BaseCallback): """ AgentOps callback handler for DSPy. @@ -34,29 +55,35 @@ def __init__( tags: Optional[List[str]] = None, auto_session: bool = True, ): - self.active_spans = {} + self.active_spans: Dict[str, SDKSpan] = {} self.api_key = api_key self.tags = tags or [] - self.trace = None # 'trace' replaces 'session' - self.trace_token = None + self.session_span = None + self.session_token = None self.context_tokens = {} self.token_counts = {} - self.active_spans.pop("test").add_event - if auto_session: self._initialize_agentops() # not entirely sure if this works def _initialize_agentops(self): """Initialize AgentOps""" + # cache hides + # disable this and test instance.cache + # or figure out how to find cache data -> attach to session + dspy.configure_cache( + enable_disk_cache=False, + enable_memory_cache=False, + ) + import agentops if not tracer.initialized: init_kwargs = { "auto_start_session": False, "instrument_llm_calls": True, - "api_key": Optional[str] # ? fix + "api_key": Optional[str], # ? fix } if self.api_key: @@ -81,13 +108,16 @@ def _initialize_agentops(self): } # Create a root session span - self.trace = otel_tracer.start_span(span_name, attributes=attributes) + self.session_span = otel_tracer.start_span(span_name, attributes=attributes) # Attach session span to current context - self.trace_token = attach(set_span_in_context(self.trace)) + self.session_token = attach(set_span_in_context(self.session_span)) logger.debug("Created trace as root span for DSPy") + def _handle_span(self): + pass + # def utility for determining module/etc type and mapping it to agentops semconv equivalent def _create_span( self, @@ -95,12 +125,12 @@ def _create_span( span_kind: str, run_id: Any = None, attributes: dict[str, Any] | None = None, - parent_run_id: str | None = None, # any to str + parent_run_id: str | None = None, # any to str inputs: dict[str, Any] | None = None, ): if not tracer.initialized: logger.warning("AgentOps not initialized, spans will not be created") - return trace.NonRecordingSpan(SpanContext.INVALID) # type: ignore # doesn't exist in otel?? + return trace.NonRecordingSpan(SpanContext.INVALID) # type: ignore # doesn't exist in otel?? otel_tracer = tracer.get_tracer() @@ -112,12 +142,12 @@ def _create_span( if inputs is None: inputs = {} - attributes = {**attributes, **inputs} # combine inputs and attributes + attributes = {**attributes, **inputs} # combine inputs and attributes attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = span_kind - attributes["agentops.operation.name"] = operation_name # make a span attribute in semconv? + attributes["agentops.operation.name"] = operation_name # make a span attribute in semconv? if run_id is None: - run_id = id(attributes) # not sure if this applies to call_id or is the fallback? + run_id = id(attributes) # not sure if this applies to call_id or is the fallback? parent_span = None if parent_run_id is not None and parent_run_id is self.active_spans: @@ -129,7 +159,9 @@ def _create_span( span = otel_tracer.start_span(span_name, context=parent_ctx, attributes=attributes) logger.debug(f"Start span: {span_name} with parent: {parent_run_id}") else: - parent_ctx = set_span_in_context(self.trace) # assign span earlier, should be fine, ensure types # type: ignore + parent_ctx = set_span_in_context( + self.session_span # type: ignore + ) # assign span earlier, should be fine, ensure types span = otel_tracer.start_span(span_name, context=parent_ctx, attributes=attributes) logger.debug(f"Started span: {span_name} with session as parent") @@ -137,7 +169,12 @@ def _create_span( # Think it is callback/framework/etc specific since they surface different data # Store span in active_spans - self.active_spans[run_id] = span + # span.get_span_context() + if isinstance(span, SDKSpan): + self.active_spans[run_id] = span + else: + pass + # handle, shouldn't be, just type check # Store token to detach later token = attach(set_span_in_context(span)) @@ -146,6 +183,12 @@ def _create_span( return span def _end_span(self, run_id: Any): + """ + End the span associated with the run_id. + + Args: + run_id: Unique identifier for the operation + """ if run_id not in self.active_spans: logger.warning(f"No span found for call {run_id}") return @@ -158,7 +201,7 @@ def _end_span(self, run_id: Any): try: span.end() - logger.debug(f"Ended span: {span.name}") + logger.debug(f"Ended span: {span.update_name('test')}") # ugh except Exception as e: logger.warning(f"Error ending span: {e}") @@ -169,25 +212,31 @@ def _end_span(self, run_id: Any): # modules, adapters, evaluate, tool, lm # what are adapters? + # maybe a better way to do this? + # includes: + # multiple inheritance in python? + # BestOfN, ChainOfThought, ChainOfThoughtWithHint + # MultiChainComparison, Predict + # ProgramOfThought, ReAct, Refine, SmartSearch + # RulesInductionProgram + # modules also include teleprompt (non lm ... what) + def _get_span_kind(self, instance: dspy.Module) -> str: if isinstance(instance, (dspy.ReAct, dspy.ProgramOfThought)): return AgentOpsSpanKindValues.AGENT.value - elif isinstance(instance, ( - dspy.ChainOfThought, - dspy.MultiChainComparison, - dspy.BestOfN, - dspy.Refine - )): + elif isinstance(instance, (dspy.ChainOfThought, dspy.MultiChainComparison, dspy.BestOfN, dspy.Refine)): return AgentOpsSpanKindValues.WORKFLOW.value elif isinstance(instance, dspy.Predict): return AgentOpsSpanKindValues.CHAIN.value + elif isinstance(instance, dspy.LM): + return AgentOpsSpanKindValues.LLM.value else: logger.warning(f"Instance's span type not found: {instance}") return AgentOpsSpanKindValues.UNKNOWN.value - def _get_span_attributes(self, instance: dspy.Module) -> dict: # add self + def _get_span_attributes(self, instance: dspy.Module) -> dict: # add self attributes = {} - attributes = {**attributes, **instance.__dict__} # append dict, should probably delete some, organize/name + attributes = {**attributes, **instance.__dict__} # append dict, should probably delete some, organize/name return attributes def on_module_start( @@ -205,22 +254,29 @@ def on_module_start( a key-value pair in a dictionary. """ - span_kind = self._get_span_kind(instance) # make it check for various types + span_kind = self._get_span_kind(instance) # make it check for various types span_attributes = self._get_span_attributes(instance) + attributes = { + instance.ca + } + # unpack instance for more data # how does mlflow deal with parent? implementaiton is brief - # deals with parent as current active span which is also how we do it here + # deals with parent as current active span which is also how we do it here # if isinstance(instance, dspy.Module): # instance.__class__.__name__ + # + # mlflow pattern for end + append data + # or just do it before ending the span self._create_span( operation_name=f"{instance.__class__.__name__}", span_kind=span_kind, run_id=call_id, inputs=inputs, - attributes=span_attributes + attributes=span_attributes, ) def on_module_end( @@ -229,6 +285,8 @@ def on_module_end( outputs: Any | None, exception: Exception | None = None, ): + # not collecting? + # why was it collecting on the other one? """A handler triggered after forward() method of a module (subclass of dspy.Module) is executed. Args: @@ -237,10 +295,19 @@ def on_module_end( an exception, this will be None. exception: If an exception is raised during the execution, it will be stored here. """ + if isinstance(outputs, dspy.Predict): + outputs = outputs.dump_state() # state, no toDict()? + + if exception: + span = self.active_spans[call_id] # build in some way to add on end span, either modify the function # OR add here, depending on whether it is applicable in other # callback functions + # if exception -> return exception in the span to denote error + # if cache -> history? + # grab from cache + self._end_span(call_id) def on_lm_start( @@ -257,8 +324,16 @@ def on_lm_start( inputs: The inputs to the LM's __call__ method. Each arguments is stored as a key-value pair in a dictionary. """ - # use import types to exhaust all the data we can find - # take notes on it, probably gets quite messy + # farming args!!! yay + # both generic lm and everything else + # how to organize collection + + _instance: dspy.LM = instance + _instance.cache # (cache_in_memory is deprecated) + _instance.model # might be redundant + _instance.provider # def redundant? + _instance.finetuning_model + span_kind = self._get_span_kind(instance) span_attributes = self._get_span_attributes(instance) @@ -267,7 +342,7 @@ def on_lm_start( span_kind=span_kind, run_id=call_id, inputs=inputs, - attributes=span_attributes + attributes=span_attributes, ) def on_lm_end( @@ -308,7 +383,7 @@ def on_adapter_format_start( span_kind=span_kind, run_id=call_id, inputs=inputs, - attributes=span_attributes + attributes=span_attributes, ) def on_adapter_format_end( @@ -349,7 +424,7 @@ def on_adapter_parse_start( span_kind=span_kind, run_id=call_id, inputs=inputs, - attributes=span_attributes + attributes=span_attributes, ) def on_adapter_parse_end( @@ -390,7 +465,7 @@ def on_tool_start( span_kind=span_kind, run_id=call_id, inputs=inputs, - attributes=span_attributes + attributes=span_attributes, ) def on_tool_end( @@ -431,7 +506,7 @@ def on_evaluate_start( span_kind=span_kind, run_id=call_id, inputs=inputs, - attributes=span_attributes + attributes=span_attributes, ) def on_evaluate_end( From 48d8a3ca779277bf91e61cc80cd588baa44b28c2 Mon Sep 17 00:00:00 2001 From: michi-okahata Date: Fri, 18 Jul 2025 08:39:01 -0700 Subject: [PATCH 05/10] second pass --- .../integration/callbacks/dspy/callback.py | 221 ++++++++++-------- 1 file changed, 118 insertions(+), 103 deletions(-) diff --git a/agentops/integration/callbacks/dspy/callback.py b/agentops/integration/callbacks/dspy/callback.py index 0b67cf8b6..b25a71401 100644 --- a/agentops/integration/callbacks/dspy/callback.py +++ b/agentops/integration/callbacks/dspy/callback.py @@ -1,5 +1,6 @@ from typing import Any, Dict, List, Optional +from opentelemetry import attributes from opentelemetry.context import attach, detach from opentelemetry.trace import SpanContext, set_span_in_context from opentelemetry.sdk.trace import Span as SDKSpan @@ -11,38 +12,14 @@ from dspy.utils.callback import BaseCallback import dspy -# farm everything from dspy.module -# instance: Module -# no kwargs except WITHIN input dict from dspy - -# TODO -# terminal logs are broken -# dspy cache turned off -> logging reasons -# any way to capture/recycle traces? -# -# logging works -# trace not ending -# logs not getting sent to the front end -# not ending the spans correctly? -# circle back to this! +# compile todo! # -# try/except on everything -# figure out the type that spans are HERE not generally (agentops specific?) -# -# gave up on semconv shift, fix later - -# messing around with types -otel_tracer = tracer.get_tracer() -""" -test_span: Span = otel_tracer.start_span(name="test") -test_span_a: Span = otel_tracer.start_span(name="test") -""" -test_span = otel_tracer.start_span(name="test") - -# only getting generic gen ai attributes - -# relation between otel tracer's span AND session spans? -# might just be a dict of all spans to easily access? +# fix terminal logs (unable to load logs) +# fix api/mcp (request failed, 404) +# semconv ... 🤢 +# try/except +# caching +# farm attributes šŸ§‘ā€šŸŒ¾ class DSPyCallbackHandler(BaseCallback): """ @@ -69,8 +46,7 @@ def __init__( # not entirely sure if this works def _initialize_agentops(self): """Initialize AgentOps""" - # cache hides - # disable this and test instance.cache + # enable cache, figure out how to denoted cached operations # or figure out how to find cache data -> attach to session dspy.configure_cache( enable_disk_cache=False, @@ -83,7 +59,7 @@ def _initialize_agentops(self): init_kwargs = { "auto_start_session": False, "instrument_llm_calls": True, - "api_key": Optional[str], # ? fix + "api_key": Optional[str] } if self.api_key: @@ -115,9 +91,6 @@ def _initialize_agentops(self): logger.debug("Created trace as root span for DSPy") - def _handle_span(self): - pass - # def utility for determining module/etc type and mapping it to agentops semconv equivalent def _create_span( self, @@ -125,31 +98,51 @@ def _create_span( span_kind: str, run_id: Any = None, attributes: dict[str, Any] | None = None, - parent_run_id: str | None = None, # any to str + parent_run_id: str | None = None, inputs: dict[str, Any] | None = None, ): + """ + Create a span for the operation. + + Args: + operation_name: Name of the operation + span_kind: Type of span + run_id: Unique identifier for the operation + attributes: Additional attributes for the span + parent_run_id: The run_id of the parent span if this is a child span + inputs: The DSPy input dictionary + + Returns: + The created span + """ if not tracer.initialized: logger.warning("AgentOps not initialized, spans will not be created") - return trace.NonRecordingSpan(SpanContext.INVALID) # type: ignore # doesn't exist in otel?? + return trace.NonRecordingSpan(SpanContext.INVALID) # type: ignore otel_tracer = tracer.get_tracer() span_name = f"{operation_name}.{span_kind}" if attributes is None: + logger.warning("no attributes") attributes = {} if inputs is None: + logger.warning("no inputs") inputs = {} - attributes = {**attributes, **inputs} # combine inputs and attributes - attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = span_kind - attributes["agentops.operation.name"] = operation_name # make a span attribute in semconv? + inputs = {f"inputs.{key}": value for key, value in inputs.items()} + + # only these explicit attributes (kind and name) are being recorded + attributes = {**attributes, **inputs} + attributes["agentops.span.kind"] = span_kind # avoid deprecated semconv + attributes["agentops.operation.name"] = operation_name + logger.warning(f"Attributes: {attributes}") if run_id is None: - run_id = id(attributes) # not sure if this applies to call_id or is the fallback? + run_id = id(attributes) - parent_span = None + # parent_span = None if parent_run_id is not None and parent_run_id is self.active_spans: # Get parent span from active spans parent_span = self.active_spans[parent_run_id] @@ -159,22 +152,18 @@ def _create_span( span = otel_tracer.start_span(span_name, context=parent_ctx, attributes=attributes) logger.debug(f"Start span: {span_name} with parent: {parent_run_id}") else: - parent_ctx = set_span_in_context( - self.session_span # type: ignore - ) # assign span earlier, should be fine, ensure types + if not self.session_span: + logger.warning(f"Root session span not set. Starting {span_name} as root span.") + self.session_span = otel_tracer.start_span(span_name, attributes=attributes) + parent_ctx = set_span_in_context(self.session_span) + # Start span with session as parent context span = otel_tracer.start_span(span_name, context=parent_ctx, attributes=attributes) logger.debug(f"Started span: {span_name} with session as parent") - # Check what is available/needed and abstract out to utillities? - # Think it is callback/framework/etc specific since they surface different data - - # Store span in active_spans - # span.get_span_context() if isinstance(span, SDKSpan): self.active_spans[run_id] = span else: - pass - # handle, shouldn't be, just type check + logger.warning(f"Span type warning: generated {type(span)}") # Store token to detach later token = attach(set_span_in_context(span)) @@ -182,20 +171,48 @@ def _create_span( return span - def _end_span(self, run_id: Any): + def _end_span( + self, + run_id: str, + outputs: Any | None, + exception: Exception | None = None, + ): """ End the span associated with the run_id. Args: run_id: Unique identifier for the operation + outputs: The DSPy output + exception: The DSPy exception """ + # deal with exceptions + # deal with outputs, span attributes <- output if run_id not in self.active_spans: logger.warning(f"No span found for call {run_id}") return - span = self.active_spans.pop(run_id) + span: SDKSpan = self.active_spans.pop(run_id) token = self.context_tokens.pop(run_id, None) + + if exception: + logger.warning(f"Exception {str(exception)}") + span.add_event( + name="exception", + attributes={ + "exception.type": exception.__class__.__name__, + "exception.message": str(exception) + }, + ) + + attributes = {} + if span.attributes: + attributes = {key: value for key, value in span.attributes.items()} + + if isinstance(outputs, Dict): + attributes = {**attributes, **outputs} + span.set_attributes(attributes) + if token is not None: detach(token) @@ -210,7 +227,6 @@ def _end_span(self, run_id: Any): del self.token_counts[run_id] # modules, adapters, evaluate, tool, lm - # what are adapters? # maybe a better way to do this? # includes: @@ -220,7 +236,6 @@ def _end_span(self, run_id: Any): # ProgramOfThought, ReAct, Refine, SmartSearch # RulesInductionProgram # modules also include teleprompt (non lm ... what) - def _get_span_kind(self, instance: dspy.Module) -> str: if isinstance(instance, (dspy.ReAct, dspy.ProgramOfThought)): return AgentOpsSpanKindValues.AGENT.value @@ -234,7 +249,9 @@ def _get_span_kind(self, instance: dspy.Module) -> str: logger.warning(f"Instance's span type not found: {instance}") return AgentOpsSpanKindValues.UNKNOWN.value - def _get_span_attributes(self, instance: dspy.Module) -> dict: # add self + # might have to make it on a per-class basis + # this sucks cuz it only unpacks lm basically + def _get_span_attributes(self, instance: Any) -> dict: attributes = {} attributes = {**attributes, **instance.__dict__} # append dict, should probably delete some, organize/name return attributes @@ -253,23 +270,14 @@ def on_module_start( inputs: The inputs to the module's forward() method. Each arguments is stored as a key-value pair in a dictionary. """ - span_kind = self._get_span_kind(instance) # make it check for various types span_attributes = self._get_span_attributes(instance) - attributes = { - instance.ca - } - - # unpack instance for more data - # how does mlflow deal with parent? implementaiton is brief - # deals with parent as current active span which is also how we do it here + logger.warning(f"module start, {instance.__class__.__name__}, {instance}, {inputs}") # delete - # if isinstance(instance, dspy.Module): - # instance.__class__.__name__ - # - # mlflow pattern for end + append data - # or just do it before ending the span + # avoid recording empty "args" key + if "args" in inputs and not inputs["args"]: + inputs.pop("args") self._create_span( operation_name=f"{instance.__class__.__name__}", @@ -295,20 +303,10 @@ def on_module_end( an exception, this will be None. exception: If an exception is raised during the execution, it will be stored here. """ - if isinstance(outputs, dspy.Predict): - outputs = outputs.dump_state() # state, no toDict()? - - if exception: - span = self.active_spans[call_id] - - # build in some way to add on end span, either modify the function - # OR add here, depending on whether it is applicable in other - # callback functions - # if exception -> return exception in the span to denote error - # if cache -> history? - # grab from cache + if isinstance(outputs, dspy.Prediction): + outputs = outputs.toDict() - self._end_span(call_id) + self._end_span(call_id, outputs, exception) def on_lm_start( self, @@ -324,19 +322,26 @@ def on_lm_start( inputs: The inputs to the LM's __call__ method. Each arguments is stored as a key-value pair in a dictionary. """ - # farming args!!! yay - # both generic lm and everything else - # how to organize collection - _instance: dspy.LM = instance _instance.cache # (cache_in_memory is deprecated) _instance.model # might be redundant _instance.provider # def redundant? _instance.finetuning_model + p_instance: dspy.Predict = instance + dspy.ReAct + + # duck type to collect? + span_kind = self._get_span_kind(instance) span_attributes = self._get_span_attributes(instance) + logger.warning(f"lm start, {instance.__class__.__name__}, {instance}, {inputs}") + + attributes = { + **instance.kwargs, + } + self._create_span( operation_name=f"{instance.__class__.__name__}", span_kind=span_kind, @@ -359,7 +364,8 @@ def on_lm_end( an exception, this will be None. exception: If an exception is raised during the execution, it will be stored here. """ - self._end_span(call_id) + logger.warning("ending") + self._end_span(call_id, outputs, exception) def on_adapter_format_start( self, @@ -375,8 +381,11 @@ def on_adapter_format_start( inputs: The inputs to the Adapter's format() method. Each arguments is stored as a key-value pair in a dictionary. """ - span_kind = "lm_call" + span_kind = AgentOpsSpanKindValues.OPERATION.value # custom semconv? span_attributes = {"lm_instance": instance.__class__.__name__} + # semconv for parser + + logger.warning(f"adapter format start, {instance.__class__.__name__}, {instance}, {inputs}") self._create_span( operation_name=f"lm_call_{instance.__class__.__name__}", @@ -400,7 +409,7 @@ def on_adapter_format_end( by an exception, this will be None. exception: If an exception is raised during the execution, it will be stored here. """ - self._end_span(call_id) + self._end_span(call_id, outputs, exception) def on_adapter_parse_start( self, @@ -416,11 +425,13 @@ def on_adapter_parse_start( inputs: The inputs to the Adapter's parse() method. Each arguments is stored as a key-value pair in a dictionary. """ - span_kind = "adapter_format" + span_kind = "adapter_parse" span_attributes = {"adapter": instance.__class__.__name__} + logger.warning(f"adapter parser start, {instance.__class__.__name__}, {instance}, {inputs}") + self._create_span( - operation_name=f"adapter_format_{instance.__class__.__name__}", + operation_name=f"adapter_parser_{instance.__class__.__name__}", span_kind=span_kind, run_id=call_id, inputs=inputs, @@ -441,7 +452,7 @@ def on_adapter_parse_end( by an exception, this will be None. exception: If an exception is raised during the execution, it will be stored here. """ - self._end_span(call_id) + self._end_span(call_id, outputs, exception) def on_tool_start( self, @@ -457,11 +468,13 @@ def on_tool_start( inputs: The inputs to the Tool's __call__ method. Each arguments is stored as a key-value pair in a dictionary. """ - span_kind = "adapter_parse" - span_attributes = {"adapter": instance.__class__.__name__} + span_kind = "tool" + span_attributes = {"tool": instance.__class__.__name__} + + logger.warning(f"tool start, {instance.__class__.__name__}, {instance}, {inputs}") self._create_span( - operation_name=f"adapter_parse_{instance.__class__.__name__}", + operation_name=f"tool_{instance.__class__.__name__}", span_kind=span_kind, run_id=call_id, inputs=inputs, @@ -482,7 +495,7 @@ def on_tool_end( an exception, this will be None. exception: If an exception is raised during the execution, it will be stored here. """ - self._end_span(call_id) + self._end_span(call_id, outputs, exception) def on_evaluate_start( self, @@ -498,11 +511,13 @@ def on_evaluate_start( inputs: The inputs to the Evaluate's __call__ method. Each arguments is stored as a key-value pair in a dictionary. """ - span_kind = "tool_call" - span_attributes = {"tool": instance.__class__.__name__} + span_kind = "evalute" + span_attributes = {"evalute": instance.__class__.__name__} + + _instance: dspy.Evaluate = instance self._create_span( - operation_name=f"tool_call_{instance.__class__.__name__}", + operation_name=f"evaluate_{instance.__class__.__name__}", span_kind=span_kind, run_id=call_id, inputs=inputs, @@ -523,4 +538,4 @@ def on_evaluate_end( an exception, this will be None. exception: If an exception is raised during the execution, it will be stored here. """ - self._end_span(call_id) + self._end_span(call_id, outputs, exception) From 76830d9b3d7684e3770d445c1b7171a985fd684f Mon Sep 17 00:00:00 2001 From: michi-okahata Date: Fri, 18 Jul 2025 11:25:04 -0700 Subject: [PATCH 06/10] dspy first draft --- agentops/integration/callbacks/dspy/callback.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/agentops/integration/callbacks/dspy/callback.py b/agentops/integration/callbacks/dspy/callback.py index b25a71401..c0c08e0b3 100644 --- a/agentops/integration/callbacks/dspy/callback.py +++ b/agentops/integration/callbacks/dspy/callback.py @@ -2,7 +2,7 @@ from opentelemetry import attributes from opentelemetry.context import attach, detach -from opentelemetry.trace import SpanContext, set_span_in_context +from opentelemetry.trace import SpanContext, NonRecordingSpan, set_span_in_context from opentelemetry.sdk.trace import Span as SDKSpan from agentops.logging import logger @@ -117,7 +117,7 @@ def _create_span( """ if not tracer.initialized: logger.warning("AgentOps not initialized, spans will not be created") - return trace.NonRecordingSpan(SpanContext.INVALID) # type: ignore + return # no valid context for non-recording span otel_tracer = tracer.get_tracer() @@ -388,7 +388,7 @@ def on_adapter_format_start( logger.warning(f"adapter format start, {instance.__class__.__name__}, {instance}, {inputs}") self._create_span( - operation_name=f"lm_call_{instance.__class__.__name__}", + operation_name=f"{instance.__class__.__name__}", span_kind=span_kind, run_id=call_id, inputs=inputs, @@ -431,7 +431,7 @@ def on_adapter_parse_start( logger.warning(f"adapter parser start, {instance.__class__.__name__}, {instance}, {inputs}") self._create_span( - operation_name=f"adapter_parser_{instance.__class__.__name__}", + operation_name=f"{instance.__class__.__name__}", span_kind=span_kind, run_id=call_id, inputs=inputs, @@ -474,7 +474,7 @@ def on_tool_start( logger.warning(f"tool start, {instance.__class__.__name__}, {instance}, {inputs}") self._create_span( - operation_name=f"tool_{instance.__class__.__name__}", + operation_name=f"{instance.__class__.__name__}", span_kind=span_kind, run_id=call_id, inputs=inputs, @@ -517,7 +517,7 @@ def on_evaluate_start( _instance: dspy.Evaluate = instance self._create_span( - operation_name=f"evaluate_{instance.__class__.__name__}", + operation_name=f"{instance.__class__.__name__}", span_kind=span_kind, run_id=call_id, inputs=inputs, From 8ee856c103a8c3b9734acac74bf1075c7484638d Mon Sep 17 00:00:00 2001 From: michi-okahata Date: Fri, 18 Jul 2025 12:08:30 -0700 Subject: [PATCH 07/10] dspy second draft, removed misc. --- .../integration/callbacks/dspy/callback.py | 49 ++++++++++--------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/agentops/integration/callbacks/dspy/callback.py b/agentops/integration/callbacks/dspy/callback.py index c0c08e0b3..f1bd72be9 100644 --- a/agentops/integration/callbacks/dspy/callback.py +++ b/agentops/integration/callbacks/dspy/callback.py @@ -28,8 +28,9 @@ class DSPyCallbackHandler(BaseCallback): def __init__( self, - api_key: Optional[str] = None, - tags: Optional[List[str]] = None, + api_key: str | None = None, + tags: List[str] | None = None, + cache: bool = False, auto_session: bool = True, ): self.active_spans: Dict[str, SDKSpan] = {} @@ -43,23 +44,22 @@ def __init__( if auto_session: self._initialize_agentops() - # not entirely sure if this works - def _initialize_agentops(self): - """Initialize AgentOps""" - # enable cache, figure out how to denoted cached operations - # or figure out how to find cache data -> attach to session + # configure caching dspy.configure_cache( - enable_disk_cache=False, - enable_memory_cache=False, + enable_disk_cache=cache, + enable_memory_cache=cache, ) + # not entirely sure if this works + def _initialize_agentops(self): + """Initialize AgentOps""" import agentops if not tracer.initialized: init_kwargs = { "auto_start_session": False, "instrument_llm_calls": True, - "api_key": Optional[str] + "api_key": None } if self.api_key: @@ -124,26 +124,24 @@ def _create_span( span_name = f"{operation_name}.{span_kind}" if attributes is None: - logger.warning("no attributes") + logger.warning(f"No attributes recorded on span {run_id}") attributes = {} if inputs is None: - logger.warning("no inputs") + logger.warning(f"No inputs recorded on span {run_id}") inputs = {} inputs = {f"inputs.{key}": value for key, value in inputs.items()} - # only these explicit attributes (kind and name) are being recorded attributes = {**attributes, **inputs} - attributes["agentops.span.kind"] = span_kind # avoid deprecated semconv + attributes["agentops.span.kind"] = span_kind attributes["agentops.operation.name"] = operation_name - logger.warning(f"Attributes: {attributes}") if run_id is None: run_id = id(attributes) # parent_span = None - if parent_run_id is not None and parent_run_id is self.active_spans: + if parent_run_id is not None and parent_run_id in self.active_spans: # Get parent span from active spans parent_span = self.active_spans[parent_run_id] # Create context with parent span @@ -209,7 +207,9 @@ def _end_span( if span.attributes: attributes = {key: value for key, value in span.attributes.items()} - if isinstance(outputs, Dict): + if isinstance(outputs, dict): + outputs = {f"outputs.{key}": value for key, value in outputs.items()} + attributes = {**attributes, **outputs} span.set_attributes(attributes) @@ -273,7 +273,9 @@ def on_module_start( span_kind = self._get_span_kind(instance) # make it check for various types span_attributes = self._get_span_attributes(instance) - logger.warning(f"module start, {instance.__class__.__name__}, {instance}, {inputs}") # delete + #m logger.warning(f"module start, {instance.__class__.__name__}, {instance}, {inputs}") # delete + if isinstance(instance, dspy.Module): + pass # avoid recording empty "args" key if "args" in inputs and not inputs["args"]: @@ -336,7 +338,7 @@ def on_lm_start( span_kind = self._get_span_kind(instance) span_attributes = self._get_span_attributes(instance) - logger.warning(f"lm start, {instance.__class__.__name__}, {instance}, {inputs}") + #m logger.warning(f"lm start, {instance.__class__.__name__}, {instance}, {inputs}") attributes = { **instance.kwargs, @@ -364,7 +366,6 @@ def on_lm_end( an exception, this will be None. exception: If an exception is raised during the execution, it will be stored here. """ - logger.warning("ending") self._end_span(call_id, outputs, exception) def on_adapter_format_start( @@ -385,7 +386,7 @@ def on_adapter_format_start( span_attributes = {"lm_instance": instance.__class__.__name__} # semconv for parser - logger.warning(f"adapter format start, {instance.__class__.__name__}, {instance}, {inputs}") + #m logger.warning(f"adapter format start, {instance.__class__.__name__}, {instance}, {inputs}") self._create_span( operation_name=f"{instance.__class__.__name__}", @@ -425,10 +426,10 @@ def on_adapter_parse_start( inputs: The inputs to the Adapter's parse() method. Each arguments is stored as a key-value pair in a dictionary. """ - span_kind = "adapter_parse" + span_kind = AgentOpsSpanKindValues.OPERATION.value span_attributes = {"adapter": instance.__class__.__name__} - logger.warning(f"adapter parser start, {instance.__class__.__name__}, {instance}, {inputs}") + #m logger.warning(f"adapter parser start, {instance.__class__.__name__}, {instance}, {inputs}") self._create_span( operation_name=f"{instance.__class__.__name__}", @@ -471,7 +472,7 @@ def on_tool_start( span_kind = "tool" span_attributes = {"tool": instance.__class__.__name__} - logger.warning(f"tool start, {instance.__class__.__name__}, {instance}, {inputs}") + #m logger.warning(f"tool start, {instance.__class__.__name__}, {instance}, {inputs}") self._create_span( operation_name=f"{instance.__class__.__name__}", From 0ea4092da2d43cf8500908de7d9fa223bc4d11f0 Mon Sep 17 00:00:00 2001 From: michi-okahata Date: Fri, 18 Jul 2025 15:04:00 -0700 Subject: [PATCH 08/10] decent, minimally compliant with semconv --- .../integration/callbacks/dspy/callback.py | 145 +++++------------- 1 file changed, 41 insertions(+), 104 deletions(-) diff --git a/agentops/integration/callbacks/dspy/callback.py b/agentops/integration/callbacks/dspy/callback.py index f1bd72be9..8a87a8f7d 100644 --- a/agentops/integration/callbacks/dspy/callback.py +++ b/agentops/integration/callbacks/dspy/callback.py @@ -1,8 +1,7 @@ -from typing import Any, Dict, List, Optional +from typing import Any, List -from opentelemetry import attributes from opentelemetry.context import attach, detach -from opentelemetry.trace import SpanContext, NonRecordingSpan, set_span_in_context +from opentelemetry.trace import set_span_in_context from opentelemetry.sdk.trace import Span as SDKSpan from agentops.logging import logger @@ -12,14 +11,10 @@ from dspy.utils.callback import BaseCallback import dspy -# compile todo! -# -# fix terminal logs (unable to load logs) -# fix api/mcp (request failed, 404) -# semconv ... 🤢 -# try/except -# caching -# farm attributes šŸ§‘ā€šŸŒ¾ +DSPY_INPUT = "dspy.input.{key}" +DSPY_OUTPUT = "dspy.output.{key}" +DSPY_ATTRIBUTE = "dspy.attribute.{key}" +DSPY_EVALUATE = "evaluate" class DSPyCallbackHandler(BaseCallback): """ @@ -30,10 +25,10 @@ def __init__( self, api_key: str | None = None, tags: List[str] | None = None, - cache: bool = False, + cache: bool = True, auto_session: bool = True, ): - self.active_spans: Dict[str, SDKSpan] = {} + self.active_spans: dict[str, SDKSpan] = {} self.api_key = api_key self.tags = tags or [] self.session_span = None @@ -44,7 +39,7 @@ def __init__( if auto_session: self._initialize_agentops() - # configure caching + # Configure caching dspy.configure_cache( enable_disk_cache=cache, enable_memory_cache=cache, @@ -91,7 +86,6 @@ def _initialize_agentops(self): logger.debug("Created trace as root span for DSPy") - # def utility for determining module/etc type and mapping it to agentops semconv equivalent def _create_span( self, operation_name: str, @@ -117,7 +111,7 @@ def _create_span( """ if not tracer.initialized: logger.warning("AgentOps not initialized, spans will not be created") - return # no valid context for non-recording span + return # No valid context for non-recording span otel_tracer = tracer.get_tracer() @@ -131,10 +125,10 @@ def _create_span( logger.warning(f"No inputs recorded on span {run_id}") inputs = {} - inputs = {f"inputs.{key}": value for key, value in inputs.items()} + inputs = {DSPY_INPUT.format(key=key): value for key, value in inputs.items()} attributes = {**attributes, **inputs} - attributes["agentops.span.kind"] = span_kind + attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = span_kind attributes["agentops.operation.name"] = operation_name if run_id is None: @@ -183,8 +177,6 @@ def _end_span( outputs: The DSPy output exception: The DSPy exception """ - # deal with exceptions - # deal with outputs, span attributes <- output if run_id not in self.active_spans: logger.warning(f"No span found for call {run_id}") return @@ -204,14 +196,9 @@ def _end_span( ) attributes = {} - if span.attributes: - attributes = {key: value for key, value in span.attributes.items()} - if isinstance(outputs, dict): - outputs = {f"outputs.{key}": value for key, value in outputs.items()} - - attributes = {**attributes, **outputs} - span.set_attributes(attributes) + outputs = {DSPY_OUTPUT.format(key=key): value for key, value in outputs.items()} + span.set_attributes(outputs) if token is not None: detach(token) @@ -226,16 +213,7 @@ def _end_span( if run_id in self.token_counts: del self.token_counts[run_id] - # modules, adapters, evaluate, tool, lm - - # maybe a better way to do this? - # includes: - # multiple inheritance in python? - # BestOfN, ChainOfThought, ChainOfThoughtWithHint - # MultiChainComparison, Predict - # ProgramOfThought, ReAct, Refine, SmartSearch - # RulesInductionProgram - # modules also include teleprompt (non lm ... what) + # does this type check break on things? def _get_span_kind(self, instance: dspy.Module) -> str: if isinstance(instance, (dspy.ReAct, dspy.ProgramOfThought)): return AgentOpsSpanKindValues.AGENT.value @@ -249,18 +227,11 @@ def _get_span_kind(self, instance: dspy.Module) -> str: logger.warning(f"Instance's span type not found: {instance}") return AgentOpsSpanKindValues.UNKNOWN.value - # might have to make it on a per-class basis - # this sucks cuz it only unpacks lm basically - def _get_span_attributes(self, instance: Any) -> dict: - attributes = {} - attributes = {**attributes, **instance.__dict__} # append dict, should probably delete some, organize/name - return attributes - def on_module_start( self, call_id: str, instance: Any, - inputs: Dict[str, Any], + inputs: dict[str, Any], ): """A handler triggered when forward() method of a module (subclass of dspy.Module) is called. @@ -270,23 +241,15 @@ def on_module_start( inputs: The inputs to the module's forward() method. Each arguments is stored as a key-value pair in a dictionary. """ - span_kind = self._get_span_kind(instance) # make it check for various types - span_attributes = self._get_span_attributes(instance) - - #m logger.warning(f"module start, {instance.__class__.__name__}, {instance}, {inputs}") # delete - if isinstance(instance, dspy.Module): - pass - - # avoid recording empty "args" key - if "args" in inputs and not inputs["args"]: - inputs.pop("args") + span_kind = self._get_span_kind(instance) + attributes = {"instance": instance.__dict__} self._create_span( operation_name=f"{instance.__class__.__name__}", span_kind=span_kind, run_id=call_id, inputs=inputs, - attributes=span_attributes, + attributes=attributes, ) def on_module_end( @@ -314,7 +277,7 @@ def on_lm_start( self, call_id: str, instance: Any, - inputs: Dict[str, Any], + inputs: dict[str, Any], ): """A handler triggered when __call__ method of dspy.LM instance is called. @@ -324,38 +287,21 @@ def on_lm_start( inputs: The inputs to the LM's __call__ method. Each arguments is stored as a key-value pair in a dictionary. """ - _instance: dspy.LM = instance - _instance.cache # (cache_in_memory is deprecated) - _instance.model # might be redundant - _instance.provider # def redundant? - _instance.finetuning_model - - p_instance: dspy.Predict = instance - dspy.ReAct - - # duck type to collect? - span_kind = self._get_span_kind(instance) - span_attributes = self._get_span_attributes(instance) - - #m logger.warning(f"lm start, {instance.__class__.__name__}, {instance}, {inputs}") - - attributes = { - **instance.kwargs, - } + attributes = {"instance": instance.__dict__} self._create_span( operation_name=f"{instance.__class__.__name__}", span_kind=span_kind, run_id=call_id, inputs=inputs, - attributes=span_attributes, + attributes=attributes, ) def on_lm_end( self, call_id: str, - outputs: Dict[str, Any] | None, + outputs: dict[str, Any] | None, exception: Exception | None = None, ): """A handler triggered after __call__ method of dspy.LM instance is executed. @@ -372,7 +318,7 @@ def on_adapter_format_start( self, call_id: str, instance: Any, - inputs: Dict[str, Any], + inputs: dict[str, Any], ): """A handler triggered when format() method of an adapter (subclass of dspy.Adapter) is called. @@ -382,24 +328,21 @@ def on_adapter_format_start( inputs: The inputs to the Adapter's format() method. Each arguments is stored as a key-value pair in a dictionary. """ - span_kind = AgentOpsSpanKindValues.OPERATION.value # custom semconv? - span_attributes = {"lm_instance": instance.__class__.__name__} - # semconv for parser - - #m logger.warning(f"adapter format start, {instance.__class__.__name__}, {instance}, {inputs}") + span_kind = AgentOpsSpanKindValues.OPERATION.value + attributes = {"instance": instance.__dict__} self._create_span( operation_name=f"{instance.__class__.__name__}", span_kind=span_kind, run_id=call_id, inputs=inputs, - attributes=span_attributes, + attributes=attributes, ) def on_adapter_format_end( self, call_id: str, - outputs: Dict[str, Any] | None, + outputs: dict[str, Any] | None, exception: Exception | None = None, ): """A handler triggered after format() method of an adapter (subclass of dspy.Adapter) is called.. @@ -416,7 +359,7 @@ def on_adapter_parse_start( self, call_id: str, instance: Any, - inputs: Dict[str, Any], + inputs: dict[str, Any], ): """A handler triggered when parse() method of an adapter (subclass of dspy.Adapter) is called. @@ -427,22 +370,20 @@ def on_adapter_parse_start( a key-value pair in a dictionary. """ span_kind = AgentOpsSpanKindValues.OPERATION.value - span_attributes = {"adapter": instance.__class__.__name__} - - #m logger.warning(f"adapter parser start, {instance.__class__.__name__}, {instance}, {inputs}") + attributes = {"instance": instance.__dict__} self._create_span( operation_name=f"{instance.__class__.__name__}", span_kind=span_kind, run_id=call_id, inputs=inputs, - attributes=span_attributes, + attributes=attributes, ) def on_adapter_parse_end( self, call_id: str, - outputs: Dict[str, Any] | None, + outputs: dict[str, Any] | None, exception: Exception | None = None, ): """A handler triggered after parse() method of an adapter (subclass of dspy.Adapter) is called. @@ -459,7 +400,7 @@ def on_tool_start( self, call_id: str, instance: Any, - inputs: Dict[str, Any], + inputs: dict[str, Any], ): """A handler triggered when a tool is called. @@ -469,23 +410,21 @@ def on_tool_start( inputs: The inputs to the Tool's __call__ method. Each arguments is stored as a key-value pair in a dictionary. """ - span_kind = "tool" - span_attributes = {"tool": instance.__class__.__name__} - - #m logger.warning(f"tool start, {instance.__class__.__name__}, {instance}, {inputs}") + span_kind = AgentOpsSpanKindValues.TOOL.value + attributes = {"instance": instance.__dict__} self._create_span( operation_name=f"{instance.__class__.__name__}", span_kind=span_kind, run_id=call_id, inputs=inputs, - attributes=span_attributes, + attributes=attributes, ) def on_tool_end( self, call_id: str, - outputs: Dict[str, Any] | None, + outputs: dict[str, Any] | None, exception: Exception | None = None, ): """A handler triggered after a tool is executed. @@ -502,7 +441,7 @@ def on_evaluate_start( self, call_id: str, instance: Any, - inputs: Dict[str, Any], + inputs: dict[str, Any], ): """A handler triggered when evaluation is started. @@ -512,17 +451,15 @@ def on_evaluate_start( inputs: The inputs to the Evaluate's __call__ method. Each arguments is stored as a key-value pair in a dictionary. """ - span_kind = "evalute" - span_attributes = {"evalute": instance.__class__.__name__} - - _instance: dspy.Evaluate = instance + span_kind = DSPY_EVALUATE + attributes = {"instance": instance.__dict__} self._create_span( operation_name=f"{instance.__class__.__name__}", span_kind=span_kind, run_id=call_id, inputs=inputs, - attributes=span_attributes, + attributes=attributes, ) def on_evaluate_end( From a07f2c7dccb1e579001dbc9307c3511fced24a01 Mon Sep 17 00:00:00 2001 From: michi-okahata Date: Fri, 18 Jul 2025 15:08:24 -0700 Subject: [PATCH 09/10] precommit fix --- .../integration/callbacks/dspy/callback.py | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/agentops/integration/callbacks/dspy/callback.py b/agentops/integration/callbacks/dspy/callback.py index 8a87a8f7d..37c728006 100644 --- a/agentops/integration/callbacks/dspy/callback.py +++ b/agentops/integration/callbacks/dspy/callback.py @@ -16,6 +16,7 @@ DSPY_ATTRIBUTE = "dspy.attribute.{key}" DSPY_EVALUATE = "evaluate" + class DSPyCallbackHandler(BaseCallback): """ AgentOps callback handler for DSPy. @@ -51,11 +52,7 @@ def _initialize_agentops(self): import agentops if not tracer.initialized: - init_kwargs = { - "auto_start_session": False, - "instrument_llm_calls": True, - "api_key": None - } + init_kwargs = {"auto_start_session": False, "instrument_llm_calls": True, "api_key": None} if self.api_key: init_kwargs["api_key"] = self.api_key @@ -111,7 +108,7 @@ def _create_span( """ if not tracer.initialized: logger.warning("AgentOps not initialized, spans will not be created") - return # No valid context for non-recording span + return # No valid context for non-recording span otel_tracer = tracer.get_tracer() @@ -184,18 +181,13 @@ def _end_span( span: SDKSpan = self.active_spans.pop(run_id) token = self.context_tokens.pop(run_id, None) - if exception: logger.warning(f"Exception {str(exception)}") span.add_event( name="exception", - attributes={ - "exception.type": exception.__class__.__name__, - "exception.message": str(exception) - }, + attributes={"exception.type": exception.__class__.__name__, "exception.message": str(exception)}, ) - attributes = {} if isinstance(outputs, dict): outputs = {DSPY_OUTPUT.format(key=key): value for key, value in outputs.items()} span.set_attributes(outputs) @@ -205,7 +197,7 @@ def _end_span( try: span.end() - logger.debug(f"Ended span: {span.update_name('test')}") # ugh + logger.debug(f"Ended span: {span.update_name('test')}") # ugh except Exception as e: logger.warning(f"Error ending span: {e}") From 90f85380e8277fedf083a0e54198f98840a5fe11 Mon Sep 17 00:00:00 2001 From: michi-okahata Date: Fri, 18 Jul 2025 15:44:35 -0700 Subject: [PATCH 10/10] dspy integration example --- .../workflows/examples-integration-test.yml | 3 ++ examples/dspy/dspy_calculator.py | 37 +++++++++++++++++++ 2 files changed, 40 insertions(+) create mode 100644 examples/dspy/dspy_calculator.py diff --git a/.github/workflows/examples-integration-test.yml b/.github/workflows/examples-integration-test.yml index ccddbd194..6239e1444 100644 --- a/.github/workflows/examples-integration-test.yml +++ b/.github/workflows/examples-integration-test.yml @@ -107,8 +107,11 @@ jobs: - { path: 'examples/openai_agents/agents_tools.py', name: 'OpenAI Agents Tools' } - { path: 'examples/openai_agents/customer_service_agent.py', name: 'OpenAI Agents Customer Service' } + # DSPy examples + - { path: 'examples/dspy/dspy_calculator.py', name: 'DSPy ReAct Agent' } # Add more examples as needed + steps: - uses: actions/checkout@v4 diff --git a/examples/dspy/dspy_calculator.py b/examples/dspy/dspy_calculator.py new file mode 100644 index 000000000..3b7cb4fd4 --- /dev/null +++ b/examples/dspy/dspy_calculator.py @@ -0,0 +1,37 @@ +import os +import time + +import dspy +from dspy import Tool +import agentops +from agentops.integration.callbacks.dspy import DSPyCallbackHandler + +from dotenv import load_dotenv + +load_dotenv() + +handler = DSPyCallbackHandler(api_key=os.getenv("AGENTOPS_API_KEY", ""), cache=False) +os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "") + +lm = dspy.LM("openai/gpt-4o-mini", temperature=0.5) +dspy.configure(lm=lm, callbacks=[handler]) + +def multiplier(*, a: int, b: int) -> int: + return a * b + +multiplier = Tool(multiplier) + +agent = dspy.ProgramOfThought("question -> answer: int") +response = agent(question="What is twenty five times twenty five?", tools=[multiplier]) + +print(response) +print("Now let's verify that our LLM calls were tracked properly...") + +try: + agentops.validate_trace_spans(trace_context=None) + print("\nāœ… Success! All LLM spans were properly recorded in AgentOps.") +except ImportError: + print("\nāŒ Error: agentops library not installed. Please install it to validate spans.") +except agentops.ValidationError as e: + print(f"\nāŒ Error validating spans: {e}") + raise