|
| 1 | +/** |
| 2 | + * Copyright (c) 2025 Databricks Contributors |
| 3 | + * |
| 4 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | + * you may not use this file except in compliance with the License. |
| 6 | + * You may obtain a copy of the License at |
| 7 | + * |
| 8 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * |
| 10 | + * Unless required by applicable law or agreed to in writing, software |
| 11 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | + * See the License for the specific language governing permissions and |
| 14 | + * limitations under the License. |
| 15 | + */ |
| 16 | + |
| 17 | +import fetch, { Response } from 'node-fetch'; |
| 18 | +import IClientContext from '../contracts/IClientContext'; |
| 19 | +import { LogLevel } from '../contracts/IDBSQLLogger'; |
| 20 | +import { TelemetryMetric, DEFAULT_TELEMETRY_CONFIG } from './types'; |
| 21 | +import { CircuitBreakerRegistry } from './CircuitBreaker'; |
| 22 | +import ExceptionClassifier from './ExceptionClassifier'; |
| 23 | +import { buildUrl } from './urlUtils'; |
| 24 | + |
/**
 * Databricks telemetry log format for export.
 *
 * One instance is produced per metric and sent as an element of the
 * `frontend_logs` array. Field names are snake_case because they are
 * serialized as-is with `JSON.stringify`.
 */
interface DatabricksTelemetryLog {
  workspace_id?: string;
  /** Identifier of this log entry. */
  frontend_log_event_id: string;
  context: {
    client_context: {
      timestamp_millis: number;
      user_agent: string;
    };
  };
  entry: {
    sql_driver_log: {
      session_id?: string;
      sql_statement_id?: string;
      /** Driver/runtime/OS description of the client. */
      system_configuration?: {
        driver_version?: string;
        runtime_name?: string;
        runtime_version?: string;
        runtime_vendor?: string;
        os_name?: string;
        os_version?: string;
        os_arch?: string;
        driver_name?: string;
        client_app_name?: string;
      };
      // Shape is not defined in this file; `unknown` forces consumers to
      // narrow the value before use instead of silently disabling checks.
      driver_connection_params?: unknown;
      operation_latency_ms?: number;
      sql_operation?: {
        execution_result?: string;
        chunk_details?: {
          total_chunks_present?: number;
          total_chunks_iterated?: number;
          initial_chunk_latency_millis?: number;
          slowest_chunk_latency_millis?: number;
          sum_chunks_download_time_millis?: number;
        };
      };
      error_info?: {
        error_name: string;
        stack_trace: string;
      };
    };
  };
}
| 71 | + |
/**
 * Request body for a Databricks telemetry export: a batch of log
 * entries carried under the `frontend_logs` key.
 */
interface DatabricksTelemetryPayload {
  frontend_logs: Array<DatabricksTelemetryLog>;
}
| 78 | + |
/**
 * Exports telemetry metrics to the Databricks telemetry service.
 *
 * Endpoints (the path handed to `buildUrl` together with the host):
 * - Authenticated: /telemetry-ext
 * - Unauthenticated: /telemetry-unauth
 *
 * Features:
 * - Circuit breaker integration for endpoint protection
 * - Retry with capped exponential backoff plus jitter for errors that
 *   `ExceptionClassifier` reports as retryable
 * - No retry for errors that `ExceptionClassifier` reports as terminal
 * - CRITICAL: export() method NEVER throws - all exceptions swallowed
 * - CRITICAL: All logging at LogLevel.debug ONLY
 */
export default class DatabricksTelemetryExporter {
  /** Circuit breaker obtained from the registry for `host`. */
  private readonly circuitBreaker: ReturnType<CircuitBreakerRegistry['getCircuitBreaker']>;

  /** Value sent as the `User-Agent` header and embedded in every log entry. */
  private readonly userAgent: string;

  /** HTTP client; injectable so tests can replace the real `fetch`. */
  private readonly fetchFn: typeof fetch;

  /**
   * @param context - Client context supplying logger, config and auth headers
   * @param host - Host the telemetry endpoint URL is built from
   * @param circuitBreakerRegistry - Registry the per-host circuit breaker is taken from
   * @param fetchFunction - Optional replacement for `fetch`
   */
  constructor(
    private context: IClientContext,
    private host: string,
    private circuitBreakerRegistry: CircuitBreakerRegistry,
    fetchFunction?: typeof fetch
  ) {
    this.circuitBreaker = circuitBreakerRegistry.getCircuitBreaker(host);
    this.fetchFn = fetchFunction || fetch;

    // Get driver version for user agent
    this.userAgent = `databricks-sql-nodejs/${this.getDriverVersion()}`;
  }

  /**
   * Export metrics to Databricks service. Never throws.
   *
   * An empty or missing batch is a no-op. Any failure (including an open
   * circuit breaker) is logged at debug level and swallowed.
   *
   * @param metrics - Array of telemetry metrics to export
   */
  async export(metrics: TelemetryMetric[]): Promise<void> {
    if (!metrics || metrics.length === 0) {
      return;
    }

    try {
      await this.circuitBreaker.execute(async () => {
        await this.exportWithRetry(metrics);
      });
    } catch (error: unknown) {
      // CRITICAL: All exceptions swallowed and logged at debug level ONLY.
      // The logging itself is guarded too: a failing logger (or a thrown
      // non-Error value such as null) must not break the never-throws contract.
      try {
        const logger = this.context.getLogger();
        const message = DatabricksTelemetryExporter.errorMessage(error);
        if (message === 'Circuit breaker OPEN') {
          logger.log(LogLevel.debug, 'Circuit breaker OPEN - dropping telemetry');
        } else {
          logger.log(LogLevel.debug, `Telemetry export error: ${message}`);
        }
      } catch {
        // Nothing further can be done; telemetry must never affect the caller.
      }
    }
  }

  /**
   * Export metrics with retry logic for retryable errors.
   * Implements exponential backoff with jitter.
   *
   * @param metrics - Batch to export
   * @throws the last export error when it is terminal, not retryable, or
   *         the retry budget is exhausted (so the circuit breaker sees it)
   */
  private async exportWithRetry(metrics: TelemetryMetric[]): Promise<void> {
    const config = this.context.getConfig();
    const logger = this.context.getLogger();
    const configuredRetries = config.telemetryMaxRetries ?? DEFAULT_TELEMETRY_CONFIG.maxRetries;
    // A negative (or NaN) budget would skip the loop entirely and report
    // success without ever sending anything; treat it as "no retries".
    const maxRetries = configuredRetries > 0 ? configuredRetries : 0;

    let lastError: unknown = null;

    /* eslint-disable no-await-in-loop */
    for (let attempt = 0; attempt <= maxRetries; attempt += 1) {
      try {
        await this.exportInternal(metrics);
        return; // Success
      } catch (error: any) {
        lastError = error;
        const message = DatabricksTelemetryExporter.errorMessage(error);

        // Check if error is terminal (don't retry)
        if (ExceptionClassifier.isTerminal(error)) {
          logger.log(LogLevel.debug, `Terminal error - no retry: ${message}`);
          throw error; // Terminal error, propagate to circuit breaker
        }

        // Check if error is retryable
        if (!ExceptionClassifier.isRetryable(error)) {
          logger.log(LogLevel.debug, `Non-retryable error: ${message}`);
          throw error; // Not retryable, propagate to circuit breaker
        }

        // Last attempt reached
        if (attempt >= maxRetries) {
          logger.log(LogLevel.debug, `Max retries reached (${maxRetries}): ${message}`);
          throw error; // Max retries exhausted, propagate to circuit breaker
        }

        // Exponential base delay (100ms doubling, capped at 1000ms) plus up
        // to 100ms of random jitter.
        const baseDelay = Math.min(100 * 2 ** attempt, 1000);
        const jitter = Math.random() * 100;
        const delay = baseDelay + jitter;

        logger.log(
          LogLevel.debug,
          `Retrying telemetry export (attempt ${attempt + 1}/${maxRetries}) after ${Math.round(delay)}ms`
        );

        await this.sleep(delay);
      }
    }
    /* eslint-enable no-await-in-loop */

    // Only reachable when the loop ended after a failed attempt (e.g. a
    // fractional retry budget); the export did not succeed, so report it.
    throw lastError;
  }

  /**
   * Internal export implementation that makes the HTTP call.
   *
   * @param metrics - Batch to serialize and POST
   * @throws Error carrying a `statusCode` property when the response is not OK
   */
  private async exportInternal(metrics: TelemetryMetric[]): Promise<void> {
    const config = this.context.getConfig();
    const logger = this.context.getLogger();

    // Determine endpoint based on authentication mode
    const authenticatedExport =
      config.telemetryAuthenticatedExport ?? DEFAULT_TELEMETRY_CONFIG.authenticatedExport;
    const endpoint = authenticatedExport
      ? buildUrl(this.host, '/telemetry-ext')
      : buildUrl(this.host, '/telemetry-unauth');

    // Format payload
    const payload: DatabricksTelemetryPayload = {
      frontend_logs: metrics.map((m) => this.toTelemetryLog(m)),
    };

    logger.log(
      LogLevel.debug,
      `Exporting ${metrics.length} telemetry metrics to ${authenticatedExport ? 'authenticated' : 'unauthenticated'} endpoint`
    );

    // Get authentication headers if using authenticated endpoint
    const authHeaders = authenticatedExport ? await this.context.getAuthHeaders() : {};

    // Auth headers are spread first so Content-Type and User-Agent below
    // always take precedence over same-named auth headers.
    const response: Response = await this.fetchFn(endpoint, {
      method: 'POST',
      headers: {
        ...authHeaders,
        'Content-Type': 'application/json',
        'User-Agent': this.userAgent,
      },
      body: JSON.stringify(payload),
    });

    if (!response.ok) {
      // statusCode is attached so the failure can be classified by status.
      const error: any = new Error(`Telemetry export failed: ${response.status} ${response.statusText}`);
      error.statusCode = response.status;
      throw error;
    }

    logger.log(LogLevel.debug, `Successfully exported ${metrics.length} telemetry metrics`);
  }

  /**
   * Convert TelemetryMetric to Databricks telemetry log format.
   *
   * Every entry gets a fresh event id, the metric timestamp, the user agent
   * and the session/statement ids; further fields depend on `metricType`.
   *
   * @param metric - Metric to convert
   * @returns The log entry to place in `frontend_logs`
   */
  private toTelemetryLog(metric: TelemetryMetric): DatabricksTelemetryLog {
    const log: DatabricksTelemetryLog = {
      // workspace_id: metric.workspaceId, // TODO: Determine if this should be numeric or omitted
      frontend_log_event_id: this.generateUUID(),
      context: {
        client_context: {
          timestamp_millis: metric.timestamp,
          user_agent: this.userAgent,
        },
      },
      entry: {
        sql_driver_log: {
          session_id: metric.sessionId,
          sql_statement_id: metric.statementId,
        },
      },
    };

    // Add metric-specific fields based on proto definition
    if (metric.metricType === 'connection' && metric.driverConfig) {
      // Map driverConfig to system_configuration (snake_case as per proto)
      log.entry.sql_driver_log.system_configuration = {
        driver_version: metric.driverConfig.driverVersion,
        driver_name: metric.driverConfig.driverName,
        runtime_name: 'Node.js',
        runtime_version: metric.driverConfig.nodeVersion,
        os_name: metric.driverConfig.platform,
        os_version: metric.driverConfig.osVersion,
      };
    } else if (metric.metricType === 'statement') {
      log.entry.sql_driver_log.operation_latency_ms = metric.latencyMs;

      if (metric.resultFormat || metric.chunkCount) {
        log.entry.sql_driver_log.sql_operation = {
          execution_result: metric.resultFormat,
        };

        // Chunk details are only reported when at least one chunk exists;
        // the same count is used for both "present" and "iterated".
        if (metric.chunkCount && metric.chunkCount > 0) {
          log.entry.sql_driver_log.sql_operation.chunk_details = {
            total_chunks_present: metric.chunkCount,
            total_chunks_iterated: metric.chunkCount,
          };
        }
      }
    } else if (metric.metricType === 'error') {
      // The error message is what gets reported in the stack_trace field.
      log.entry.sql_driver_log.error_info = {
        error_name: metric.errorName || 'UnknownError',
        stack_trace: metric.errorMessage || '',
      };
    }

    return log;
  }

  /**
   * Generate a UUID v4-formatted string.
   *
   * Digits come from `Math.random()`, so the result is not
   * cryptographically strong; it is only used as a log event id here.
   */
  private generateUUID(): string {
    return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => {
      const r = (Math.random() * 16) | 0;
      // 'y' is the variant nibble: force the top two bits to binary 10.
      const v = c === 'x' ? r : (r & 0x3) | 0x8;
      return v.toString(16);
    });
  }

  /**
   * Get the driver version used in the user agent.
   *
   * Currently a hard-coded placeholder.
   * TODO: read the real version from package.json.
   */
  private getDriverVersion(): string {
    return '1.0.0';
  }

  /**
   * Sleep for the specified number of milliseconds.
   */
  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => {
      setTimeout(resolve, ms);
    });
  }

  /**
   * Best-effort text for any thrown value.
   *
   * Returns the `message` property when the value is an object that has
   * one, otherwise the value converted with `String()`. Never throws for
   * `null` or `undefined`.
   */
  private static errorMessage(error: unknown): string {
    if (typeof error === 'object' && error !== null && 'message' in error) {
      return String((error as { message: unknown }).message);
    }
    return String(error);
  }
}
0 commit comments