Skip to content

Commit 5be9cb7

Browse files
committed
Fix feature flag and telemetry export endpoints
- Change feature flag endpoint to use NODEJS client type
- Fix telemetry endpoints to /telemetry-ext and /telemetry-unauth
- Update payload to match proto with system_configuration
- Add shared buildUrl utility for protocol handling
1 parent 21e3087 commit 5be9cb7

File tree

3 files changed

+412
-29
lines changed

3 files changed

+412
-29
lines changed
Lines changed: 332 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
/**
2+
* Copyright (c) 2025 Databricks Contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import fetch, { Response } from 'node-fetch';
18+
import IClientContext from '../contracts/IClientContext';
19+
import { LogLevel } from '../contracts/IDBSQLLogger';
20+
import { TelemetryMetric, DEFAULT_TELEMETRY_CONFIG } from './types';
21+
import { CircuitBreakerRegistry } from './CircuitBreaker';
22+
import ExceptionClassifier from './ExceptionClassifier';
23+
import { buildUrl } from './urlUtils';
24+
/**
 * Databricks telemetry log format for export.
 *
 * One entry of the `frontend_logs` array sent to the telemetry endpoint.
 * Field names are snake_case because they are serialized as-is with
 * `JSON.stringify` and must match the service-side proto field names.
 */
interface DatabricksTelemetryLog {
  /** Optional workspace identifier. */
  workspace_id?: string;
  /** Unique identifier of this log event. */
  frontend_log_event_id: string;
  context: {
    client_context: {
      /** Event timestamp, in milliseconds. */
      timestamp_millis: number;
      /** User agent string of the driver that produced the event. */
      user_agent: string;
    };
  };
  entry: {
    sql_driver_log: {
      session_id?: string;
      sql_statement_id?: string;
      /** Description of the driver and the environment it runs in. */
      system_configuration?: {
        driver_version?: string;
        runtime_name?: string;
        runtime_version?: string;
        runtime_vendor?: string;
        os_name?: string;
        os_version?: string;
        os_arch?: string;
        driver_name?: string;
        client_app_name?: string;
      };
      // Shape is not defined in this file; `unknown` forces consumers to narrow
      // before use instead of silently disabling type checking.
      driver_connection_params?: unknown;
      /** Latency of the operation, in milliseconds. */
      operation_latency_ms?: number;
      sql_operation?: {
        execution_result?: string;
        chunk_details?: {
          total_chunks_present?: number;
          total_chunks_iterated?: number;
          initial_chunk_latency_millis?: number;
          slowest_chunk_latency_millis?: number;
          sum_chunks_download_time_millis?: number;
        };
      };
      error_info?: {
        error_name: string;
        stack_trace: string;
      };
    };
  };
}
71+
/**
 * Payload format for Databricks telemetry export.
 *
 * This object is serialized with `JSON.stringify` and used as the body of the
 * HTTP POST request; each array element is one telemetry log entry.
 */
interface DatabricksTelemetryPayload {
  frontend_logs: DatabricksTelemetryLog[];
}
78+
/**
 * Exports telemetry metrics to Databricks telemetry service.
 *
 * Endpoints (paths handed to `buildUrl` together with the configured host):
 * - Authenticated: /telemetry-ext
 * - Unauthenticated: /telemetry-unauth
 *
 * Features:
 * - Circuit breaker integration for endpoint protection
 * - Retry logic with exponential backoff for retryable errors
 * - Terminal error detection (no retry), as classified by ExceptionClassifier
 * - CRITICAL: export() method NEVER throws - all exceptions swallowed
 * - CRITICAL: All logging at LogLevel.debug ONLY
 */
export default class DatabricksTelemetryExporter {
  // Type is inferred from the constructor assignment (per-host breaker from the registry).
  private circuitBreaker;

  private readonly userAgent: string;

  private fetchFn: typeof fetch;

  /**
   * @param context - Client context providing the logger, config and auth headers
   * @param host - Host the telemetry is sent to; also the circuit breaker key
   * @param circuitBreakerRegistry - Registry the per-host circuit breaker is taken from
   * @param fetchFunction - Optional fetch replacement (falls back to the imported `fetch`)
   */
  constructor(
    private context: IClientContext,
    private host: string,
    private circuitBreakerRegistry: CircuitBreakerRegistry,
    fetchFunction?: typeof fetch
  ) {
    this.circuitBreaker = circuitBreakerRegistry.getCircuitBreaker(host);
    this.fetchFn = fetchFunction || fetch;

    // Get driver version for user agent
    this.userAgent = `databricks-sql-nodejs/${this.getDriverVersion()}`;
  }

  /**
   * Export metrics to Databricks service. Never throws.
   *
   * Empty or missing input is a no-op. Any failure (including a failing logger
   * or a non-Error value being thrown) is swallowed and, where possible,
   * reported at debug level.
   *
   * @param metrics - Array of telemetry metrics to export
   */
  async export(metrics: TelemetryMetric[]): Promise<void> {
    if (!metrics || metrics.length === 0) {
      return;
    }

    try {
      await this.circuitBreaker.execute(async () => {
        await this.exportWithRetry(metrics);
      });
    } catch (error: unknown) {
      // CRITICAL: All exceptions swallowed and logged at debug level ONLY
      const message = DatabricksTelemetryExporter.describeError(error);
      if (message === 'Circuit breaker OPEN') {
        this.logDebug('Circuit breaker OPEN - dropping telemetry');
      } else {
        this.logDebug(`Telemetry export error: ${message}`);
      }
    }
  }

  /**
   * Export metrics with retry logic for retryable errors.
   * Implements exponential backoff with jitter.
   *
   * At least one attempt is always made; up to `maxRetries` additional attempts
   * follow for errors that ExceptionClassifier reports as retryable.
   *
   * @throws the last error when it is terminal, non-retryable, or retries are exhausted
   */
  private async exportWithRetry(metrics: TelemetryMetric[]): Promise<void> {
    const config = this.context.getConfig();

    // A negative or NaN setting would make the loop below run zero times and
    // report success without sending anything, so clamp to "no retries".
    const requestedRetries = Number(config.telemetryMaxRetries ?? DEFAULT_TELEMETRY_CONFIG.maxRetries);
    const maxRetries = Number.isFinite(requestedRetries) && requestedRetries > 0 ? Math.floor(requestedRetries) : 0;

    /* eslint-disable no-await-in-loop */
    for (let attempt = 0; attempt <= maxRetries; attempt += 1) {
      try {
        await this.exportInternal(metrics);
        return; // Success
        // `any` is kept here because the error is handed to ExceptionClassifier,
        // whose parameter types are not visible from this file.
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
      } catch (error: any) {
        const message = DatabricksTelemetryExporter.describeError(error);

        // Check if error is terminal (don't retry)
        if (ExceptionClassifier.isTerminal(error)) {
          this.logDebug(`Terminal error - no retry: ${message}`);
          throw error; // Terminal error, propagate to circuit breaker
        }

        // Check if error is retryable
        if (!ExceptionClassifier.isRetryable(error)) {
          this.logDebug(`Non-retryable error: ${message}`);
          throw error; // Not retryable, propagate to circuit breaker
        }

        // Last attempt reached
        if (attempt >= maxRetries) {
          this.logDebug(`Max retries reached (${maxRetries}): ${message}`);
          throw error; // Max retries exhausted, propagate to circuit breaker
        }

        // Exponential base delay (100ms doubling per attempt, capped at 1000ms)
        // plus up to 100ms of random jitter.
        const baseDelay = Math.min(100 * 2 ** attempt, 1000);
        const jitter = Math.random() * 100;
        const delay = baseDelay + jitter;

        this.logDebug(
          `Retrying telemetry export (attempt ${attempt + 1}/${maxRetries}) after ${Math.round(delay)}ms`
        );

        await this.sleep(delay);
      }
    }
    /* eslint-enable no-await-in-loop */
  }

  /**
   * Internal export implementation that makes the HTTP call.
   *
   * @throws Error carrying a `statusCode` property when the response is not OK;
   *         errors from `fetch` or `getAuthHeaders` propagate unchanged
   */
  private async exportInternal(metrics: TelemetryMetric[]): Promise<void> {
    const config = this.context.getConfig();

    // Determine endpoint based on authentication mode
    const authenticatedExport =
      config.telemetryAuthenticatedExport ?? DEFAULT_TELEMETRY_CONFIG.authenticatedExport;
    const endpoint = authenticatedExport
      ? buildUrl(this.host, '/telemetry-ext')
      : buildUrl(this.host, '/telemetry-unauth');

    // Format payload
    const payload: DatabricksTelemetryPayload = {
      frontend_logs: metrics.map((m) => this.toTelemetryLog(m)),
    };

    this.logDebug(
      `Exporting ${metrics.length} telemetry metrics to ${authenticatedExport ? 'authenticated' : 'unauthenticated'} endpoint`
    );

    // Get authentication headers if using authenticated endpoint
    const authHeaders = authenticatedExport ? await this.context.getAuthHeaders() : {};

    // Make HTTP POST request; Content-Type and User-Agent are spread last so
    // they take precedence over same-named auth headers.
    const response: Response = await this.fetchFn(endpoint, {
      method: 'POST',
      headers: {
        ...authHeaders,
        'Content-Type': 'application/json',
        'User-Agent': this.userAgent,
      },
      body: JSON.stringify(payload),
    });

    if (!response.ok) {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const error: any = new Error(`Telemetry export failed: ${response.status} ${response.statusText}`);
      error.statusCode = response.status;
      throw error;
    }

    this.logDebug(`Successfully exported ${metrics.length} telemetry metrics`);
  }

  /**
   * Convert TelemetryMetric to Databricks telemetry log format.
   *
   * Every log carries a fresh event id, the metric timestamp, the user agent,
   * and the session/statement ids. Depending on `metricType` it additionally gets:
   * - 'connection' (with driverConfig): `system_configuration`
   * - 'statement': `operation_latency_ms` and, when available, `sql_operation`
   * - 'error': `error_info`
   */
  private toTelemetryLog(metric: TelemetryMetric): DatabricksTelemetryLog {
    const log: DatabricksTelemetryLog = {
      // workspace_id: metric.workspaceId, // TODO: Determine if this should be numeric or omitted
      frontend_log_event_id: this.generateUUID(),
      context: {
        client_context: {
          timestamp_millis: metric.timestamp,
          user_agent: this.userAgent,
        },
      },
      entry: {
        sql_driver_log: {
          session_id: metric.sessionId,
          sql_statement_id: metric.statementId,
        },
      },
    };

    // Add metric-specific fields based on proto definition
    if (metric.metricType === 'connection' && metric.driverConfig) {
      // Map driverConfig to system_configuration (snake_case as per proto)
      log.entry.sql_driver_log.system_configuration = {
        driver_version: metric.driverConfig.driverVersion,
        driver_name: metric.driverConfig.driverName,
        runtime_name: 'Node.js',
        runtime_version: metric.driverConfig.nodeVersion,
        os_name: metric.driverConfig.platform,
        os_version: metric.driverConfig.osVersion,
      };
    } else if (metric.metricType === 'statement') {
      log.entry.sql_driver_log.operation_latency_ms = metric.latencyMs;

      if (metric.resultFormat || metric.chunkCount) {
        log.entry.sql_driver_log.sql_operation = {
          execution_result: metric.resultFormat,
        };

        if (metric.chunkCount && metric.chunkCount > 0) {
          // The same chunk count is reported for both "present" and "iterated".
          log.entry.sql_driver_log.sql_operation.chunk_details = {
            total_chunks_present: metric.chunkCount,
            total_chunks_iterated: metric.chunkCount,
          };
        }
      }
    } else if (metric.metricType === 'error') {
      // Only the error message is available here; it is sent in the stack_trace field.
      log.entry.sql_driver_log.error_info = {
        error_name: metric.errorName || 'UnknownError',
        stack_trace: metric.errorMessage || '',
      };
    }

    return log;
  }

  /**
   * Generate a UUID v4.
   *
   * Built from `Math.random()`, so it is suitable as an event identifier but
   * not for anything security-sensitive.
   */
  private generateUUID(): string {
    return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => {
      const r = (Math.random() * 16) | 0;
      // 'y' positions hold the variant bits: force the two top bits to binary 10.
      const v = c === 'x' ? r : (r & 0x3) | 0x8;
      return v.toString(16);
    });
  }

  /**
   * Get driver version.
   *
   * TODO: read the real version from package.json; this is currently a
   * hard-coded placeholder.
   */
  private getDriverVersion(): string {
    return '1.0.0';
  }

  /**
   * Sleep for the specified number of milliseconds.
   */
  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => {
      setTimeout(resolve, ms);
    });
  }

  /**
   * Log a message at debug level without ever throwing.
   *
   * Telemetry must not affect the caller, so a failing logger (or failing
   * logger lookup) is deliberately ignored.
   */
  private logDebug(message: string): void {
    try {
      this.context.getLogger().log(LogLevel.debug, message);
    } catch {
      // Intentionally ignored: logging is best-effort.
    }
  }

  /**
   * Extract a printable message from any thrown value, including `null`,
   * `undefined` and primitives, without throwing.
   */
  private static describeError(error: unknown): string {
    if (typeof error === 'object' && error !== null && 'message' in error) {
      return String((error as { message: unknown }).message);
    }
    return String(error);
  }
}

0 commit comments

Comments
 (0)