Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/sdk/server-ai/src/ldai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Edge, JudgeConfiguration, LDAIAgent, LDAIAgentConfig, LDAIAgentDefaults,
LDMessage, ModelConfig, ProviderConfig)
from ldai.providers.types import EvalScore, JudgeResponse
from ldai.tracker import AIGraphTracker

__all__ = [
'LDAIClient',
Expand All @@ -21,6 +22,7 @@
'AIAgentConfigRequest',
'AIAgents',
'AIAgentGraphConfig',
'AIGraphTracker',
'Edge',
'AICompletionConfig',
'AICompletionConfigDefault',
Expand Down
12 changes: 11 additions & 1 deletion packages/sdk/server-ai/src/ldai/agent_graph/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Graph implementation for managing AI agent graphs."""

from dataclasses import dataclass
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused import here, not sure why it wasn't picked up in previous lint

from typing import Any, Callable, Dict, List, Optional, Set

from ldclient import Context

from ldai.models import AIAgentConfig, AIAgentGraphConfig, Edge
from ldai.tracker import AIGraphTracker

DEFAULT_FALSE = AIAgentConfig(key="", enabled=False)

Expand Down Expand Up @@ -54,11 +54,21 @@ def __init__(
nodes: Dict[str, AgentGraphNode],
context: Context,
enabled: bool,
tracker: Optional[AIGraphTracker] = None,
):
self._agent_graph = agent_graph
self._context = context
self._nodes = nodes
self.enabled = enabled
self._tracker = tracker

def get_tracker(self) -> Optional[AIGraphTracker]:
"""
Get the graph tracker for this graph definition.

:return: The AIGraphTracker instance, or None if not available.
"""
return self._tracker

def is_enabled(self) -> bool:
"""Check if the graph is enabled."""
Expand Down
19 changes: 18 additions & 1 deletion packages/sdk/server-ai/src/ldai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
JudgeConfiguration, LDMessage, ModelConfig,
ProviderConfig)
from ldai.providers.ai_provider_factory import AIProviderFactory
from ldai.tracker import LDAIConfigTracker
from ldai.tracker import AIGraphTracker, LDAIConfigTracker


class LDAIClient:
Expand Down Expand Up @@ -435,6 +435,19 @@ def agent_graph(
"""
variation = self._client.variation(key, context, {})

# Extract variation metadata for tracker
variation_key = variation.get("_ldMeta", {}).get("variationKey", "")
version = int(variation.get("_ldMeta", {}).get("version", 1))

# Create graph tracker
tracker = AIGraphTracker(
self._client,
variation_key,
key,
version,
context,
)

if not variation.get("root"):
log.debug(f"Agent graph {key} is disabled, no root config key found")
return AgentGraphDefinition(
Expand All @@ -447,6 +460,7 @@ def agent_graph(
nodes={},
context=context,
enabled=False,
tracker=tracker,
)

edge_keys = list[str](variation.get("edges", {}).keys())
Expand Down Expand Up @@ -474,6 +488,7 @@ def agent_graph(
nodes={},
context=context,
enabled=False,
tracker=tracker,
)

try:
Expand Down Expand Up @@ -504,6 +519,7 @@ def agent_graph(
nodes={},
context=context,
enabled=False,
tracker=tracker,
)

nodes = AgentGraphDefinition.build_nodes(
Expand All @@ -516,6 +532,7 @@ def agent_graph(
nodes=nodes,
context=context,
enabled=agent_graph_config.enabled,
tracker=tracker,
)

def agents(
Expand Down
244 changes: 243 additions & 1 deletion packages/sdk/server-ai/src/ldai/tracker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional

from ldclient import Context, LDClient

Expand Down Expand Up @@ -407,3 +407,245 @@ def _openai_to_token_usage(data: dict) -> TokenUsage:
input=data.get("prompt_tokens", 0),
output=data.get("completion_tokens", 0),
)


class AIGraphTracker:
"""
Tracks graph-level, node-level, and edge-level metrics for AI agent graph operations.
"""

def __init__(
self,
ld_client: LDClient,
variation_key: str,
graph_key: str,
version: int,
context: Context,
):
"""
Initialize an AI Graph tracker.

:param ld_client: LaunchDarkly client instance.
:param variation_key: Variation key for tracking.
:param graph_key: Graph configuration key for tracking.
:param version: Version of the variation.
:param context: Context for evaluation.
"""
self._ld_client = ld_client
self._variation_key = variation_key
self._graph_key = graph_key
self._version = version
self._context = context

def __get_track_data(self):
"""
Get tracking data for events.

:return: Dictionary containing variation, graph key, and version.
"""
track_data = {
"variationKey": self._variation_key,
"graphKey": self._graph_key,
"version": self._version,
}
# Note: aiSdkName and aiSdkVersion are optional and not included for now
return track_data

def track_invocation_success(self) -> None:
"""
Track a successful graph invocation.
"""
self._ld_client.track(
"$ld:ai:graph:invocation_success",
self._context,
self.__get_track_data(),
1,
)

def track_invocation_failure(self) -> None:
"""
Track an unsuccessful graph invocation.
"""
self._ld_client.track(
"$ld:ai:graph:invocation_failure",
self._context,
self.__get_track_data(),
1,
)

def track_latency(self, duration: int) -> None:
"""
Track the total latency of graph execution.

:param duration: Duration in milliseconds.
"""
self._ld_client.track(
"$ld:ai:graph:latency",
self._context,
self.__get_track_data(),
duration,
)

def track_total_tokens(self, tokens: TokenUsage) -> None:
"""
Track aggregated token usage across the entire graph invocation.

:param tokens: Token usage data.
"""
self._ld_client.track(
"$ld:ai:graph:total_tokens",
self._context,
self.__get_track_data(),
tokens.total,
)

def track_path(self, path: List[str]) -> None:
"""
Track the execution path through the graph.

:param path: An array of configuration keys representing the sequence of nodes executed during graph traversal.
"""
track_data = {**self.__get_track_data(), "path": path}
self._ld_client.track(
"$ld:ai:graph:path",
self._context,
track_data,
1,
)

def track_judge_response(self, response: Any) -> None:
"""
Track judge responses for the final graph output.

:param response: JudgeResponse object containing evals and success status.
"""
from ldai.providers.types import EvalScore, JudgeResponse

if isinstance(response, JudgeResponse):
if response.evals:
track_data = self.__get_track_data()
if response.judge_config_key:
track_data = {**track_data, "judgeConfigKey": response.judge_config_key}

for metric_key, eval_score in response.evals.items():
if isinstance(eval_score, EvalScore):
self._ld_client.track(
metric_key,
self._context,
track_data,
eval_score.score,
)

def track_node_invocation(self, config_key: str) -> None:
"""
Track when a node is invoked during graph execution.

:param config_key: The configuration key of the node being invoked.
"""
track_data = {**self.__get_track_data(), "configKey": config_key}
self._ld_client.track(
"$ld:ai:graph:node_invocation",
self._context,
track_data,
1,
)

def track_tool_call(self, config_key: str, tool_key: str) -> None:
"""
Track tool calls made by nodes during graph execution.

:param config_key: The configuration key of the node making the tool call.
:param tool_key: The key of the tool being called.
"""
track_data = {
**self.__get_track_data(),
"configKey": config_key,
"toolKey": tool_key,
}
self._ld_client.track(
"$ld:ai:graph:tool_call",
self._context,
track_data,
1,
)

def track_node_judge_response(self, config_key: str, response: Any) -> None:
"""
Track judge responses for a specific node.

:param config_key: The configuration key of the node being evaluated.
:param response: JudgeResponse object containing evals and success status.
"""
from ldai.providers.types import EvalScore, JudgeResponse

if isinstance(response, JudgeResponse):
if response.evals:
track_data = {**self.__get_track_data(), "configKey": config_key}
if response.judge_config_key:
track_data = {**track_data, "judgeConfigKey": response.judge_config_key}

for metric_key, eval_score in response.evals.items():
if isinstance(eval_score, EvalScore):
self._ld_client.track(
metric_key,
self._context,
track_data,
eval_score.score,
)

def track_redirect(self, source_key: str, redirected_target: str) -> None:
"""
Track when a node redirects to a different target than originally specified.

:param source_key: The configuration key of the source node.
:param redirected_target: The configuration key of the target node that was redirected to.
"""
track_data = {
**self.__get_track_data(),
"sourceKey": source_key,
"redirectedTarget": redirected_target,
}
self._ld_client.track(
"$ld:ai:graph:redirect",
self._context,
track_data,
1,
)

def track_handoff_success(self, source_key: str, target_key: str) -> None:
"""
Track successful handoffs between nodes.

:param source_key: The configuration key of the source node.
:param target_key: The configuration key of the target node.
"""
track_data = {
**self.__get_track_data(),
"sourceKey": source_key,
"targetKey": target_key,
}
self._ld_client.track(
"$ld:ai:graph:handoff_success",
self._context,
track_data,
1,
)

def track_handoff_failure(self, source_key: str, target_key: str) -> None:
"""
Track failed handoffs between nodes.

:param source_key: The configuration key of the source node.
:param target_key: The configuration key of the target node.
"""
track_data = {
**self.__get_track_data(),
"sourceKey": source_key,
"targetKey": target_key,
}
self._ld_client.track(
"$ld:ai:graph:handoff_failure",
self._context,
track_data,
1,
)