Skip to content

Commit a2dbfb1

Browse files
samikshya-dbclaude
andcommitted
Fix telemetry event listeners and add config options
- Fix event listener names: Remove 'telemetry.' prefix - Add support for telemetryBatchSize and telemetryAuthenticatedExport config options - Update telemetry files with fixed endpoints and proto format Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent e8c2033 commit a2dbfb1

File tree

5 files changed

+130
-48
lines changed

5 files changed

+130
-48
lines changed

lib/DBSQLClient.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -254,39 +254,39 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
254254
this.telemetryAggregator = new MetricsAggregator(this, exporter);
255255

256256
// Wire up event listeners
257-
this.telemetryEmitter.on('telemetry.connection.open', (event) => {
257+
this.telemetryEmitter.on('connection.open', (event) => {
258258
try {
259259
this.telemetryAggregator?.processEvent(event);
260260
} catch (error: any) {
261261
this.logger.log(LogLevel.debug, `Error processing connection.open event: ${error.message}`);
262262
}
263263
});
264264

265-
this.telemetryEmitter.on('telemetry.statement.start', (event) => {
265+
this.telemetryEmitter.on('statement.start', (event) => {
266266
try {
267267
this.telemetryAggregator?.processEvent(event);
268268
} catch (error: any) {
269269
this.logger.log(LogLevel.debug, `Error processing statement.start event: ${error.message}`);
270270
}
271271
});
272272

273-
this.telemetryEmitter.on('telemetry.statement.complete', (event) => {
273+
this.telemetryEmitter.on('statement.complete', (event) => {
274274
try {
275275
this.telemetryAggregator?.processEvent(event);
276276
} catch (error: any) {
277277
this.logger.log(LogLevel.debug, `Error processing statement.complete event: ${error.message}`);
278278
}
279279
});
280280

281-
this.telemetryEmitter.on('telemetry.cloudfetch.chunk', (event) => {
281+
this.telemetryEmitter.on('cloudfetch.chunk', (event) => {
282282
try {
283283
this.telemetryAggregator?.processEvent(event);
284284
} catch (error: any) {
285285
this.logger.log(LogLevel.debug, `Error processing cloudfetch.chunk event: ${error.message}`);
286286
}
287287
});
288288

289-
this.telemetryEmitter.on('telemetry.error', (event) => {
289+
this.telemetryEmitter.on('error', (event) => {
290290
try {
291291
this.telemetryAggregator?.processEvent(event);
292292
} catch (error: any) {
@@ -334,6 +334,12 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
334334
if (options.telemetryEnabled !== undefined) {
335335
this.config.telemetryEnabled = options.telemetryEnabled;
336336
}
337+
if (options.telemetryBatchSize !== undefined) {
338+
this.config.telemetryBatchSize = options.telemetryBatchSize;
339+
}
340+
if (options.telemetryAuthenticatedExport !== undefined) {
341+
this.config.telemetryAuthenticatedExport = options.telemetryAuthenticatedExport;
342+
}
337343

338344
this.authProvider = this.createAuthProvider(options, authProvider);
339345

lib/contracts/IDBSQLClient.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ export type ConnectionOptions = {
3434
socketTimeout?: number;
3535
proxy?: ProxyOptions;
3636
enableMetricViewMetadata?: boolean;
37-
// Optional telemetry override
37+
// Optional telemetry overrides
3838
telemetryEnabled?: boolean;
39+
telemetryBatchSize?: number;
40+
telemetryAuthenticatedExport?: boolean;
3941
} & AuthOptions;
4042

4143
export interface OpenSessionRequest {

lib/telemetry/DatabricksTelemetryExporter.ts

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { LogLevel } from '../contracts/IDBSQLLogger';
2020
import { TelemetryMetric, DEFAULT_TELEMETRY_CONFIG } from './types';
2121
import { CircuitBreakerRegistry } from './CircuitBreaker';
2222
import ExceptionClassifier from './ExceptionClassifier';
23+
import { buildUrl } from './urlUtils';
2324

2425
/**
2526
* Databricks telemetry log format for export.
@@ -37,19 +38,33 @@ interface DatabricksTelemetryLog {
3738
sql_driver_log: {
3839
session_id?: string;
3940
sql_statement_id?: string;
41+
system_configuration?: {
42+
driver_version?: string;
43+
runtime_name?: string;
44+
runtime_version?: string;
45+
runtime_vendor?: string;
46+
os_name?: string;
47+
os_version?: string;
48+
os_arch?: string;
49+
driver_name?: string;
50+
client_app_name?: string;
51+
};
52+
driver_connection_params?: any;
4053
operation_latency_ms?: number;
4154
sql_operation?: {
42-
execution_result_format?: string;
55+
execution_result?: string;
4356
chunk_details?: {
44-
chunk_count: number;
45-
total_bytes?: number;
57+
total_chunks_present?: number;
58+
total_chunks_iterated?: number;
59+
initial_chunk_latency_millis?: number;
60+
slowest_chunk_latency_millis?: number;
61+
sum_chunks_download_time_millis?: number;
4662
};
4763
};
4864
error_info?: {
4965
error_name: string;
5066
stack_trace: string;
5167
};
52-
driver_config?: any;
5368
};
5469
};
5570
}
@@ -190,8 +205,8 @@ export default class DatabricksTelemetryExporter {
190205
const authenticatedExport =
191206
config.telemetryAuthenticatedExport ?? DEFAULT_TELEMETRY_CONFIG.authenticatedExport;
192207
const endpoint = authenticatedExport
193-
? `https://${this.host}/api/2.0/sql/telemetry-ext`
194-
: `https://${this.host}/api/2.0/sql/telemetry-unauth`;
208+
? buildUrl(this.host, '/telemetry-ext')
209+
: buildUrl(this.host, '/telemetry-unauth');
195210

196211
// Format payload
197212
const payload: DatabricksTelemetryPayload = {
@@ -206,7 +221,7 @@ export default class DatabricksTelemetryExporter {
206221
// Get authentication headers if using authenticated endpoint
207222
const authHeaders = authenticatedExport ? await this.context.getAuthHeaders() : {};
208223

209-
// Make HTTP POST request
224+
// Make HTTP POST request with authentication
210225
const response: Response = await this.fetchFn(endpoint, {
211226
method: 'POST',
212227
headers: {
@@ -231,7 +246,7 @@ export default class DatabricksTelemetryExporter {
231246
*/
232247
private toTelemetryLog(metric: TelemetryMetric): DatabricksTelemetryLog {
233248
const log: DatabricksTelemetryLog = {
234-
workspace_id: metric.workspaceId,
249+
// workspace_id: metric.workspaceId, // TODO: Determine if this should be numeric or omitted
235250
frontend_log_event_id: this.generateUUID(),
236251
context: {
237252
client_context: {
@@ -247,21 +262,29 @@ export default class DatabricksTelemetryExporter {
247262
},
248263
};
249264

250-
// Add metric-specific fields
265+
// Add metric-specific fields based on proto definition
251266
if (metric.metricType === 'connection' && metric.driverConfig) {
252-
log.entry.sql_driver_log.driver_config = metric.driverConfig;
267+
// Map driverConfig to system_configuration (snake_case as per proto)
268+
log.entry.sql_driver_log.system_configuration = {
269+
driver_version: metric.driverConfig.driverVersion,
270+
driver_name: metric.driverConfig.driverName,
271+
runtime_name: 'Node.js',
272+
runtime_version: metric.driverConfig.nodeVersion,
273+
os_name: metric.driverConfig.platform,
274+
os_version: metric.driverConfig.osVersion,
275+
};
253276
} else if (metric.metricType === 'statement') {
254277
log.entry.sql_driver_log.operation_latency_ms = metric.latencyMs;
255278

256279
if (metric.resultFormat || metric.chunkCount) {
257280
log.entry.sql_driver_log.sql_operation = {
258-
execution_result_format: metric.resultFormat,
281+
execution_result: metric.resultFormat,
259282
};
260283

261284
if (metric.chunkCount && metric.chunkCount > 0) {
262285
log.entry.sql_driver_log.sql_operation.chunk_details = {
263-
chunk_count: metric.chunkCount,
264-
total_bytes: metric.bytesDownloaded,
286+
total_chunks_present: metric.chunkCount,
287+
total_chunks_iterated: metric.chunkCount,
265288
};
266289
}
267290
}

lib/telemetry/FeatureFlagCache.ts

Lines changed: 50 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@
1414
* limitations under the License.
1515
*/
1616

17-
import fetch from 'node-fetch';
1817
import IClientContext from '../contracts/IClientContext';
1918
import { LogLevel } from '../contracts/IDBSQLLogger';
19+
import fetch from 'node-fetch';
20+
import { buildUrl } from './urlUtils';
2021

2122
/**
2223
* Context holding feature flag state for a specific host.
@@ -105,35 +106,28 @@ export default class FeatureFlagCache {
105106
}
106107

107108
/**
108-
* Gets the driver version from package.json.
109-
* Used for version-specific feature flag requests.
110-
*/
111-
private getDriverVersion(): string {
112-
try {
113-
// eslint-disable-next-line @typescript-eslint/no-var-requires
114-
const packageJson = require('../../package.json');
115-
return packageJson.version || 'unknown';
116-
} catch {
117-
return 'unknown';
118-
}
119-
}
120-
121-
/**
122-
* Fetches feature flag from server REST API.
123-
* Makes authenticated call to connector-service endpoint.
109+
* Fetches feature flag from server using connector-service API.
110+
* Calls GET /api/2.0/connector-service/feature-flags/OSS_NODEJS/{version}
111+
*
124112
* @param host The host to fetch feature flag for
113+
* @returns true if feature flag is enabled, false otherwise
125114
*/
126115
private async fetchFeatureFlag(host: string): Promise<boolean> {
127116
const logger = this.context.getLogger();
117+
128118
try {
119+
// Get driver version for endpoint
129120
const driverVersion = this.getDriverVersion();
130-
const endpoint = `https://${host}/api/2.0/connector-service/feature-flags/OSS_NODEJS/${driverVersion}`;
121+
122+
// Build feature flags endpoint for Node.js driver
123+
const endpoint = buildUrl(host, `/api/2.0/connector-service/feature-flags/NODEJS/${driverVersion}`);
131124

132125
// Get authentication headers
133126
const authHeaders = await this.context.getAuthHeaders();
134127

135-
logger.log(LogLevel.debug, `Fetching feature flag from ${endpoint}`);
128+
logger.log(LogLevel.debug, `Fetching feature flags from ${endpoint}`);
136129

130+
// Make HTTP GET request with authentication
137131
const response = await fetch(endpoint, {
138132
method: 'GET',
139133
headers: {
@@ -144,36 +138,63 @@ export default class FeatureFlagCache {
144138
});
145139

146140
if (!response.ok) {
147-
logger.log(LogLevel.debug, `Feature flag fetch returned status ${response.status}`);
141+
logger.log(
142+
LogLevel.debug,
143+
`Feature flag fetch failed: ${response.status} ${response.statusText}`
144+
);
148145
return false;
149146
}
150147

148+
// Parse response JSON
151149
const data: any = await response.json();
152150

153-
// Update cache duration from ttl_seconds if provided
154-
if (data && data.ttl_seconds) {
151+
// Response format: { flags: [{ name: string, value: string }], ttl_seconds?: number }
152+
if (data && data.flags && Array.isArray(data.flags)) {
153+
// Update cache duration if TTL provided
155154
const ctx = this.contexts.get(host);
156-
if (ctx) {
157-
ctx.cacheDuration = data.ttl_seconds * 1000;
155+
if (ctx && data.ttl_seconds) {
156+
ctx.cacheDuration = data.ttl_seconds * 1000; // Convert to milliseconds
158157
logger.log(LogLevel.debug, `Updated cache duration to ${data.ttl_seconds} seconds`);
159158
}
160-
}
161159

162-
// Find the telemetry flag
163-
if (data && data.flags && Array.isArray(data.flags)) {
160+
// Look for our specific feature flag
164161
const flag = data.flags.find((f: any) => f.name === this.FEATURE_FLAG_NAME);
162+
165163
if (flag) {
166-
const enabled = String(flag.value).toLowerCase() === 'true';
167-
logger.log(LogLevel.debug, `Feature flag ${this.FEATURE_FLAG_NAME} = ${enabled}`);
164+
// Parse boolean value (can be string "true"/"false")
165+
const value = String(flag.value).toLowerCase();
166+
const enabled = value === 'true';
167+
logger.log(
168+
LogLevel.debug,
169+
`Feature flag ${this.FEATURE_FLAG_NAME}: ${enabled}`
170+
);
168171
return enabled;
169172
}
170173
}
171174

175+
// Feature flag not found in response, default to false
172176
logger.log(LogLevel.debug, `Feature flag ${this.FEATURE_FLAG_NAME} not found in response`);
173177
return false;
174178
} catch (error: any) {
179+
// Log at debug level only, never propagate exceptions
175180
logger.log(LogLevel.debug, `Error fetching feature flag from ${host}: ${error.message}`);
176181
return false;
177182
}
178183
}
184+
185+
/**
186+
* Gets the driver version without -oss suffix for API calls.
187+
* Format: "1.12.0" from "1.12.0-oss"
188+
*/
189+
private getDriverVersion(): string {
190+
try {
191+
// Import version from lib/version.ts
192+
const version = require('../version').default;
193+
// Remove -oss suffix if present
194+
return version.replace(/-oss$/, '');
195+
} catch (error) {
196+
// Fallback to a default version if import fails
197+
return '1.0.0';
198+
}
199+
}
179200
}

lib/telemetry/urlUtils.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/**
2+
* Copyright (c) 2025 Databricks Contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/**
18+
* Build full URL from host and path, handling protocol correctly.
19+
* @param host The hostname (with or without protocol)
20+
* @param path The path to append (should start with /)
21+
* @returns Full URL with protocol
22+
*/
23+
export function buildUrl(host: string, path: string): string {
24+
// Check if host already has protocol
25+
if (host.startsWith('http://') || host.startsWith('https://')) {
26+
return `${host}${path}`;
27+
}
28+
// Add https:// if no protocol present
29+
return `https://${host}${path}`;
30+
}

0 commit comments

Comments
 (0)