Skip to content

Commit 033192e

Browse files
committed
feat(appkit): agent plugin tracing
Signed-off-by: Hubert Zub <hubert.zub@databricks.com>
1 parent f3cd566 commit 033192e

13 files changed

Lines changed: 491 additions & 65 deletions

File tree

packages/appkit/src/core/appkit.ts

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,12 +169,35 @@ export class AppKit<TPlugins extends InputPluginMap> {
169169
client?: WorkspaceClient;
170170
} = {},
171171
): Promise<PluginMap<T>> {
172+
const rawPlugins = config.plugins as T;
173+
174+
// Collect plugin-contributed trace exporter headers before telemetry init
175+
const traceExporterHeaders: Record<string, string> = {
176+
...config?.telemetry?.traceExporterHeaders,
177+
};
178+
for (const entry of rawPlugins) {
179+
if (typeof entry.plugin.appendTraceHeaders === "function") {
180+
Object.assign(
181+
traceExporterHeaders,
182+
entry.plugin.appendTraceHeaders(
183+
entry.config as Parameters<
184+
typeof entry.plugin.appendTraceHeaders
185+
>[0],
186+
),
187+
);
188+
}
189+
}
190+
172191
// Initialize core services
173-
TelemetryManager.initialize(config?.telemetry);
192+
await TelemetryManager.initialize({
193+
...config?.telemetry,
194+
traceExporterHeaders:
195+
Object.keys(traceExporterHeaders).length > 0
196+
? traceExporterHeaders
197+
: undefined,
198+
});
174199
await CacheManager.getInstance(config?.cache);
175200

176-
const rawPlugins = config.plugins as T;
177-
178201
// Collect manifest resources via registry
179202
const registry = new ResourceRegistry();
180203
registry.collectResources(rawPlugins);

packages/appkit/src/plugins/agent/agent.ts

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ import type { HostedTool } from "./hosted-tools";
2424
import { isHostedTool, resolveHostedTools } from "./hosted-tools";
2525
import { createInvokeHandler } from "./invoke-handler";
2626
import manifest from "./manifest.json";
27+
import { setupExperimentTraceLocation } from "./mlflow";
2728
import { StandardAgent } from "./standard-agent";
2829
import type { AgentTool, IAgentConfig } from "./types";
30+
import { instrumentLangChain } from "./tracing";
2931

3032
const logger = createLogger("agent");
3133

@@ -41,6 +43,36 @@ export class AgentPlugin extends Plugin<IAgentConfig> {
4143

4244
static manifest = manifest as PluginManifest<"agent">;
4345

46+
/**
 * Contribute MLflow-specific OTLP trace exporter headers.
 *
 * Invoked during app creation, before `TelemetryManager.initialize` runs, so
 * the returned headers can be merged into the trace exporter configuration.
 *
 * Resolution rules:
 * - Experiment ID: `tracing.experimentId`, else `MLFLOW_EXPERIMENT_ID`.
 * - UC table name: `tracing.ucTableName`, else `OTEL_UC_TABLE_NAME`. The value
 *   is passed through unchanged; nothing is derived or provisioned here.
 *
 * @param config - Agent plugin config. `tracing: false` disables the headers.
 * @returns An empty object when tracing is disabled or no experiment ID is
 *   available; otherwise the experiment header, plus the UC table header when
 *   a table name is set.
 */
static appendTraceHeaders(config?: IAgentConfig): Record<string, string> {
  const tracingSetting = config?.tracing;
  if (tracingSetting === false) {
    return {};
  }

  // Any non-object setting (undefined / true) means "use env-only defaults".
  const options = typeof tracingSetting === "object" ? tracingSetting : {};

  const resolvedExperimentId =
    options.experimentId ?? process.env.MLFLOW_EXPERIMENT_ID;
  if (!resolvedExperimentId) {
    return {};
  }

  const resolvedTableName =
    options.ucTableName ?? process.env.OTEL_UC_TABLE_NAME;

  return resolvedTableName
    ? {
        "x-mlflow-experiment-id": resolvedExperimentId,
        "X-Databricks-UC-Table-Name": resolvedTableName,
      }
    : { "x-mlflow-experiment-id": resolvedExperimentId };
}
75+
4476
protected declare config: IAgentConfig;
4577

4678
private agentImpl: AgentInterface | null = null;
@@ -55,10 +87,63 @@ export class AgentPlugin extends Plugin<IAgentConfig> {
5587
/** Mutable list of all tools (config + added). Only used when building from config. */
5688
private toolsList: AgentTool[] = [];
5789

90+
/**
 * Report whether tracing should be attempted for this plugin instance.
 *
 * @returns `false` when `config.tracing` is explicitly `false`; otherwise
 *   `true` only if an experiment ID is present in the tracing config or in
 *   the `MLFLOW_EXPERIMENT_ID` environment variable.
 */
private isTracingActive(): boolean {
  const { tracing: tracingSetting } = this.config;
  if (tracingSetting === false) return false;

  const options = typeof tracingSetting === "object" ? tracingSetting : {};
  const experimentId =
    options.experimentId ?? process.env.MLFLOW_EXPERIMENT_ID;

  return Boolean(experimentId);
}
103+
104+
/**
 * Build the options object passed to the trace-location setup call.
 *
 * The UC table name is read from `tracing.ucTableName`, falling back to
 * `OTEL_UC_TABLE_NAME`. Catalog and schema are its first two dot-separated
 * segments and are only populated when the name has at least three segments
 * (`catalog.schema.table`); otherwise both are `undefined`.
 *
 * @returns The experiment ID (empty string when none is configured), the
 *   derived catalog/schema, and the configured warehouse ID.
 */
private getTracingConfig() {
  const options =
    typeof this.config.tracing === "object" ? this.config.tracing : {};

  const qualifiedTableName =
    options.ucTableName ?? process.env.OTEL_UC_TABLE_NAME;
  const segments = qualifiedTableName?.split(".") ?? [];
  const isFullyQualified = segments.length >= 3;

  const experimentId =
    options.experimentId ?? process.env.MLFLOW_EXPERIMENT_ID ?? "";

  return {
    experimentId,
    ucCatalog: isFullyQualified ? segments[0] : undefined,
    ucSchema: isFullyQualified ? segments[1] : undefined,
    warehouseId: options.warehouseId,
  };
}
121+
58122
async setup() {
59123
this.systemPrompt = this.config.systemPrompt ?? DEFAULT_SYSTEM_PROMPT;
60124

125+
let tracingActive = this.isTracingActive();
126+
127+
// Ensure the UC trace location exists; skip instrumentation if it fails
128+
if (tracingActive) {
129+
const tracingConfig = this.getTracingConfig();
130+
const location = await setupExperimentTraceLocation(tracingConfig).catch(
131+
(err) => {
132+
logger.warn("Trace location setup failed: %O", err);
133+
return null;
134+
},
135+
);
136+
if (!location) {
137+
tracingActive = false;
138+
}
139+
}
140+
141+
// If a pre-built agent is provided, use it directly
61142
if (this.config.agentInstance) {
143+
if (tracingActive) {
144+
const cbModule = await import("@langchain/core/callbacks/manager");
145+
await instrumentLangChain(cbModule);
146+
}
62147
this.agentImpl = this.config.agentInstance;
63148
logger.info("AgentPlugin initialized with provided agentInstance");
64149
return;
@@ -74,6 +159,14 @@ export class AgentPlugin extends Plugin<IAgentConfig> {
74159

75160
const { ChatDatabricks } = await import("@databricks/langchainjs");
76161

162+
// Instrument LangChain callbacks using the *same* @langchain/core copy
163+
// that the agent runtime (LangGraph, ChatDatabricks) will use.
164+
// Spans flow through the global tracer provider (TelemetryManager).
165+
if (tracingActive) {
166+
const cbModule = await import("@langchain/core/callbacks/manager");
167+
await instrumentLangChain(cbModule);
168+
}
169+
77170
this.model = new ChatDatabricks({
78171
model: modelName,
79172
useResponsesApi: this.config.useResponsesApi ?? false,
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/**
2+
* MLflow experiment trace location setup.
3+
*
4+
* Auto-provisions Unity Catalog trace storage and links an MLflow experiment
5+
* to it using the Databricks REST API. This mirrors the Python
6+
* `mlflow.set_experiment_trace_location()` behaviour.
7+
*
8+
* Env vars:
9+
* OTEL_UC_TABLE_NAME — fully-qualified UC table (catalog.schema.table);
10+
* catalog and schema are derived by splitting on dots.
11+
* MLFLOW_TRACING_SQL_WAREHOUSE_ID — warehouse used to create the storage (optional)
12+
*/
13+
14+
import { createLogger } from "../../logging/logger";
15+
16+
const logger = createLogger("agent:mlflow");
17+
18+
/** Inputs for provisioning and linking an MLflow experiment trace location. */
interface TraceLocationOptions {
  /** ID of the MLflow experiment to link to the trace location. */
  experimentId: string;
  /** UC catalog name; the caller derives it from the first dot-segment of the UC table name. */
  ucCatalog?: string;
  /** UC schema name; the caller derives it from the second dot-segment of the UC table name. */
  ucSchema?: string;
  /** SQL warehouse used to create the storage; when omitted, `MLFLOW_TRACING_SQL_WAREHOUSE_ID` is consulted. */
  warehouseId?: string;
}
26+
27+
/** Table name used to build the fully-qualified trace table (`catalog.schema.<this>`). */
const MLFLOW_TRACE_LOCATION_TABLE_NAME = "mlflow_experiment_trace_otel_spans";

/**
 * Link an MLflow experiment to an existing UC trace location.
 *
 * Never rejects: any failure of the link request is logged as a warning and
 * reported as `null`.
 *
 * @param client - Object exposing `apiClient.request` for the REST call.
 * @param experimentId - MLflow experiment to link.
 * @param catalogName - UC catalog holding the trace location.
 * @param schemaName - UC schema holding the trace location.
 * @returns The fully-qualified table name on success, `null` otherwise.
 */
async function linkExperimentToLocation(
  client: { apiClient: { request: (opts: unknown) => Promise<unknown> } },
  experimentId: string,
  catalogName: string,
  schemaName: string,
): Promise<string | null> {
  const tableName = `${catalogName}.${schemaName}.${MLFLOW_TRACE_LOCATION_TABLE_NAME}`;

  try {
    await client.apiClient.request({
      path: `/api/4.0/mlflow/traces/${experimentId}/link-location`,
      method: "POST",
      headers: new Headers({ "Content-Type": "application/json" }),
      payload: {
        experiment_id: experimentId,
        uc_schema: {
          catalog_name: catalogName,
          schema_name: schemaName,
        },
      },
      raw: false,
    });

    logger.info("Experiment linked to UC trace location: %s", tableName);
    return tableName;
  } catch (error: unknown) {
    // A thrown value may be null/undefined; guard so the handler itself cannot
    // throw and break the "returns null on failure" contract.
    const details = (error ?? {}) as {
      error_code?: unknown;
      errorCode?: unknown;
      name?: unknown;
      message?: unknown;
    };
    // Accept either spelling of the error-code field.
    // NOTE(review): assumes SDK error objects may expose camelCase `errorCode` — confirm.
    const code =
      [details.error_code, details.errorCode, details.name].find(
        (value): value is string => typeof value === "string",
      ) ?? "UNKNOWN";

    logger.warn(
      "Could not link experiment %s to %s (%s)",
      experimentId,
      tableName,
      code,
    );
    if (typeof details.message === "string" && details.message.length > 0) {
      // Surface the underlying reason for troubleshooting without raising the log level.
      logger.debug("Link failure detail: %s", details.message);
    }
    return null;
  }
}
72+
73+
/**
 * Report whether `error` means the UC trace location already exists.
 *
 * Recognizes a structured HTTP 409 status, an `ALREADY_EXISTS` error code, or
 * either marker appearing in the error message.
 */
function isTraceLocationConflict(error: unknown): boolean {
  if (typeof error !== "object" || error === null) {
    return false;
  }
  const details = error as {
    message?: unknown;
    statusCode?: unknown;
    status?: unknown;
    errorCode?: unknown;
    error_code?: unknown;
  };

  if (details.statusCode === 409 || details.status === 409) {
    return true;
  }

  const hasAlreadyExistsCode = [details.errorCode, details.error_code].some(
    (value) => typeof value === "string" && value.includes("ALREADY_EXISTS"),
  );
  if (hasAlreadyExistsCode) {
    return true;
  }

  return (
    typeof details.message === "string" &&
    (details.message.includes("409") ||
      details.message.includes("ALREADY_EXISTS"))
  );
}

/**
 * Provision a UC trace storage location and link the experiment to it.
 *
 * Flow:
 * 1. Without catalog/schema, nothing is attempted.
 * 2. Without a warehouse ID (option or `MLFLOW_TRACING_SQL_WAREHOUSE_ID`),
 *    the experiment is linked directly, which works only when the UC table
 *    already exists.
 * 3. Otherwise the location is created first; an "already exists" response
 *    is treated as success and linking proceeds.
 *
 * Failures are logged and reported as `null` rather than thrown.
 *
 * @param opts - Experiment ID, UC catalog/schema and optional warehouse ID.
 * @returns The fully-qualified UC table name on success, `null` otherwise.
 */
export async function setupExperimentTraceLocation(
  opts: TraceLocationOptions,
): Promise<string | null> {
  const catalogName = opts.ucCatalog;
  const schemaName = opts.ucSchema;

  if (!catalogName || !schemaName) {
    logger.debug(
      "Skipping trace location setup — catalog/schema not available (set OTEL_UC_TABLE_NAME as catalog.schema.table)",
    );
    return null;
  }

  const warehouseId =
    opts.warehouseId ?? process.env.MLFLOW_TRACING_SQL_WAREHOUSE_ID;

  let client: {
    apiClient: { request: (opts: unknown) => Promise<unknown> };
    config: { ensureResolved: () => Promise<void> };
  };
  try {
    // Loaded lazily so the SDK is only required when tracing is configured.
    const { WorkspaceClient } = await import("@databricks/sdk-experimental");
    client = new WorkspaceClient({}) as typeof client;
    await client.config.ensureResolved();
  } catch (err) {
    logger.warn(
      "Cannot set up trace location — Databricks auth unavailable: %O",
      err,
    );
    return null;
  }

  if (!warehouseId) {
    const result = await linkExperimentToLocation(
      client,
      opts.experimentId,
      catalogName,
      schemaName,
    );
    if (!result) {
      logger.warn(
        "Trace destination does not exist and cannot be created — " +
          "set MLFLOW_TRACING_SQL_WAREHOUSE_ID to auto-create it",
      );
    }
    return result;
  }

  try {
    logger.debug(
      "Creating UC trace location: %s.%s (warehouse=%s)",
      catalogName,
      schemaName,
      warehouseId,
    );

    await client.apiClient.request({
      path: "/api/4.0/mlflow/traces/location",
      method: "POST",
      headers: new Headers({ "Content-Type": "application/json" }),
      payload: {
        uc_schema: {
          catalog_name: catalogName,
          schema_name: schemaName,
        },
        sql_warehouse_id: warehouseId,
      },
      raw: false,
    });
  } catch (error: unknown) {
    // An existing location is not a failure — fall through and link to it.
    if (!isTraceLocationConflict(error)) {
      logger.warn("Failed to create UC trace location: %O", error);
      return null;
    }
  }

  return linkExperimentToLocation(
    client,
    opts.experimentId,
    catalogName,
    schemaName,
  );
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* LangChain tracing integration for the agent plugin.
3+
*
4+
* Instruments LangChain callbacks via @arizeai/openinference-instrumentation-langchain
5+
* so that agent spans are emitted through the global tracer provider
6+
* (set up by AppKit's TelemetryManager).
7+
*
8+
* MLflow-specific headers (experiment ID, UC table name) are added
9+
* to the trace exporter by TelemetryManager — see buildTraceExporterHeaders().
10+
*/
11+
12+
import { createLogger } from "../../logging/logger";
13+
14+
const logger = createLogger("agent:tracing");
15+
16+
/**
 * Patch LangChain's callback manager with the Arize/OpenInference
 * instrumentation so agent runs emit spans.
 *
 * Spans are created through the **global** tracer provider, so no dedicated
 * provider is configured here.
 *
 * IMPORTANT: pass the callback-manager module object that the agent runtime
 * itself imported. Under pnpm strict isolation different packages can
 * resolve different physical copies of @langchain/core, and patching a copy
 * the runtime does not use has no effect. The in-file import is only a
 * fallback for callers that pass nothing.
 *
 * Best-effort: any failure is logged and swallowed so that tracing problems
 * never block agent startup.
 *
 * @param callbackManagerModule - The `@langchain/core/callbacks/manager`
 *   module instance to instrument.
 */
export async function instrumentLangChain(
  callbackManagerModule?: typeof import("@langchain/core/callbacks/manager"),
): Promise<void> {
  try {
    const openInference = await import(
      "@arizeai/openinference-instrumentation-langchain"
    );

    let targetModule = callbackManagerModule;
    if (targetModule == null) {
      targetModule = await import("@langchain/core/callbacks/manager");
    }

    const instrumentation = new openInference.LangChainInstrumentation();
    instrumentation.manuallyInstrument(targetModule);

    logger.debug("LangChain callbacks instrumented (global tracer provider)");
  } catch (err) {
    logger.error("Failed to instrument LangChain callbacks: %O", err);
  }
}

0 commit comments

Comments
 (0)