diff --git a/packages/appkit/src/connectors/sql-warehouse/client.ts b/packages/appkit/src/connectors/sql-warehouse/client.ts index d0a1c1816..78d333d10 100644 --- a/packages/appkit/src/connectors/sql-warehouse/client.ts +++ b/packages/appkit/src/connectors/sql-warehouse/client.ts @@ -25,6 +25,31 @@ import { executeStatementDefaults } from "./defaults"; const logger = createLogger("connectors:sql-warehouse"); +/** + * Sleep for `ms`, resolving early if `signal` aborts. Caller must + * re-check `signal.aborted` afterwards to surface a typed cancel error. + * @internal + */ +function sleepWithSignal(ms: number, signal?: AbortSignal): Promise { + if (ms <= 0) return Promise.resolve(); + return new Promise((resolve) => { + const handle = setTimeout(() => { + signal?.removeEventListener("abort", onAbort); + resolve(); + }, ms); + const onAbort = () => { + clearTimeout(handle); + resolve(); + }; + if (signal?.aborted) { + clearTimeout(handle); + resolve(); + return; + } + signal?.addEventListener("abort", onAbort, { once: true }); + }); +} + interface SQLWarehouseConfig { timeout?: number; telemetry?: TelemetryOptions; @@ -81,209 +106,117 @@ export class SQLWarehouseConnector { return this._arrowProcessor; } - async executeStatement( + /** + * Submit a statement and return the raw initial response. Caller polls + * via {@link pollStatement} on `RUNNING`/`PENDING`. Split from polling + * so durable executors can checkpoint the warehouse-side statement ID + * and re-attach after a crash without re-running the SQL. + */ + async submitStatement( workspaceClient: WorkspaceClient, input: sql.ExecuteStatementRequest, signal?: AbortSignal, - ) { - const startTime = Date.now(); - let success = false; - - // if signal is aborted, throw an error + ): Promise { if (signal?.aborted) { throw ExecutionError.canceled(); } + if (!input.statement) { + throw ValidationError.missingField("statement"); + } + if (!input.warehouse_id) { + throw ValidationError.missingField("warehouse_id"); + } + + const body: sql.ExecuteStatementRequest = { + statement: input.statement, + parameters: input.parameters, + warehouse_id: input.warehouse_id, + catalog: input.catalog, + schema: input.schema, + wait_timeout: input.wait_timeout || executeStatementDefaults.wait_timeout, + disposition: input.disposition || executeStatementDefaults.disposition, + format: input.format || executeStatementDefaults.format, + byte_limit: input.byte_limit, + row_limit: input.row_limit, + on_wait_timeout: + input.on_wait_timeout || executeStatementDefaults.on_wait_timeout, + }; return this.telemetry.startActiveSpan( - "sql.query", + "sql.submit", { kind: SpanKind.CLIENT, attributes: { "db.system": "databricks", - "db.warehouse_id": input.warehouse_id || "", - "db.catalog": input.catalog ?? "", - "db.schema": input.schema ?? "", - "db.statement": input.statement?.substring(0, 500) || "", - "db.has_parameters": !!input.parameters, + "db.warehouse_id": body.warehouse_id || "", + "db.catalog": body.catalog ?? "", + "db.schema": body.schema ?? "", + "db.statement": body.statement?.substring(0, 500) || "", + "db.has_parameters": !!body.parameters, }, }, async (span: Span) => { - let abortHandler: (() => void) | undefined; - let isAborted = false; - - if (signal) { - abortHandler = () => { - // abort span if not recording - if (!span.isRecording()) return; - isAborted = true; - span.setAttribute("cancelled", true); - span.setStatus({ - code: SpanStatusCode.ERROR, - message: "Query cancelled by client", - }); - span.end(); - }; - signal.addEventListener("abort", abortHandler, { once: true }); - } - try { - // validate required fields - if (!input.statement) { - throw ValidationError.missingField("statement"); - } - - if (!input.warehouse_id) { - throw ValidationError.missingField("warehouse_id"); - } - - const body: sql.ExecuteStatementRequest = { - statement: input.statement, - parameters: input.parameters, - warehouse_id: input.warehouse_id, - catalog: input.catalog, - schema: input.schema, - wait_timeout: - input.wait_timeout || executeStatementDefaults.wait_timeout, - disposition: - input.disposition || executeStatementDefaults.disposition, - format: input.format || executeStatementDefaults.format, - byte_limit: input.byte_limit, - row_limit: input.row_limit, - on_wait_timeout: - input.on_wait_timeout || executeStatementDefaults.on_wait_timeout, - }; - - span.addEvent("statement.submitting", { - "db.warehouse_id": input.warehouse_id, - }); - const response = await workspaceClient.statementExecution.executeStatement( body, this._createContext(signal), ); - if (!response) { throw ConnectionError.apiFailure("SQL Warehouse"); } - const status = response.status; - const statementId = response.statement_id as string; - - span.setAttribute("db.statement_id", statementId); - span.addEvent("statement.submitted", { - "db.statement_id": response.statement_id, - "db.status": status?.state, - }); - - let result: - | sql.StatementResponse - | { result: { statement_id: string; status: sql.StatementStatus } }; - - switch (status?.state) { - case "RUNNING": - case "PENDING": - span.addEvent("statement.polling_started", { - "db.status": response.status?.state, - }); - result = await this._pollForStatementResult( - workspaceClient, - statementId, - this.config.timeout, - signal, - ); - break; - case "SUCCEEDED": - result = this._transformDataArray(response); - break; - case "FAILED": - throw ExecutionError.statementFailed(status.error?.message); - case "CANCELED": - throw ExecutionError.canceled(); - case "CLOSED": - throw ExecutionError.resultsClosed(); - default: - throw ExecutionError.unknownState( - String(status?.state ?? "unknown"), - ); - } - - const resultData = result.result as any; - const rowCount = - resultData?.data?.length ?? resultData?.data_array?.length ?? 0; - - if (rowCount > 0) { - span.setAttribute("db.result.row_count", rowCount); + if (response.statement_id) { + span.setAttribute("db.statement_id", response.statement_id); } - - const duration = Date.now() - startTime; - logger.event()?.setContext("sql-warehouse", { - warehouse_id: input.warehouse_id, - rows_returned: rowCount, - query_duration_ms: duration, - }); - - success = true; - // only set success status if not aborted - if (!isAborted) { - span.setStatus({ code: SpanStatusCode.OK }); + if (response.status?.state) { + span.setAttribute("db.status", response.status.state); } - return result; + span.setStatus({ code: SpanStatusCode.OK }); + return response; } catch (error) { - // only record error if not already handled by abort - if (!isAborted) { - span.recordException(error as Error); - span.setStatus({ - code: SpanStatusCode.ERROR, - message: error instanceof Error ? error.message : String(error), - }); - - logger.error( - "Statement execution failed: %s", - error instanceof Error ? error.message : String(error), - ); - } - - if (error instanceof AppKitError) { - throw error; - } - throw ExecutionError.statementFailed( - error instanceof Error ? error.message : String(error), - ); + span.recordException(error as Error); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error instanceof Error ? error.message : String(error), + }); + throw error; } finally { - // remove abort handler - if (abortHandler && signal) { - signal.removeEventListener("abort", abortHandler); - } - - const duration = Date.now() - startTime; - - // end span if not already ended by abort handler - if (!isAborted) { - span.end(); - } - - const attributes = { - "db.warehouse_id": input.warehouse_id, - "db.catalog": input.catalog ?? "", - "db.schema": input.schema ?? "", - "db.statement": input.statement?.substring(0, 500) || "", - success: success.toString(), - }; - - this.telemetryMetrics.queryCount.add(1, attributes); - this.telemetryMetrics.queryDuration.record(duration, attributes); + span.end(); } }, { name: this.name, includePrefix: true }, ); } - private async _pollForStatementResult( + /** Single status read for a known statement ID. Used to re-attach after a crash. */ + async getStatement( workspaceClient: WorkspaceClient, statementId: string, - timeout = executeStatementDefaults.timeout, signal?: AbortSignal, - ) { + ): Promise { + if (signal?.aborted) { + throw ExecutionError.canceled(); + } + const response = await workspaceClient.statementExecution.getStatement( + { statement_id: statementId }, + this._createContext(signal), + ); + if (!response) { + throw ConnectionError.apiFailure("SQL Warehouse"); + } + return response; + } + + /** + * Block until the statement reaches a terminal state, then transform. + * Public so durable callers can re-attach to a persisted statement ID. + */ + async pollStatement( + workspaceClient: WorkspaceClient, + statementId: string, + signal?: AbortSignal, + timeout = this.config.timeout ?? executeStatementDefaults.timeout, + ): Promise> { return this.telemetry.startActiveSpan( "sql.poll", { @@ -295,15 +228,16 @@ export class SQLWarehouseConnector { async (span: Span) => { try { const startTime = Date.now(); - let delay = 1000; - const maxDelayBetweenPolls = 5000; // max 5 seconds between polls + // 250 ms first poll → doubles → 5 s cap. Short start avoids + // burning a full second on near-instant queries. + let delay = 250; + const maxDelayBetweenPolls = 5000; let pollCount = 0; while (true) { pollCount++; span.setAttribute("db.polling.current_attempt", pollCount); - // check if timeout exceeded const elapsedTime = Date.now() - startTime; if (elapsedTime > timeout) { const error = ExecutionError.statementFailed( @@ -329,9 +263,7 @@ export class SQLWarehouseConnector { const response = await workspaceClient.statementExecution.getStatement( - { - statement_id: statementId, - }, + { statement_id: statementId }, this._createContext(signal), ); if (!response) { @@ -339,7 +271,6 @@ export class SQLWarehouseConnector { } const status = response.status; - span.addEvent("polling.status_check", { "db.status": status?.state, "poll.attempt": pollCount, @@ -348,7 +279,6 @@ export class SQLWarehouseConnector { switch (status?.state) { case "PENDING": case "RUNNING": - // continue polling break; case "SUCCEEDED": span.setAttribute("db.polling.attempts", pollCount); @@ -358,7 +288,7 @@ export class SQLWarehouseConnector { "poll.duration_ms": elapsedTime, }); span.setStatus({ code: SpanStatusCode.OK }); - return this._transformDataArray(response); + return this.transformResult(response); case "FAILED": throw ExecutionError.statementFailed(status.error?.message); case "CANCELED": @@ -371,8 +301,17 @@ export class SQLWarehouseConnector { ); } - // continue polling after delay - await new Promise((resolve) => setTimeout(resolve, delay)); + // ±25% jitter de-syncs concurrent pollers (e.g. post-restart + // reconnect storms). Signal-aware so cancels wake immediately. + const jitterMs = Math.floor(delay * (Math.random() - 0.5) * 0.5); + const sleepMs = Math.max(0, delay + jitterMs); + await sleepWithSignal(sleepMs, signal); + if (signal?.aborted) { + const error = ExecutionError.canceled(); + span.recordException(error); + span.setStatus({ code: SpanStatusCode.ERROR }); + throw error; + } delay = Math.min(delay * 2, maxDelayBetweenPolls); } } catch (error) { @@ -382,7 +321,7 @@ export class SQLWarehouseConnector { message: error instanceof Error ? error.message : String(error), }); - // error logging is handled by executeStatement's catch block (gated on isAborted) + // Logging happens once at the orchestrator (executeStatement / durable handler). if (error instanceof AppKitError) { throw error; } @@ -397,9 +336,16 @@ export class SQLWarehouseConnector { ); } - private _transformDataArray(response: sql.StatementResponse) { + /** + * Standard result transform: ARROW_STREAM → minimal job handle (fetch + * chunks via {@link getArrowData}); JSON with rows → positional + * `data_array` projected into name-keyed `data` (parsing JSON-looking + * STRING values); otherwise pass-through. Public so durable handlers + * can shape a {@link getStatement} response without re-polling. + */ + transformResult(response: sql.StatementResponse) { if (response.manifest?.format === "ARROW_STREAM") { - return this.updateWithArrowStatus(response); + return this._toArrowJobHandle(response); } if (!response.result?.data_array || !response.manifest?.schema?.columns) { @@ -407,14 +353,12 @@ export class SQLWarehouseConnector { } const columns = response.manifest.schema.columns; - const transformedData = response.result.data_array.map((row) => { const obj: Record = {}; row.forEach((value, index) => { const column = columns[index]; const columnName = column?.name || `column_${index}`; - // attempt to parse JSON strings for string columns if ( column?.type_name === "STRING" && typeof value === "string" && @@ -424,7 +368,6 @@ export class SQLWarehouseConnector { try { obj[columnName] = JSON.parse(value); } catch { - // if parsing fails, keep as string obj[columnName] = value; } } else { @@ -434,7 +377,6 @@ export class SQLWarehouseConnector { return obj; }); - // remove data_array const { data_array: _data_array, ...restResult } = response.result; return { ...response, @@ -445,7 +387,165 @@ export class SQLWarehouseConnector { }; } - private updateWithArrowStatus(response: sql.StatementResponse): { + /** + * Submit + (optionally poll) + transform — non-durable happy path. + * Most callers want this; durable executors should call + * {@link submitStatement} and {@link pollStatement} separately so they + * can checkpoint the warehouse-side statement ID between the two. + */ + async executeStatement( + workspaceClient: WorkspaceClient, + input: sql.ExecuteStatementRequest, + signal?: AbortSignal, + ) { + const startTime = Date.now(); + let success = false; + + if (signal?.aborted) { + throw ExecutionError.canceled(); + } + + return this.telemetry.startActiveSpan( + "sql.query", + { + kind: SpanKind.CLIENT, + attributes: { + "db.system": "databricks", + "db.warehouse_id": input.warehouse_id || "", + "db.catalog": input.catalog ?? "", + "db.schema": input.schema ?? "", + "db.statement": input.statement?.substring(0, 500) || "", + "db.has_parameters": !!input.parameters, + }, + }, + async (span: Span) => { + let abortHandler: (() => void) | undefined; + let isAborted = false; + + if (signal) { + abortHandler = () => { + if (!span.isRecording()) return; + isAborted = true; + span.setAttribute("cancelled", true); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: "Query cancelled by client", + }); + span.end(); + }; + signal.addEventListener("abort", abortHandler, { once: true }); + } + + try { + const response = await this.submitStatement( + workspaceClient, + input, + signal, + ); + const status = response.status; + const statementId = response.statement_id as string; + span.setAttribute("db.statement_id", statementId); + + let result: + | sql.StatementResponse + | { result: { statement_id: string; status: sql.StatementStatus } }; + + switch (status?.state) { + case "RUNNING": + case "PENDING": + result = await this.pollStatement( + workspaceClient, + statementId, + signal, + ); + break; + case "SUCCEEDED": + result = this.transformResult(response); + break; + case "FAILED": + throw ExecutionError.statementFailed(status.error?.message); + case "CANCELED": + throw ExecutionError.canceled(); + case "CLOSED": + throw ExecutionError.resultsClosed(); + default: + throw ExecutionError.unknownState( + String(status?.state ?? "unknown"), + ); + } + + const resultData = result.result as any; + const rowCount = + resultData?.data?.length ?? resultData?.data_array?.length ?? 0; + + if (rowCount > 0) { + span.setAttribute("db.result.row_count", rowCount); + } + + const duration = Date.now() - startTime; + logger.event()?.setContext("sql-warehouse", { + warehouse_id: input.warehouse_id, + rows_returned: rowCount, + query_duration_ms: duration, + }); + + success = true; + if (!isAborted) { + span.setStatus({ code: SpanStatusCode.OK }); + } + return result; + } catch (error) { + if (!isAborted) { + span.recordException(error as Error); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error instanceof Error ? error.message : String(error), + }); + + logger.error( + "Statement execution failed: %s", + error instanceof Error ? error.message : String(error), + ); + } + + if (error instanceof AppKitError) { + throw error; + } + throw ExecutionError.statementFailed( + error instanceof Error ? error.message : String(error), + ); + } finally { + if (abortHandler && signal) { + signal.removeEventListener("abort", abortHandler); + } + + const duration = Date.now() - startTime; + + if (!isAborted) { + span.end(); + } + + const attributes = { + "db.warehouse_id": input.warehouse_id, + "db.catalog": input.catalog ?? "", + "db.schema": input.schema ?? "", + "db.statement": input.statement?.substring(0, 500) || "", + success: success.toString(), + }; + + this.telemetryMetrics.queryCount.add(1, attributes); + this.telemetryMetrics.queryDuration.record(duration, attributes); + } + }, + { name: this.name, includePrefix: true }, + ); + } + + /** + * Minimal handle the frontend uses to fetch the Arrow buffer via + * `/arrow-result/:jobId`. Drops manifest + external_links by design. + */ + private _toArrowJobHandle(response: sql.StatementResponse): { result: { statement_id: string; status: sql.StatementStatus }; } { return { diff --git a/packages/appkit/src/connectors/tests/sql-warehouse.test.ts b/packages/appkit/src/connectors/tests/sql-warehouse.test.ts index 753d58636..858a95fda 100644 --- a/packages/appkit/src/connectors/tests/sql-warehouse.test.ts +++ b/packages/appkit/src/connectors/tests/sql-warehouse.test.ts @@ -1,7 +1,11 @@ +import { + createFailedSQLResponse, + createSuccessfulSQLResponse, +} from "@tools/test-helpers"; import { beforeEach, describe, expect, test, vi } from "vitest"; import { SQLWarehouseConnector } from "../sql-warehouse"; -// Mock telemetry to pass through span callbacks +// Pass-through telemetry stub: invokes the span callback with a no-op span. vi.mock("../../telemetry", () => { const mockSpan = { end: vi.fn(), @@ -33,32 +37,319 @@ vi.mock("../../telemetry", () => { }; }); +/** Minimal `WorkspaceClient` stub with `executeStatement` / `getStatement` mocks. */ +function makeClient() { + const executeStatement = vi.fn(); + const getStatement = vi.fn(); + return { + client: { + statementExecution: { executeStatement, getStatement }, + config: { host: "https://test.databricks.com" }, + } as any, + mocks: { executeStatement, getStatement }, + }; +} + describe("SQLWarehouseConnector", () => { - describe("error log redaction", () => { - let connector: SQLWarehouseConnector; + let connector: SQLWarehouseConnector; + + beforeEach(() => { + vi.clearAllMocks(); + connector = new SQLWarehouseConnector({ timeout: 5000 }); + }); + + describe("submitStatement", () => { + test("rejects when the statement is missing", async () => { + const { client } = makeClient(); + await expect( + connector.submitStatement(client, { + statement: "", + warehouse_id: "w-1", + }), + ).rejects.toThrow(/statement/); + }); + + test("rejects when the warehouse_id is missing", async () => { + const { client } = makeClient(); + await expect( + connector.submitStatement(client, { + statement: "SELECT 1", + warehouse_id: "", + }), + ).rejects.toThrow(/warehouse_id/); + }); + + test("rejects when the signal is already aborted", async () => { + const { client } = makeClient(); + const ac = new AbortController(); + ac.abort(); + await expect( + connector.submitStatement( + client, + { statement: "SELECT 1", warehouse_id: "w-1" }, + ac.signal, + ), + ).rejects.toThrow(); + }); - beforeEach(() => { - vi.clearAllMocks(); - connector = new SQLWarehouseConnector({ timeout: 5000 }); + test("returns the raw response on success without polling", async () => { + const { client, mocks } = makeClient(); + const response = createSuccessfulSQLResponse([["a"]], [{ name: "col" }]); + mocks.executeStatement.mockResolvedValueOnce(response); + + const result = await connector.submitStatement(client, { + statement: "SELECT 1", + warehouse_id: "w-1", + }); + + expect(result).toBe(response); + expect(mocks.executeStatement).toHaveBeenCalledTimes(1); + expect(mocks.getStatement).not.toHaveBeenCalled(); }); - test("should not log the SQL statement on executeStatement error", async () => { + test("propagates a null response as a SQL Warehouse api failure", async () => { + const { client, mocks } = makeClient(); + mocks.executeStatement.mockResolvedValueOnce(null); + + await expect( + connector.submitStatement(client, { + statement: "SELECT 1", + warehouse_id: "w-1", + }), + ).rejects.toThrow(/SQL Warehouse/); + }); + }); + + describe("getStatement", () => { + test("rejects when the signal is already aborted", async () => { + const { client } = makeClient(); + const ac = new AbortController(); + ac.abort(); + await expect( + connector.getStatement(client, "stmt-1", ac.signal), + ).rejects.toThrow(); + }); + + test("returns the raw response", async () => { + const { client, mocks } = makeClient(); + const response = createSuccessfulSQLResponse([["x"]], [{ name: "col" }]); + mocks.getStatement.mockResolvedValueOnce(response); + + const result = await connector.getStatement(client, "stmt-1"); + expect(result).toBe(response); + expect(mocks.getStatement).toHaveBeenCalledWith( + { statement_id: "stmt-1" }, + expect.anything(), + ); + }); + + test("rejects when the response is null", async () => { + const { client, mocks } = makeClient(); + mocks.getStatement.mockResolvedValueOnce(null); + + await expect(connector.getStatement(client, "stmt-1")).rejects.toThrow( + /SQL Warehouse/, + ); + }); + }); + + describe("pollStatement", () => { + test("returns transformed result when status is SUCCEEDED on first poll", async () => { + const { client, mocks } = makeClient(); + mocks.getStatement.mockResolvedValueOnce( + createSuccessfulSQLResponse( + [["alice", "30"]], + [{ name: "name" }, { name: "age" }], + ), + ); + + const result = await connector.pollStatement(client, "stmt-1"); + expect((result as any).result.data).toEqual([ + { name: "alice", age: "30" }, + ]); + }); + + test("throws statementFailed when status is FAILED", async () => { + const { client, mocks } = makeClient(); + mocks.getStatement.mockResolvedValueOnce( + createFailedSQLResponse("Table not found"), + ); + + await expect(connector.pollStatement(client, "stmt-1")).rejects.toThrow( + /Table not found/, + ); + }); + + test("throws canceled when status is CANCELED", async () => { + const { client, mocks } = makeClient(); + mocks.getStatement.mockResolvedValueOnce({ + status: { state: "CANCELED" }, + statement_id: "stmt-1", + }); + + await expect(connector.pollStatement(client, "stmt-1")).rejects.toThrow(); + }); + + test("throws when the polling timeout is exceeded", async () => { + // timeout: 0 trips the elapsed-time check on the second iteration. + const tight = new SQLWarehouseConnector({ timeout: 0 }); + const { client, mocks } = makeClient(); + mocks.getStatement.mockResolvedValue({ + status: { state: "RUNNING" }, + statement_id: "stmt-1", + }); + + await expect( + tight.pollStatement(client, "stmt-1", undefined, 0), + ).rejects.toThrow(/Polling timeout exceeded/); + }); + + test("throws when the signal aborts during polling", async () => { + const { client, mocks } = makeClient(); + mocks.getStatement.mockResolvedValueOnce({ + status: { state: "RUNNING" }, + statement_id: "stmt-1", + }); + + const ac = new AbortController(); + ac.abort(); + + await expect( + connector.pollStatement(client, "stmt-1", ac.signal), + ).rejects.toThrow(); + }); + }); + + describe("transformResult", () => { + test("projects data_array into name-keyed rows", () => { + const response = createSuccessfulSQLResponse( + [ + ["alice", "30"], + ["bob", "25"], + ], + [{ name: "name" }, { name: "age" }], + ); + + const result = connector.transformResult(response as any) as any; + expect(result.result.data).toEqual([ + { name: "alice", age: "30" }, + { name: "bob", age: "25" }, + ]); + expect(result.result.data_array).toBeUndefined(); + }); + + test("parses STRING columns whose value looks like JSON", () => { + const response = createSuccessfulSQLResponse( + [['{"a":1}']], + [{ name: "payload", type_name: "STRING" }], + ); + + const result = connector.transformResult(response as any) as any; + expect(result.result.data[0].payload).toEqual({ a: 1 }); + }); + + test("keeps the raw string when JSON parsing fails", () => { + const response = createSuccessfulSQLResponse( + [["{not-json"]], + [{ name: "payload", type_name: "STRING" }], + ); + + const result = connector.transformResult(response as any) as any; + expect(result.result.data[0].payload).toBe("{not-json"); + }); + + test("returns the Arrow job handle for ARROW_STREAM responses", () => { + const response = { + status: { state: "SUCCEEDED" }, + statement_id: "stmt-arrow-1", + manifest: { format: "ARROW_STREAM" }, + result: { external_links: [] }, + } as any; + + const result = connector.transformResult(response) as any; + expect(result).toEqual({ + result: { + statement_id: "stmt-arrow-1", + status: { state: "SUCCEEDED", error: undefined }, + }, + }); + }); + + test("passes the response through when there is no data_array", () => { + const response = { + status: { state: "SUCCEEDED" }, + statement_id: "stmt-1", + } as any; + + const result = connector.transformResult(response); + expect(result).toBe(response); + }); + }); + + describe("executeStatement", () => { + test("transforms inline when submit returns SUCCEEDED", async () => { + const { client, mocks } = makeClient(); + mocks.executeStatement.mockResolvedValueOnce( + createSuccessfulSQLResponse([["a"]], [{ name: "col" }]), + ); + + const result = (await connector.executeStatement(client, { + statement: "SELECT 'a' AS col", + warehouse_id: "w-1", + })) as any; + + expect(result.result.data).toEqual([{ col: "a" }]); + expect(mocks.getStatement).not.toHaveBeenCalled(); + }); + + test("polls when submit returns RUNNING and returns the polled result", async () => { + const { client, mocks } = makeClient(); + mocks.executeStatement.mockResolvedValueOnce({ + status: { state: "RUNNING" }, + statement_id: "stmt-2", + }); + mocks.getStatement.mockResolvedValueOnce( + createSuccessfulSQLResponse([["b"]], [{ name: "col" }]), + ); + + const result = (await connector.executeStatement(client, { + statement: "SELECT 'b' AS col", + warehouse_id: "w-1", + })) as any; + + expect(result.result.data).toEqual([{ col: "b" }]); + expect(mocks.executeStatement).toHaveBeenCalledTimes(1); + expect(mocks.getStatement).toHaveBeenCalledTimes(1); + }); + + test("rejects when the signal is already aborted", async () => { + const { client } = makeClient(); + const ac = new AbortController(); + ac.abort(); + await expect( + connector.executeStatement( + client, + { statement: "SELECT 1", warehouse_id: "w-1" }, + ac.signal, + ), + ).rejects.toThrow(); + }); + }); + + describe("error log redaction", () => { + test("does not log the SQL statement on executeStatement error", async () => { const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); const sensitiveStatement = "SELECT password, ssn FROM users WHERE email = 'admin@test.com'"; - const mockWorkspaceClient = { - statementExecution: { - executeStatement: vi - .fn() - .mockRejectedValue(new Error("warehouse unavailable")), - }, - config: { host: "https://test.databricks.com" }, - }; + const { client, mocks } = makeClient(); + mocks.executeStatement.mockRejectedValue( + new Error("warehouse unavailable"), + ); await expect( - connector.executeStatement(mockWorkspaceClient as any, { + connector.executeStatement(client, { statement: sensitiveStatement, warehouse_id: "test-warehouse", }), @@ -68,10 +359,7 @@ describe("SQLWarehouseConnector", () => { .map((call) => call.join(" ")) .join(" "); - // Should log the error message expect(loggedOutput).toContain("warehouse unavailable"); - - // Should NOT log the SQL statement expect(loggedOutput).not.toContain("password"); expect(loggedOutput).not.toContain("ssn"); expect(loggedOutput).not.toContain("admin@test.com"); @@ -79,22 +367,18 @@ describe("SQLWarehouseConnector", () => { errorSpy.mockRestore(); }); - test("should not log the SQL statement on polling error", async () => { + test("does not log the SQL statement on polling error", async () => { const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); - const mockWorkspaceClient = { - statementExecution: { - executeStatement: vi.fn().mockResolvedValue({ - statement_id: "stmt-123", - status: { state: "RUNNING" }, - }), - getStatement: vi.fn().mockRejectedValue(new Error("polling timeout")), - }, - config: { host: "https://test.databricks.com" }, - }; + const { client, mocks } = makeClient(); + mocks.executeStatement.mockResolvedValue({ + statement_id: "stmt-123", + status: { state: "RUNNING" }, + }); + mocks.getStatement.mockRejectedValue(new Error("polling timeout")); await expect( - connector.executeStatement(mockWorkspaceClient as any, { + connector.executeStatement(client, { statement: "SELECT secret_data FROM vault", warehouse_id: "test-warehouse", }), @@ -104,12 +388,8 @@ describe("SQLWarehouseConnector", () => { .map((call) => call.join(" ")) .join(" "); - // Errors raised inside polling bubble up to executeStatement's catch, - // which is the single point that logs (gated on isAborted). The poll - // layer no longer logs to avoid double-logging the same failure. + // Polling errors bubble to executeStatement's catch — the single point that logs. expect(loggedOutput).toContain("polling timeout"); - - // Should NOT log the SQL statement expect(loggedOutput).not.toContain("secret_data"); expect(loggedOutput).not.toContain("vault");