From 4e154607c2b80586e915a36bd2592b67d129bbab Mon Sep 17 00:00:00 2001 From: ditadi Date: Mon, 11 May 2026 17:07:29 +0100 Subject: [PATCH] refactor(appkit): split SQLWarehouseConnector into submit/get/poll/transform MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `SQLClient.executeStatement` was a single block: submit the SQL, poll until terminal, transform the Arrow payload to JSON. Splitting it into four narrower public APIs lets durable executors compose them without holding the orchestrator open across the wait: - `submitStatement(sql, params, opts)` — POST `/sql/statements`, returns the raw initial response. Adds a dedicated `sql.submit` span. - `getStatement(id)` — GET `/sql/statements/{id}`, single status read. - `pollStatement(id, opts)` — block until the statement reaches a terminal state (SUCCEEDED / FAILED / CANCELED / CLOSED), respecting the same timeout, signal, and error semantics the old monolithic method had. - `transformResult(response)` — Arrow → JSON row transform, no I/O. `executeStatement(...)` is preserved and now composes the four publics (`submit` → `poll` → `transform`). No private wrapper-only helpers remain. Every error path, abort branch, and status state machine of the old method is exercised by the new per-API test suites (21 new tests against `submit` / `get` / `poll` / `transform`). Motivation (documented inline in JSDoc): durable callers — e.g. a future TaskFlow-based analytics handler — emit a `statement_submitted` event with the warehouse-side statement ID right after `submitStatement` returns, so on crash recovery they can re-attach via `pollStatement` without re-running the SQL. The TaskFlow integration itself is not in this PR. `executeStatement`'s contract is unchanged; analytics (the only external caller) keeps working without modification. The added `sql.submit` span is purely additive for OTLP collectors. Verified: pnpm -r typecheck, pnpm build, full pnpm test (122 files, 2276 tests) all green. Signed-off-by: ditadi --- .../src/connectors/sql-warehouse/client.ts | 470 +++++++++++------- .../connectors/tests/sql-warehouse.test.ts | 352 +++++++++++-- 2 files changed, 601 insertions(+), 221 deletions(-) diff --git a/packages/appkit/src/connectors/sql-warehouse/client.ts b/packages/appkit/src/connectors/sql-warehouse/client.ts index d0a1c1816..78d333d10 100644 --- a/packages/appkit/src/connectors/sql-warehouse/client.ts +++ b/packages/appkit/src/connectors/sql-warehouse/client.ts @@ -25,6 +25,31 @@ import { executeStatementDefaults } from "./defaults"; const logger = createLogger("connectors:sql-warehouse"); +/** + * Sleep for `ms`, resolving early if `signal` aborts. Caller must + * re-check `signal.aborted` afterwards to surface a typed cancel error. + * @internal + */ +function sleepWithSignal(ms: number, signal?: AbortSignal): Promise { + if (ms <= 0) return Promise.resolve(); + return new Promise((resolve) => { + const handle = setTimeout(() => { + signal?.removeEventListener("abort", onAbort); + resolve(); + }, ms); + const onAbort = () => { + clearTimeout(handle); + resolve(); + }; + if (signal?.aborted) { + clearTimeout(handle); + resolve(); + return; + } + signal?.addEventListener("abort", onAbort, { once: true }); + }); +} + interface SQLWarehouseConfig { timeout?: number; telemetry?: TelemetryOptions; @@ -81,209 +106,117 @@ export class SQLWarehouseConnector { return this._arrowProcessor; } - async executeStatement( + /** + * Submit a statement and return the raw initial response. Caller polls + * via {@link pollStatement} on `RUNNING`/`PENDING`. Split from polling + * so durable executors can checkpoint the warehouse-side statement ID + * and re-attach after a crash without re-running the SQL. + */ + async submitStatement( workspaceClient: WorkspaceClient, input: sql.ExecuteStatementRequest, signal?: AbortSignal, - ) { - const startTime = Date.now(); - let success = false; - - // if signal is aborted, throw an error + ): Promise { if (signal?.aborted) { throw ExecutionError.canceled(); } + if (!input.statement) { + throw ValidationError.missingField("statement"); + } + if (!input.warehouse_id) { + throw ValidationError.missingField("warehouse_id"); + } + + const body: sql.ExecuteStatementRequest = { + statement: input.statement, + parameters: input.parameters, + warehouse_id: input.warehouse_id, + catalog: input.catalog, + schema: input.schema, + wait_timeout: input.wait_timeout || executeStatementDefaults.wait_timeout, + disposition: input.disposition || executeStatementDefaults.disposition, + format: input.format || executeStatementDefaults.format, + byte_limit: input.byte_limit, + row_limit: input.row_limit, + on_wait_timeout: + input.on_wait_timeout || executeStatementDefaults.on_wait_timeout, + }; return this.telemetry.startActiveSpan( - "sql.query", + "sql.submit", { kind: SpanKind.CLIENT, attributes: { "db.system": "databricks", - "db.warehouse_id": input.warehouse_id || "", - "db.catalog": input.catalog ?? "", - "db.schema": input.schema ?? "", - "db.statement": input.statement?.substring(0, 500) || "", - "db.has_parameters": !!input.parameters, + "db.warehouse_id": body.warehouse_id || "", + "db.catalog": body.catalog ?? "", + "db.schema": body.schema ?? "", + "db.statement": body.statement?.substring(0, 500) || "", + "db.has_parameters": !!body.parameters, }, }, async (span: Span) => { - let abortHandler: (() => void) | undefined; - let isAborted = false; - - if (signal) { - abortHandler = () => { - // abort span if not recording - if (!span.isRecording()) return; - isAborted = true; - span.setAttribute("cancelled", true); - span.setStatus({ - code: SpanStatusCode.ERROR, - message: "Query cancelled by client", - }); - span.end(); - }; - signal.addEventListener("abort", abortHandler, { once: true }); - } - try { - // validate required fields - if (!input.statement) { - throw ValidationError.missingField("statement"); - } - - if (!input.warehouse_id) { - throw ValidationError.missingField("warehouse_id"); - } - - const body: sql.ExecuteStatementRequest = { - statement: input.statement, - parameters: input.parameters, - warehouse_id: input.warehouse_id, - catalog: input.catalog, - schema: input.schema, - wait_timeout: - input.wait_timeout || executeStatementDefaults.wait_timeout, - disposition: - input.disposition || executeStatementDefaults.disposition, - format: input.format || executeStatementDefaults.format, - byte_limit: input.byte_limit, - row_limit: input.row_limit, - on_wait_timeout: - input.on_wait_timeout || executeStatementDefaults.on_wait_timeout, - }; - - span.addEvent("statement.submitting", { - "db.warehouse_id": input.warehouse_id, - }); - const response = await workspaceClient.statementExecution.executeStatement( body, this._createContext(signal), ); - if (!response) { throw ConnectionError.apiFailure("SQL Warehouse"); } - const status = response.status; - const statementId = response.statement_id as string; - - span.setAttribute("db.statement_id", statementId); - span.addEvent("statement.submitted", { - "db.statement_id": response.statement_id, - "db.status": status?.state, - }); - - let result: - | sql.StatementResponse - | { result: { statement_id: string; status: sql.StatementStatus } }; - - switch (status?.state) { - case "RUNNING": - case "PENDING": - span.addEvent("statement.polling_started", { - "db.status": response.status?.state, - }); - result = await this._pollForStatementResult( - workspaceClient, - statementId, - this.config.timeout, - signal, - ); - break; - case "SUCCEEDED": - result = this._transformDataArray(response); - break; - case "FAILED": - throw ExecutionError.statementFailed(status.error?.message); - case "CANCELED": - throw ExecutionError.canceled(); - case "CLOSED": - throw ExecutionError.resultsClosed(); - default: - throw ExecutionError.unknownState( - String(status?.state ?? "unknown"), - ); - } - - const resultData = result.result as any; - const rowCount = - resultData?.data?.length ?? resultData?.data_array?.length ?? 0; - - if (rowCount > 0) { - span.setAttribute("db.result.row_count", rowCount); + if (response.statement_id) { + span.setAttribute("db.statement_id", response.statement_id); } - - const duration = Date.now() - startTime; - logger.event()?.setContext("sql-warehouse", { - warehouse_id: input.warehouse_id, - rows_returned: rowCount, - query_duration_ms: duration, - }); - - success = true; - // only set success status if not aborted - if (!isAborted) { - span.setStatus({ code: SpanStatusCode.OK }); + if (response.status?.state) { + span.setAttribute("db.status", response.status.state); } - return result; + span.setStatus({ code: SpanStatusCode.OK }); + return response; } catch (error) { - // only record error if not already handled by abort - if (!isAborted) { - span.recordException(error as Error); - span.setStatus({ - code: SpanStatusCode.ERROR, - message: error instanceof Error ? error.message : String(error), - }); - - logger.error( - "Statement execution failed: %s", - error instanceof Error ? error.message : String(error), - ); - } - - if (error instanceof AppKitError) { - throw error; - } - throw ExecutionError.statementFailed( - error instanceof Error ? error.message : String(error), - ); + span.recordException(error as Error); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error instanceof Error ? error.message : String(error), + }); + throw error; } finally { - // remove abort handler - if (abortHandler && signal) { - signal.removeEventListener("abort", abortHandler); - } - - const duration = Date.now() - startTime; - - // end span if not already ended by abort handler - if (!isAborted) { - span.end(); - } - - const attributes = { - "db.warehouse_id": input.warehouse_id, - "db.catalog": input.catalog ?? "", - "db.schema": input.schema ?? "", - "db.statement": input.statement?.substring(0, 500) || "", - success: success.toString(), - }; - - this.telemetryMetrics.queryCount.add(1, attributes); - this.telemetryMetrics.queryDuration.record(duration, attributes); + span.end(); } }, { name: this.name, includePrefix: true }, ); } - private async _pollForStatementResult( + /** Single status read for a known statement ID. Used to re-attach after a crash. */ + async getStatement( workspaceClient: WorkspaceClient, statementId: string, - timeout = executeStatementDefaults.timeout, signal?: AbortSignal, - ) { + ): Promise { + if (signal?.aborted) { + throw ExecutionError.canceled(); + } + const response = await workspaceClient.statementExecution.getStatement( + { statement_id: statementId }, + this._createContext(signal), + ); + if (!response) { + throw ConnectionError.apiFailure("SQL Warehouse"); + } + return response; + } + + /** + * Block until the statement reaches a terminal state, then transform. + * Public so durable callers can re-attach to a persisted statement ID. + */ + async pollStatement( + workspaceClient: WorkspaceClient, + statementId: string, + signal?: AbortSignal, + timeout = this.config.timeout ?? executeStatementDefaults.timeout, + ): Promise> { return this.telemetry.startActiveSpan( "sql.poll", { @@ -295,15 +228,16 @@ export class SQLWarehouseConnector { async (span: Span) => { try { const startTime = Date.now(); - let delay = 1000; - const maxDelayBetweenPolls = 5000; // max 5 seconds between polls + // 250 ms first poll → doubles → 5 s cap. Short start avoids + // burning a full second on near-instant queries. + let delay = 250; + const maxDelayBetweenPolls = 5000; let pollCount = 0; while (true) { pollCount++; span.setAttribute("db.polling.current_attempt", pollCount); - // check if timeout exceeded const elapsedTime = Date.now() - startTime; if (elapsedTime > timeout) { const error = ExecutionError.statementFailed( @@ -329,9 +263,7 @@ export class SQLWarehouseConnector { const response = await workspaceClient.statementExecution.getStatement( - { - statement_id: statementId, - }, + { statement_id: statementId }, this._createContext(signal), ); if (!response) { @@ -339,7 +271,6 @@ export class SQLWarehouseConnector { } const status = response.status; - span.addEvent("polling.status_check", { "db.status": status?.state, "poll.attempt": pollCount, @@ -348,7 +279,6 @@ export class SQLWarehouseConnector { switch (status?.state) { case "PENDING": case "RUNNING": - // continue polling break; case "SUCCEEDED": span.setAttribute("db.polling.attempts", pollCount); @@ -358,7 +288,7 @@ export class SQLWarehouseConnector { "poll.duration_ms": elapsedTime, }); span.setStatus({ code: SpanStatusCode.OK }); - return this._transformDataArray(response); + return this.transformResult(response); case "FAILED": throw ExecutionError.statementFailed(status.error?.message); case "CANCELED": @@ -371,8 +301,17 @@ export class SQLWarehouseConnector { ); } - // continue polling after delay - await new Promise((resolve) => setTimeout(resolve, delay)); + // ±25% jitter de-syncs concurrent pollers (e.g. post-restart + // reconnect storms). Signal-aware so cancels wake immediately. + const jitterMs = Math.floor(delay * (Math.random() - 0.5) * 0.5); + const sleepMs = Math.max(0, delay + jitterMs); + await sleepWithSignal(sleepMs, signal); + if (signal?.aborted) { + const error = ExecutionError.canceled(); + span.recordException(error); + span.setStatus({ code: SpanStatusCode.ERROR }); + throw error; + } delay = Math.min(delay * 2, maxDelayBetweenPolls); } } catch (error) { @@ -382,7 +321,7 @@ export class SQLWarehouseConnector { message: error instanceof Error ? error.message : String(error), }); - // error logging is handled by executeStatement's catch block (gated on isAborted) + // Logging happens once at the orchestrator (executeStatement / durable handler). if (error instanceof AppKitError) { throw error; } @@ -397,9 +336,16 @@ export class SQLWarehouseConnector { ); } - private _transformDataArray(response: sql.StatementResponse) { + /** + * Standard result transform: ARROW_STREAM → minimal job handle (fetch + * chunks via {@link getArrowData}); JSON with rows → positional + * `data_array` projected into name-keyed `data` (parsing JSON-looking + * STRING values); otherwise pass-through. Public so durable handlers + * can shape a {@link getStatement} response without re-polling. + */ + transformResult(response: sql.StatementResponse) { if (response.manifest?.format === "ARROW_STREAM") { - return this.updateWithArrowStatus(response); + return this._toArrowJobHandle(response); } if (!response.result?.data_array || !response.manifest?.schema?.columns) { @@ -407,14 +353,12 @@ export class SQLWarehouseConnector { } const columns = response.manifest.schema.columns; - const transformedData = response.result.data_array.map((row) => { const obj: Record = {}; row.forEach((value, index) => { const column = columns[index]; const columnName = column?.name || `column_${index}`; - // attempt to parse JSON strings for string columns if ( column?.type_name === "STRING" && typeof value === "string" && @@ -424,7 +368,6 @@ export class SQLWarehouseConnector { try { obj[columnName] = JSON.parse(value); } catch { - // if parsing fails, keep as string obj[columnName] = value; } } else { @@ -434,7 +377,6 @@ export class SQLWarehouseConnector { return obj; }); - // remove data_array const { data_array: _data_array, ...restResult } = response.result; return { ...response, @@ -445,7 +387,165 @@ export class SQLWarehouseConnector { }; } - private updateWithArrowStatus(response: sql.StatementResponse): { + /** + * Submit + (optionally poll) + transform — non-durable happy path. + * Most callers want this; durable executors should call + * {@link submitStatement} and {@link pollStatement} separately so they + * can checkpoint the warehouse-side statement ID between the two. + */ + async executeStatement( + workspaceClient: WorkspaceClient, + input: sql.ExecuteStatementRequest, + signal?: AbortSignal, + ) { + const startTime = Date.now(); + let success = false; + + if (signal?.aborted) { + throw ExecutionError.canceled(); + } + + return this.telemetry.startActiveSpan( + "sql.query", + { + kind: SpanKind.CLIENT, + attributes: { + "db.system": "databricks", + "db.warehouse_id": input.warehouse_id || "", + "db.catalog": input.catalog ?? "", + "db.schema": input.schema ?? "", + "db.statement": input.statement?.substring(0, 500) || "", + "db.has_parameters": !!input.parameters, + }, + }, + async (span: Span) => { + let abortHandler: (() => void) | undefined; + let isAborted = false; + + if (signal) { + abortHandler = () => { + if (!span.isRecording()) return; + isAborted = true; + span.setAttribute("cancelled", true); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: "Query cancelled by client", + }); + span.end(); + }; + signal.addEventListener("abort", abortHandler, { once: true }); + } + + try { + const response = await this.submitStatement( + workspaceClient, + input, + signal, + ); + const status = response.status; + const statementId = response.statement_id as string; + span.setAttribute("db.statement_id", statementId); + + let result: + | sql.StatementResponse + | { result: { statement_id: string; status: sql.StatementStatus } }; + + switch (status?.state) { + case "RUNNING": + case "PENDING": + result = await this.pollStatement( + workspaceClient, + statementId, + signal, + ); + break; + case "SUCCEEDED": + result = this.transformResult(response); + break; + case "FAILED": + throw ExecutionError.statementFailed(status.error?.message); + case "CANCELED": + throw ExecutionError.canceled(); + case "CLOSED": + throw ExecutionError.resultsClosed(); + default: + throw ExecutionError.unknownState( + String(status?.state ?? "unknown"), + ); + } + + const resultData = result.result as any; + const rowCount = + resultData?.data?.length ?? resultData?.data_array?.length ?? 0; + + if (rowCount > 0) { + span.setAttribute("db.result.row_count", rowCount); + } + + const duration = Date.now() - startTime; + logger.event()?.setContext("sql-warehouse", { + warehouse_id: input.warehouse_id, + rows_returned: rowCount, + query_duration_ms: duration, + }); + + success = true; + if (!isAborted) { + span.setStatus({ code: SpanStatusCode.OK }); + } + return result; + } catch (error) { + if (!isAborted) { + span.recordException(error as Error); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error instanceof Error ? error.message : String(error), + }); + + logger.error( + "Statement execution failed: %s", + error instanceof Error ? error.message : String(error), + ); + } + + if (error instanceof AppKitError) { + throw error; + } + throw ExecutionError.statementFailed( + error instanceof Error ? error.message : String(error), + ); + } finally { + if (abortHandler && signal) { + signal.removeEventListener("abort", abortHandler); + } + + const duration = Date.now() - startTime; + + if (!isAborted) { + span.end(); + } + + const attributes = { + "db.warehouse_id": input.warehouse_id, + "db.catalog": input.catalog ?? "", + "db.schema": input.schema ?? "", + "db.statement": input.statement?.substring(0, 500) || "", + success: success.toString(), + }; + + this.telemetryMetrics.queryCount.add(1, attributes); + this.telemetryMetrics.queryDuration.record(duration, attributes); + } + }, + { name: this.name, includePrefix: true }, + ); + } + + /** + * Minimal handle the frontend uses to fetch the Arrow buffer via + * `/arrow-result/:jobId`. Drops manifest + external_links by design. + */ + private _toArrowJobHandle(response: sql.StatementResponse): { result: { statement_id: string; status: sql.StatementStatus }; } { return { diff --git a/packages/appkit/src/connectors/tests/sql-warehouse.test.ts b/packages/appkit/src/connectors/tests/sql-warehouse.test.ts index 753d58636..858a95fda 100644 --- a/packages/appkit/src/connectors/tests/sql-warehouse.test.ts +++ b/packages/appkit/src/connectors/tests/sql-warehouse.test.ts @@ -1,7 +1,11 @@ +import { + createFailedSQLResponse, + createSuccessfulSQLResponse, +} from "@tools/test-helpers"; import { beforeEach, describe, expect, test, vi } from "vitest"; import { SQLWarehouseConnector } from "../sql-warehouse"; -// Mock telemetry to pass through span callbacks +// Pass-through telemetry stub: invokes the span callback with a no-op span. vi.mock("../../telemetry", () => { const mockSpan = { end: vi.fn(), @@ -33,32 +37,319 @@ vi.mock("../../telemetry", () => { }; }); +/** Minimal `WorkspaceClient` stub with `executeStatement` / `getStatement` mocks. */ +function makeClient() { + const executeStatement = vi.fn(); + const getStatement = vi.fn(); + return { + client: { + statementExecution: { executeStatement, getStatement }, + config: { host: "https://test.databricks.com" }, + } as any, + mocks: { executeStatement, getStatement }, + }; +} + describe("SQLWarehouseConnector", () => { - describe("error log redaction", () => { - let connector: SQLWarehouseConnector; + let connector: SQLWarehouseConnector; + + beforeEach(() => { + vi.clearAllMocks(); + connector = new SQLWarehouseConnector({ timeout: 5000 }); + }); + + describe("submitStatement", () => { + test("rejects when the statement is missing", async () => { + const { client } = makeClient(); + await expect( + connector.submitStatement(client, { + statement: "", + warehouse_id: "w-1", + }), + ).rejects.toThrow(/statement/); + }); + + test("rejects when the warehouse_id is missing", async () => { + const { client } = makeClient(); + await expect( + connector.submitStatement(client, { + statement: "SELECT 1", + warehouse_id: "", + }), + ).rejects.toThrow(/warehouse_id/); + }); + + test("rejects when the signal is already aborted", async () => { + const { client } = makeClient(); + const ac = new AbortController(); + ac.abort(); + await expect( + connector.submitStatement( + client, + { statement: "SELECT 1", warehouse_id: "w-1" }, + ac.signal, + ), + ).rejects.toThrow(); + }); - beforeEach(() => { - vi.clearAllMocks(); - connector = new SQLWarehouseConnector({ timeout: 5000 }); + test("returns the raw response on success without polling", async () => { + const { client, mocks } = makeClient(); + const response = createSuccessfulSQLResponse([["a"]], [{ name: "col" }]); + mocks.executeStatement.mockResolvedValueOnce(response); + + const result = await connector.submitStatement(client, { + statement: "SELECT 1", + warehouse_id: "w-1", + }); + + expect(result).toBe(response); + expect(mocks.executeStatement).toHaveBeenCalledTimes(1); + expect(mocks.getStatement).not.toHaveBeenCalled(); }); - test("should not log the SQL statement on executeStatement error", async () => { + test("propagates a null response as a SQL Warehouse api failure", async () => { + const { client, mocks } = makeClient(); + mocks.executeStatement.mockResolvedValueOnce(null); + + await expect( + connector.submitStatement(client, { + statement: "SELECT 1", + warehouse_id: "w-1", + }), + ).rejects.toThrow(/SQL Warehouse/); + }); + }); + + describe("getStatement", () => { + test("rejects when the signal is already aborted", async () => { + const { client } = makeClient(); + const ac = new AbortController(); + ac.abort(); + await expect( + connector.getStatement(client, "stmt-1", ac.signal), + ).rejects.toThrow(); + }); + + test("returns the raw response", async () => { + const { client, mocks } = makeClient(); + const response = createSuccessfulSQLResponse([["x"]], [{ name: "col" }]); + mocks.getStatement.mockResolvedValueOnce(response); + + const result = await connector.getStatement(client, "stmt-1"); + expect(result).toBe(response); + expect(mocks.getStatement).toHaveBeenCalledWith( + { statement_id: "stmt-1" }, + expect.anything(), + ); + }); + + test("rejects when the response is null", async () => { + const { client, mocks } = makeClient(); + mocks.getStatement.mockResolvedValueOnce(null); + + await expect(connector.getStatement(client, "stmt-1")).rejects.toThrow( + /SQL Warehouse/, + ); + }); + }); + + describe("pollStatement", () => { + test("returns transformed result when status is SUCCEEDED on first poll", async () => { + const { client, mocks } = makeClient(); + mocks.getStatement.mockResolvedValueOnce( + createSuccessfulSQLResponse( + [["alice", "30"]], + [{ name: "name" }, { name: "age" }], + ), + ); + + const result = await connector.pollStatement(client, "stmt-1"); + expect((result as any).result.data).toEqual([ + { name: "alice", age: "30" }, + ]); + }); + + test("throws statementFailed when status is FAILED", async () => { + const { client, mocks } = makeClient(); + mocks.getStatement.mockResolvedValueOnce( + createFailedSQLResponse("Table not found"), + ); + + await expect(connector.pollStatement(client, "stmt-1")).rejects.toThrow( + /Table not found/, + ); + }); + + test("throws canceled when status is CANCELED", async () => { + const { client, mocks } = makeClient(); + mocks.getStatement.mockResolvedValueOnce({ + status: { state: "CANCELED" }, + statement_id: "stmt-1", + }); + + await expect(connector.pollStatement(client, "stmt-1")).rejects.toThrow(); + }); + + test("throws when the polling timeout is exceeded", async () => { + // timeout: 0 trips the elapsed-time check on the second iteration. + const tight = new SQLWarehouseConnector({ timeout: 0 }); + const { client, mocks } = makeClient(); + mocks.getStatement.mockResolvedValue({ + status: { state: "RUNNING" }, + statement_id: "stmt-1", + }); + + await expect( + tight.pollStatement(client, "stmt-1", undefined, 0), + ).rejects.toThrow(/Polling timeout exceeded/); + }); + + test("throws when the signal aborts during polling", async () => { + const { client, mocks } = makeClient(); + mocks.getStatement.mockResolvedValueOnce({ + status: { state: "RUNNING" }, + statement_id: "stmt-1", + }); + + const ac = new AbortController(); + ac.abort(); + + await expect( + connector.pollStatement(client, "stmt-1", ac.signal), + ).rejects.toThrow(); + }); + }); + + describe("transformResult", () => { + test("projects data_array into name-keyed rows", () => { + const response = createSuccessfulSQLResponse( + [ + ["alice", "30"], + ["bob", "25"], + ], + [{ name: "name" }, { name: "age" }], + ); + + const result = connector.transformResult(response as any) as any; + expect(result.result.data).toEqual([ + { name: "alice", age: "30" }, + { name: "bob", age: "25" }, + ]); + expect(result.result.data_array).toBeUndefined(); + }); + + test("parses STRING columns whose value looks like JSON", () => { + const response = createSuccessfulSQLResponse( + [['{"a":1}']], + [{ name: "payload", type_name: "STRING" }], + ); + + const result = connector.transformResult(response as any) as any; + expect(result.result.data[0].payload).toEqual({ a: 1 }); + }); + + test("keeps the raw string when JSON parsing fails", () => { + const response = createSuccessfulSQLResponse( + [["{not-json"]], + [{ name: "payload", type_name: "STRING" }], + ); + + const result = connector.transformResult(response as any) as any; + expect(result.result.data[0].payload).toBe("{not-json"); + }); + + test("returns the Arrow job handle for ARROW_STREAM responses", () => { + const response = { + status: { state: "SUCCEEDED" }, + statement_id: "stmt-arrow-1", + manifest: { format: "ARROW_STREAM" }, + result: { external_links: [] }, + } as any; + + const result = connector.transformResult(response) as any; + expect(result).toEqual({ + result: { + statement_id: "stmt-arrow-1", + status: { state: "SUCCEEDED", error: undefined }, + }, + }); + }); + + test("passes the response through when there is no data_array", () => { + const response = { + status: { state: "SUCCEEDED" }, + statement_id: "stmt-1", + } as any; + + const result = connector.transformResult(response); + expect(result).toBe(response); + }); + }); + + describe("executeStatement", () => { + test("transforms inline when submit returns SUCCEEDED", async () => { + const { client, mocks } = makeClient(); + mocks.executeStatement.mockResolvedValueOnce( + createSuccessfulSQLResponse([["a"]], [{ name: "col" }]), + ); + + const result = (await connector.executeStatement(client, { + statement: "SELECT 'a' AS col", + warehouse_id: "w-1", + })) as any; + + expect(result.result.data).toEqual([{ col: "a" }]); + expect(mocks.getStatement).not.toHaveBeenCalled(); + }); + + test("polls when submit returns RUNNING and returns the polled result", async () => { + const { client, mocks } = makeClient(); + mocks.executeStatement.mockResolvedValueOnce({ + status: { state: "RUNNING" }, + statement_id: "stmt-2", + }); + mocks.getStatement.mockResolvedValueOnce( + createSuccessfulSQLResponse([["b"]], [{ name: "col" }]), + ); + + const result = (await connector.executeStatement(client, { + statement: "SELECT 'b' AS col", + warehouse_id: "w-1", + })) as any; + + expect(result.result.data).toEqual([{ col: "b" }]); + expect(mocks.executeStatement).toHaveBeenCalledTimes(1); + expect(mocks.getStatement).toHaveBeenCalledTimes(1); + }); + + test("rejects when the signal is already aborted", async () => { + const { client } = makeClient(); + const ac = new AbortController(); + ac.abort(); + await expect( + connector.executeStatement( + client, + { statement: "SELECT 1", warehouse_id: "w-1" }, + ac.signal, + ), + ).rejects.toThrow(); + }); + }); + + describe("error log redaction", () => { + test("does not log the SQL statement on executeStatement error", async () => { const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); const sensitiveStatement = "SELECT password, ssn FROM users WHERE email = 'admin@test.com'"; - const mockWorkspaceClient = { - statementExecution: { - executeStatement: vi - .fn() - .mockRejectedValue(new Error("warehouse unavailable")), - }, - config: { host: "https://test.databricks.com" }, - }; + const { client, mocks } = makeClient(); + mocks.executeStatement.mockRejectedValue( + new Error("warehouse unavailable"), + ); await expect( - connector.executeStatement(mockWorkspaceClient as any, { + connector.executeStatement(client, { statement: sensitiveStatement, warehouse_id: "test-warehouse", }), @@ -68,10 +359,7 @@ describe("SQLWarehouseConnector", () => { .map((call) => call.join(" ")) .join(" "); - // Should log the error message expect(loggedOutput).toContain("warehouse unavailable"); - - // Should NOT log the SQL statement expect(loggedOutput).not.toContain("password"); expect(loggedOutput).not.toContain("ssn"); expect(loggedOutput).not.toContain("admin@test.com"); @@ -79,22 +367,18 @@ describe("SQLWarehouseConnector", () => { errorSpy.mockRestore(); }); - test("should not log the SQL statement on polling error", async () => { + test("does not log the SQL statement on polling error", async () => { const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); - const mockWorkspaceClient = { - statementExecution: { - executeStatement: vi.fn().mockResolvedValue({ - statement_id: "stmt-123", - status: { state: "RUNNING" }, - }), - getStatement: vi.fn().mockRejectedValue(new Error("polling timeout")), - }, - config: { host: "https://test.databricks.com" }, - }; + const { client, mocks } = makeClient(); + mocks.executeStatement.mockResolvedValue({ + statement_id: "stmt-123", + status: { state: "RUNNING" }, + }); + mocks.getStatement.mockRejectedValue(new Error("polling timeout")); await expect( - connector.executeStatement(mockWorkspaceClient as any, { + connector.executeStatement(client, { statement: "SELECT secret_data FROM vault", warehouse_id: "test-warehouse", }), @@ -104,12 +388,8 @@ describe("SQLWarehouseConnector", () => { .map((call) => call.join(" ")) .join(" "); - // Errors raised inside polling bubble up to executeStatement's catch, - // which is the single point that logs (gated on isAborted). The poll - // layer no longer logs to avoid double-logging the same failure. + // Polling errors bubble to executeStatement's catch — the single point that logs. expect(loggedOutput).toContain("polling timeout"); - - // Should NOT log the SQL statement expect(loggedOutput).not.toContain("secret_data"); expect(loggedOutput).not.toContain("vault");