Skip to content

Commit fc7bc3c

Browse files
committed
feat(appkit): agent plugin tracing
Signed-off-by: Hubert Zub <hubert.zub@databricks.com>
1 parent fd1bffc commit fc7bc3c

File tree

13 files changed

+490
-65
lines changed

13 files changed

+490
-65
lines changed

packages/appkit/src/core/appkit.ts

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,12 +169,35 @@ export class AppKit<TPlugins extends InputPluginMap> {
169169
client?: WorkspaceClient;
170170
} = {},
171171
): Promise<PluginMap<T>> {
172+
const rawPlugins = config.plugins as T;
173+
174+
// Collect plugin-contributed trace exporter headers before telemetry init
175+
const traceExporterHeaders: Record<string, string> = {
176+
...config?.telemetry?.traceExporterHeaders,
177+
};
178+
for (const entry of rawPlugins) {
179+
if (typeof entry.plugin.appendTraceHeaders === "function") {
180+
Object.assign(
181+
traceExporterHeaders,
182+
entry.plugin.appendTraceHeaders(
183+
entry.config as Parameters<
184+
typeof entry.plugin.appendTraceHeaders
185+
>[0],
186+
),
187+
);
188+
}
189+
}
190+
172191
// Initialize core services
173-
TelemetryManager.initialize(config?.telemetry);
192+
await TelemetryManager.initialize({
193+
...config?.telemetry,
194+
traceExporterHeaders:
195+
Object.keys(traceExporterHeaders).length > 0
196+
? traceExporterHeaders
197+
: undefined,
198+
});
174199
await CacheManager.getInstance(config?.cache);
175200

176-
const rawPlugins = config.plugins as T;
177-
178201
// Collect manifest resources via registry
179202
const registry = new ResourceRegistry();
180203
registry.collectResources(rawPlugins);

packages/appkit/src/plugins/agent/agent.ts

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ import { Plugin, toPlugin } from "../../plugin";
2020
import type { AgentInterface } from "./agent-interface";
2121
import { createInvokeHandler } from "./invoke-handler";
2222
import manifest from "./manifest.json";
23+
import { setupExperimentTraceLocation } from "./mlflow";
2324
import { StandardAgent } from "./standard-agent";
25+
import { instrumentLangChain } from "./tracing";
2426
import type { IAgentConfig } from "./types";
2527

2628
const logger = createLogger("agent");
@@ -37,6 +39,36 @@ export class AgentPlugin extends Plugin<IAgentConfig> {
3739

3840
static manifest = manifest;
3941

42+
/**
43+
* Called by `_createApp` before TelemetryManager initializes.
44+
*
45+
* Resolves the MLflow experiment ID and UC table name from config / env.
46+
* When `ucTableName` is not explicitly set, derives it deterministically
47+
* from catalog + schema so the exporter header is available immediately.
48+
* The actual UC location provisioning happens later in `setup()`.
49+
*
50+
* Returns OTLP headers only when an experiment ID is available.
51+
*/
52+
static appendTraceHeaders(config?: IAgentConfig): Record<string, string> {
53+
if (config?.tracing === false) return {};
54+
55+
const tracing = typeof config?.tracing === "object" ? config.tracing : {};
56+
57+
const experimentId =
58+
tracing.experimentId ?? process.env.MLFLOW_EXPERIMENT_ID;
59+
if (!experimentId) return {};
60+
61+
const ucTableName = tracing.ucTableName ?? process.env.OTEL_UC_TABLE_NAME;
62+
63+
const headers: Record<string, string> = {
64+
"x-mlflow-experiment-id": experimentId,
65+
};
66+
if (ucTableName) {
67+
headers["X-Databricks-UC-Table-Name"] = ucTableName;
68+
}
69+
return headers;
70+
}
71+
4072
protected declare config: IAgentConfig;
4173

4274
private agentImpl: AgentInterface | null = null;
@@ -53,11 +85,63 @@ export class AgentPlugin extends Plugin<IAgentConfig> {
5385
/** Mutable list of MCP servers (config + added). Only used when building from config. */
5486
private mcpServersList: DatabricksMCPServer[] = [];
5587

88+
/**
89+
* Tracing is active when `tracing !== false` AND an experiment ID is
90+
* available (from config or env).
91+
*/
92+
private isTracingActive(): boolean {
93+
if (this.config.tracing === false) {
94+
return false;
95+
}
96+
const tracing =
97+
typeof this.config.tracing === "object" ? this.config.tracing : {};
98+
99+
return Boolean(tracing.experimentId ?? process.env.MLFLOW_EXPERIMENT_ID);
100+
}
101+
102+
/**
103+
* Resolve tracing config for the location-setup API.
104+
* Catalog and schema are derived from OTEL_UC_TABLE_NAME (catalog.schema.table).
105+
*/
106+
private getTracingConfig() {
107+
const tracing =
108+
typeof this.config.tracing === "object" ? this.config.tracing : {};
109+
const ucTableName = tracing.ucTableName ?? process.env.OTEL_UC_TABLE_NAME;
110+
const parts = ucTableName?.split(".") ?? [];
111+
return {
112+
experimentId:
113+
tracing.experimentId ?? process.env.MLFLOW_EXPERIMENT_ID ?? "",
114+
ucCatalog: parts.length >= 3 ? parts[0] : undefined,
115+
ucSchema: parts.length >= 3 ? parts[1] : undefined,
116+
warehouseId: tracing.warehouseId,
117+
};
118+
}
119+
56120
async setup() {
57121
this.systemPrompt = this.config.systemPrompt ?? DEFAULT_SYSTEM_PROMPT;
58122

123+
let tracingActive = this.isTracingActive();
124+
125+
// Ensure the UC trace location exists; skip instrumentation if it fails
126+
if (tracingActive) {
127+
const tracingConfig = this.getTracingConfig();
128+
const location = await setupExperimentTraceLocation(tracingConfig).catch(
129+
(err) => {
130+
logger.warn("Trace location setup failed: %O", err);
131+
return null;
132+
},
133+
);
134+
if (!location) {
135+
tracingActive = false;
136+
}
137+
}
138+
59139
// If a pre-built agent is provided, use it directly
60140
if (this.config.agentInstance) {
141+
if (tracingActive) {
142+
const cbModule = await import("@langchain/core/callbacks/manager");
143+
await instrumentLangChain(cbModule);
144+
}
61145
this.agentImpl = this.config.agentInstance;
62146
logger.info("AgentPlugin initialized with provided agentInstance");
63147
return;
@@ -74,6 +158,14 @@ export class AgentPlugin extends Plugin<IAgentConfig> {
74158

75159
const { ChatDatabricks } = await import("@databricks/langchainjs");
76160

161+
// Instrument LangChain callbacks using the *same* @langchain/core copy
162+
// that the agent runtime (LangGraph, ChatDatabricks) will use.
163+
// Spans flow through the global tracer provider (TelemetryManager).
164+
if (tracingActive) {
165+
const cbModule = await import("@langchain/core/callbacks/manager");
166+
await instrumentLangChain(cbModule);
167+
}
168+
77169
this.model = new ChatDatabricks({
78170
model: modelName,
79171
useResponsesApi: this.config.useResponsesApi ?? false,
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/**
2+
* MLflow experiment trace location setup.
3+
*
4+
* Auto-provisions Unity Catalog trace storage and links an MLflow experiment
5+
* to it using the Databricks REST API. This mirrors the Python
6+
* `mlflow.set_experiment_trace_location()` behaviour.
7+
*
8+
* Env vars:
9+
* OTEL_UC_TABLE_NAME — fully-qualified UC table (catalog.schema.table);
10+
* catalog and schema are derived by splitting on dots.
11+
* MLFLOW_TRACING_SQL_WAREHOUSE_ID — warehouse used to create the storage (optional)
12+
*/
13+
14+
import { createLogger } from "../../logging/logger";
15+
16+
const logger = createLogger("agent:mlflow");
17+
18+
interface TraceLocationOptions {
19+
experimentId: string;
20+
/** Derived from OTEL_UC_TABLE_NAME by the caller (first dot-segment). */
21+
ucCatalog?: string;
22+
/** Derived from OTEL_UC_TABLE_NAME by the caller (second dot-segment). */
23+
ucSchema?: string;
24+
warehouseId?: string;
25+
}
26+
27+
const MLFLOW_TRACE_LOCATION_TABLE_NAME = "mlflow_experiment_trace_otel_spans";
28+
29+
/**
30+
* Link an MLflow experiment to an existing UC trace location.
31+
* Returns the fully-qualified table name on success, `null` otherwise.
32+
*/
33+
async function linkExperimentToLocation(
34+
client: { apiClient: { request: (opts: unknown) => Promise<unknown> } },
35+
experimentId: string,
36+
catalogName: string,
37+
schemaName: string,
38+
): Promise<string | null> {
39+
const tableName = `${catalogName}.${schemaName}.${MLFLOW_TRACE_LOCATION_TABLE_NAME}`;
40+
41+
try {
42+
await client.apiClient.request({
43+
path: `/api/4.0/mlflow/traces/${experimentId}/link-location`,
44+
method: "POST",
45+
headers: new Headers({ "Content-Type": "application/json" }),
46+
payload: {
47+
experiment_id: experimentId,
48+
uc_schema: {
49+
catalog_name: catalogName,
50+
schema_name: schemaName,
51+
},
52+
},
53+
raw: false,
54+
});
55+
56+
logger.info("Experiment linked to UC trace location: %s", tableName);
57+
return tableName;
58+
} catch (error: unknown) {
59+
const code =
60+
(error as { error_code?: string }).error_code ??
61+
(error as Error).name ??
62+
"UNKNOWN";
63+
logger.warn(
64+
"Could not link experiment %s to %s (%s)",
65+
experimentId,
66+
tableName,
67+
code,
68+
);
69+
return null;
70+
}
71+
}
72+
73+
/**
74+
* Provision a UC trace storage location and link the experiment to it.
75+
*
76+
* If `warehouseId` is not provided, the function attempts to link directly
77+
* (works when the UC table already exists).
78+
*
79+
* Returns the fully-qualified UC table name on success, `null` otherwise.
80+
*/
81+
export async function setupExperimentTraceLocation(
82+
opts: TraceLocationOptions,
83+
): Promise<string | null> {
84+
const catalogName = opts.ucCatalog;
85+
const schemaName = opts.ucSchema;
86+
87+
if (!catalogName || !schemaName) {
88+
logger.debug(
89+
"Skipping trace location setup — catalog/schema not available (set OTEL_UC_TABLE_NAME as catalog.schema.table)",
90+
);
91+
return null;
92+
}
93+
94+
const warehouseId =
95+
opts.warehouseId ?? process.env.MLFLOW_TRACING_SQL_WAREHOUSE_ID;
96+
97+
let client: {
98+
apiClient: { request: (opts: unknown) => Promise<unknown> };
99+
config: { ensureResolved: () => Promise<void> };
100+
};
101+
try {
102+
const { WorkspaceClient } = await import("@databricks/sdk-experimental");
103+
client = new WorkspaceClient({}) as typeof client;
104+
await client.config.ensureResolved();
105+
} catch (err) {
106+
logger.warn(
107+
"Cannot set up trace location — Databricks auth unavailable: %O",
108+
err,
109+
);
110+
return null;
111+
}
112+
113+
if (!warehouseId) {
114+
const result = await linkExperimentToLocation(
115+
client,
116+
opts.experimentId,
117+
catalogName,
118+
schemaName,
119+
);
120+
if (!result) {
121+
logger.warn(
122+
"Trace destination does not exist and cannot be created — " +
123+
"set MLFLOW_TRACING_SQL_WAREHOUSE_ID to auto-create it",
124+
);
125+
}
126+
return result;
127+
}
128+
129+
try {
130+
logger.debug(
131+
"Creating UC trace location: %s.%s (warehouse=%s)",
132+
catalogName,
133+
schemaName,
134+
warehouseId,
135+
);
136+
137+
await client.apiClient.request({
138+
path: "/api/4.0/mlflow/traces/location",
139+
method: "POST",
140+
headers: new Headers({ "Content-Type": "application/json" }),
141+
payload: {
142+
uc_schema: {
143+
catalog_name: catalogName,
144+
schema_name: schemaName,
145+
},
146+
sql_warehouse_id: warehouseId,
147+
},
148+
raw: false,
149+
});
150+
151+
return linkExperimentToLocation(
152+
client,
153+
opts.experimentId,
154+
catalogName,
155+
schemaName,
156+
);
157+
} catch (error: unknown) {
158+
// 409 = location already exists — just link
159+
if (
160+
error instanceof Error &&
161+
(error.message?.includes("409") ||
162+
error.message?.includes("ALREADY_EXISTS"))
163+
) {
164+
return linkExperimentToLocation(
165+
client,
166+
opts.experimentId,
167+
catalogName,
168+
schemaName,
169+
);
170+
}
171+
logger.warn("Failed to create UC trace location: %O", error);
172+
return null;
173+
}
174+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* LangChain tracing integration for the agent plugin.
3+
*
4+
* Instruments LangChain callbacks via @arizeai/openinference-instrumentation-langchain
5+
* so that agent spans are emitted through the global tracer provider
6+
* (set up by AppKit's TelemetryManager).
7+
*
8+
* MLflow-specific headers (experiment ID, UC table name) are added
9+
* to the trace exporter by TelemetryManager — see buildTraceExporterHeaders().
10+
*/
11+
12+
import { createLogger } from "../../logging/logger";
13+
14+
const logger = createLogger("agent:tracing");
15+
16+
/**
17+
* Instrument LangChain callbacks via the Arize/OpenInference library.
18+
*
19+
* The instrumentation creates spans using the **global** tracer provider
20+
* (registered by TelemetryManager's NodeSDK). No separate provider is needed.
21+
*
22+
* IMPORTANT: `callbackManagerModule` must be the module object that the
23+
* agent runtime uses (imported from agent.ts context), NOT a separate
24+
* import inside this file. pnpm strict isolation can resolve different
25+
* physical copies of @langchain/core for different packages in the
26+
* dependency tree, and patching the wrong copy has no effect.
27+
*/
28+
export async function instrumentLangChain(
29+
callbackManagerModule?: typeof import("@langchain/core/callbacks/manager"),
30+
): Promise<void> {
31+
try {
32+
const { LangChainInstrumentation } = await import(
33+
"@arizeai/openinference-instrumentation-langchain"
34+
);
35+
36+
const cbModule =
37+
callbackManagerModule ??
38+
(await import("@langchain/core/callbacks/manager"));
39+
40+
const inst = new LangChainInstrumentation();
41+
inst.manuallyInstrument(cbModule);
42+
43+
logger.debug("LangChain callbacks instrumented (global tracer provider)");
44+
} catch (err) {
45+
logger.error("Failed to instrument LangChain callbacks: %O", err);
46+
}
47+
}

0 commit comments

Comments
 (0)