From 625eef19a7ab6337b91140fa03c3481c88077f2f Mon Sep 17 00:00:00 2001 From: Pawel Kosiec Date: Mon, 4 May 2026 16:27:00 +0200 Subject: [PATCH 01/13] feat(lakebase): add OBO pool manager for per-user Lakebase connections Add LakebasePoolManager to the AppKit connector layer, enabling On-Behalf-Of (OBO) authentication where each user gets their own pg.Pool with individual OAuth token refresh. This enables Row-Level Security (RLS) in Lakebase based on current_user identity. Changes: - Add createLakebasePoolManager() in appkit connector (pool-manager.ts) - Export LakebasePoolManager type and factory from @databricks/appkit - Update dev-playground raw driver example with RLS policy and OBO routes - GET/POST /raw/my-products use per-user pools (RLS enforced) - GET/POST /raw/products remain on SP pool (RLS bypassed) Signed-off-by: Pawel Kosiec --- .../lakebase-examples/raw-driver-example.ts | 123 +++++++++++++++++- .../appkit/src/connectors/lakebase/index.ts | 5 + .../src/connectors/lakebase/pool-manager.ts | 80 ++++++++++++ packages/appkit/src/index.ts | 2 + 4 files changed, 203 insertions(+), 7 deletions(-) create mode 100644 packages/appkit/src/connectors/lakebase/pool-manager.ts diff --git a/apps/dev-playground/server/lakebase-examples/raw-driver-example.ts b/apps/dev-playground/server/lakebase-examples/raw-driver-example.ts index 43b2ca3b2..dc731b24e 100644 --- a/apps/dev-playground/server/lakebase-examples/raw-driver-example.ts +++ b/apps/dev-playground/server/lakebase-examples/raw-driver-example.ts @@ -1,8 +1,14 @@ -import { createLakebasePool } from "@databricks/appkit"; +import { + createLakebasePool, + createLakebasePoolManager, + type LakebasePoolManager, +} from "@databricks/appkit"; +import { WorkspaceClient } from "@databricks/sdk-experimental"; import type { Pool } from "pg"; import type { IAppRouter } from "shared"; let pool: Pool; +let oboPoolManager: LakebasePoolManager; /** * Raw PostgreSQL driver example using pg.Pool with automatic OAuth token refresh. @@ -13,6 +19,8 @@ let pool: Pool; * - Schema and table creation (idempotent) * - Basic CRUD operations * - Connection health checking + * - On-Behalf-Of (OBO) authentication with per-user pools + * - Row-Level Security (RLS) enforcement via OBO */ interface Product { @@ -21,14 +29,17 @@ interface Product { category: string; price: number; stock: number; - created_by?: string; + created_by: string | null; created_at: Date; } export async function setup(user?: string) { - // Create pool with automatic OAuth token refresh + // Create service principal pool with automatic OAuth token refresh pool = createLakebasePool({ user }); + // Create OBO pool manager for per-user pools + oboPoolManager = createLakebasePoolManager(); + // Create schema and table (idempotent) await pool.query(` CREATE SCHEMA IF NOT EXISTS raw_example; @@ -39,10 +50,36 @@ export async function setup(user?: string) { category VARCHAR(100), price DECIMAL(10, 2), stock INTEGER DEFAULT 0, + created_by VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); `); + // Enable Row-Level Security (idempotent) + await pool.query(` + ALTER TABLE raw_example.products ENABLE ROW LEVEL SECURITY; + `); + + // Create RLS policy (idempotent via IF NOT EXISTS-like pattern) + // Users see only rows they created (or rows with NULL created_by for seed data). + // The table owner (service principal) bypasses RLS automatically. + await pool.query(` + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_policies + WHERE schemaname = 'raw_example' + AND tablename = 'products' + AND policyname = 'user_products_policy' + ) THEN + CREATE POLICY user_products_policy ON raw_example.products + FOR ALL TO PUBLIC + USING (created_by = current_user OR created_by IS NULL); + END IF; + END + $$; + `); + // Seed sample data if table is empty const { rows } = await pool.query<{ count: string }>( "SELECT COUNT(*) as count FROM raw_example.products", @@ -52,8 +89,37 @@ export async function setup(user?: string) { } } +/** + * Get a per-user pool from the OBO pool manager. + * Falls back to the service principal pool in development when no user token is available. + */ +function getUserPool( + req: { header(name: string): string | undefined }, + fallbackPool: Pool, +): { pool: Pool; userName: string | null } { + const userToken = req.header("x-forwarded-access-token"); + const userName = req.header("x-forwarded-user"); + + if (!userToken || !userName) { + return { pool: fallbackPool, userName: null }; + } + + const userPool = oboPoolManager.getPool(userName, { + workspaceClient: new WorkspaceClient({ + token: userToken, + host: process.env.DATABRICKS_HOST, + authType: "pat", + }), + user: userName, + }); + + return { pool: userPool, userName }; +} + export function registerRoutes(router: IAppRouter, basePath: string) { - // GET /api/lakebase-examples/raw/products - List all products + // ── Service principal routes (bypass RLS as table owner) ────────── + + // GET /raw/products - List ALL products (SP pool, bypasses RLS) router.get(`${basePath}/products`, async (_req, res) => { try { const result = await pool.query( @@ -69,7 +135,7 @@ export function registerRoutes(router: IAppRouter, basePath: string) { } }); - // POST /api/lakebase-examples/raw/products - Create new product + // POST /raw/products - Create product as SP (no created_by) router.post(`${basePath}/products`, async (req, res) => { try { const { name, category, price, stock } = req.body; @@ -89,7 +155,7 @@ export function registerRoutes(router: IAppRouter, basePath: string) { } }); - // GET /api/lakebase-examples/raw/health - Connection health check + // GET /raw/health - Connection health check router.get(`${basePath}/health`, async (_req, res) => { try { await pool.query("SELECT 1"); @@ -107,10 +173,53 @@ export function registerRoutes(router: IAppRouter, basePath: string) { }); } }); + + // ── OBO routes (per-user pool, RLS enforced) ───────────────────── + + // GET /raw/my-products - List products visible to the current user (RLS-filtered) + router.get(`${basePath}/my-products`, async (req, res) => { + try { + const { pool: userPool, userName } = getUserPool(req, pool); + const result = await userPool.query( + "SELECT * FROM raw_example.products ORDER BY id", + ); + res.json({ + user: userName ?? "service-principal (dev fallback)", + products: result.rows, + }); + } catch (error: unknown) { + const err = error as Error; + res.status(500).json({ + error: "Failed to fetch user products", + message: err.message, + }); + } + }); + + // POST /raw/my-products - Create product as current user (sets created_by) + router.post(`${basePath}/my-products`, async (req, res) => { + try { + const { pool: userPool, userName } = getUserPool(req, pool); + const { name, category, price, stock } = req.body; + + const result = await userPool.query( + `INSERT INTO raw_example.products (name, category, price, stock, created_by) + VALUES ($1, $2, $3, $4, $5) RETURNING *`, + [name, category, Number(price), Number(stock), userName], + ); + res.json(result.rows[0]); + } catch (error: unknown) { + const err = error as Error; + res.status(500).json({ + error: "Failed to create product", + message: err.message, + }); + } + }); } export async function cleanup() { - await pool.end(); + await Promise.all([pool.end(), oboPoolManager.closeAll()]); } async function seedProducts(pool: Pool) { diff --git a/packages/appkit/src/connectors/lakebase/index.ts b/packages/appkit/src/connectors/lakebase/index.ts index c58b7a8cb..e2798a21e 100644 --- a/packages/appkit/src/connectors/lakebase/index.ts +++ b/packages/appkit/src/connectors/lakebase/index.ts @@ -35,3 +35,8 @@ export { RequestedClaimsPermissionSet, type RequestedResource, } from "@databricks/lakebase"; + +export { + createLakebasePoolManager, + type LakebasePoolManager, +} from "./pool-manager"; diff --git a/packages/appkit/src/connectors/lakebase/pool-manager.ts b/packages/appkit/src/connectors/lakebase/pool-manager.ts new file mode 100644 index 000000000..8aa4bd93a --- /dev/null +++ b/packages/appkit/src/connectors/lakebase/pool-manager.ts @@ -0,0 +1,80 @@ +import type { LakebasePoolConfig } from "@databricks/lakebase"; +import type { Pool } from "pg"; +import { createLakebasePool } from "./index"; + +/** + * Manages multiple Lakebase connection pools keyed by an identifier (e.g. userId). + * + * Used for On-Behalf-Of (OBO) scenarios where each user needs their own pool + * with their own OAuth token refresh, enabling features like Row-Level Security. + */ +export interface LakebasePoolManager { + /** + * Get an existing pool or create a new one for the given key. + * When creating, merges `perPoolConfig` with the base config passed to the factory. + * On subsequent calls with the same key, `perPoolConfig` is ignored and the cached pool is returned. + */ + getPool(key: string, perPoolConfig: Partial): Pool; + + /** Close and remove a specific pool. */ + closePool(key: string): Promise; + + /** Close all managed pools (for graceful shutdown). */ + closeAll(): Promise; + + /** Number of active pools. */ + readonly size: number; +} + +/** + * Create a pool manager that maintains per-key Lakebase connection pools. + * + * Each pool is created via `createLakebasePool` with the base config merged + * with per-pool overrides (e.g. a user's `workspaceClient` and `user`). + * + * @example OBO usage + * ```typescript + * const poolManager = createLakebasePoolManager(); + * + * // In a route handler: + * const userPool = poolManager.getPool(userName, { + * workspaceClient: new WorkspaceClient({ token: userToken, host, authType: "pat" }), + * user: userName, + * }); + * const result = await userPool.query("SELECT * FROM products"); + * ``` + */ +export function createLakebasePoolManager( + baseConfig?: Partial, +): LakebasePoolManager { + const pools = new Map(); + + return { + getPool(key: string, perPoolConfig: Partial): Pool { + const existing = pools.get(key); + if (existing) return existing; + + const pool = createLakebasePool({ ...baseConfig, ...perPoolConfig }); + pools.set(key, pool); + return pool; + }, + + async closePool(key: string): Promise { + const pool = pools.get(key); + if (pool) { + await pool.end(); + pools.delete(key); + } + }, + + async closeAll(): Promise { + const endPromises = [...pools.values()].map((p) => p.end()); + await Promise.all(endPromises); + pools.clear(); + }, + + get size() { + return pools.size; + }, + }; +} diff --git a/packages/appkit/src/index.ts b/packages/appkit/src/index.ts index 00fd6ff86..ac746bd25 100644 --- a/packages/appkit/src/index.ts +++ b/packages/appkit/src/index.ts @@ -20,12 +20,14 @@ export type { DatabaseCredential, GenerateDatabaseCredentialRequest, LakebasePoolConfig, + LakebasePoolManager, RequestedClaims, RequestedResource, } from "./connectors/lakebase"; // Lakebase Autoscaling connector export { createLakebasePool, + createLakebasePoolManager, generateDatabaseCredential, getLakebaseOrmConfig, getLakebasePgConfig, From cbcb6c87611824f1fa742c51e1f8f4c4a05deb35 Mon Sep 17 00:00:00 2001 From: Pawel Kosiec Date: Mon, 4 May 2026 16:51:27 +0200 Subject: [PATCH 02/13] feat(lakebase): add OBO logging and hasPool method Add console logging to OBO route handlers so pool creation/reuse is visible in app logs. Add hasPool() to LakebasePoolManager interface. Signed-off-by: Pawel Kosiec --- .../server/lakebase-examples/raw-driver-example.ts | 10 ++++++++++ .../appkit/src/connectors/lakebase/pool-manager.ts | 7 +++++++ 2 files changed, 17 insertions(+) diff --git a/apps/dev-playground/server/lakebase-examples/raw-driver-example.ts b/apps/dev-playground/server/lakebase-examples/raw-driver-example.ts index dc731b24e..e35af156c 100644 --- a/apps/dev-playground/server/lakebase-examples/raw-driver-example.ts +++ b/apps/dev-playground/server/lakebase-examples/raw-driver-example.ts @@ -101,9 +101,11 @@ function getUserPool( const userName = req.header("x-forwarded-user"); if (!userToken || !userName) { + console.log("[lakebase-obo] No user token/name — falling back to SP pool"); return { pool: fallbackPool, userName: null }; } + const isNewPool = !oboPoolManager.hasPool(userName); const userPool = oboPoolManager.getPool(userName, { workspaceClient: new WorkspaceClient({ token: userToken, @@ -113,6 +115,14 @@ function getUserPool( user: userName, }); + if (isNewPool) { + console.log( + `[lakebase-obo] Created new OBO pool for user "${userName}" (total pools: ${oboPoolManager.size})`, + ); + } else { + console.log(`[lakebase-obo] Reusing OBO pool for user "${userName}"`); + } + return { pool: userPool, userName }; } diff --git a/packages/appkit/src/connectors/lakebase/pool-manager.ts b/packages/appkit/src/connectors/lakebase/pool-manager.ts index 8aa4bd93a..6b1cfef42 100644 --- a/packages/appkit/src/connectors/lakebase/pool-manager.ts +++ b/packages/appkit/src/connectors/lakebase/pool-manager.ts @@ -16,6 +16,9 @@ export interface LakebasePoolManager { */ getPool(key: string, perPoolConfig: Partial): Pool; + /** Check whether a pool exists for the given key. */ + hasPool(key: string): boolean; + /** Close and remove a specific pool. */ closePool(key: string): Promise; @@ -59,6 +62,10 @@ export function createLakebasePoolManager( return pool; }, + hasPool(key: string): boolean { + return pools.has(key); + }, + async closePool(key: string): Promise { const pool = pools.get(key); if (pool) { From eaf5f322afb8346c7bfcc5532311deef396e0b39 Mon Sep 17 00:00:00 2001 From: Pawel Kosiec Date: Mon, 4 May 2026 17:12:57 +0200 Subject: [PATCH 03/13] chore(playground): strip down to lakebase-only for OBO testing - Remove unused plugins (files, genie, jobs, serving) to avoid resource resolution errors - Add database resource env var mappings in app.yaml - Simplify lakebase plugin to raw driver example only - Downgrade @databricks/lakebase to 0.2.0 for npm registry compat Signed-off-by: Pawel Kosiec --- apps/dev-playground/app.yaml | 37 +--- apps/dev-playground/server/index.ts | 158 +----------------- .../server/lakebase-examples-plugin.ts | 33 +--- .../Function.createLakebasePoolManager.md | 33 ++++ .../appkit/Interface.LakebasePoolManager.md | 93 +++++++++++ docs/docs/api/appkit/index.md | 2 + docs/docs/api/appkit/typedoc-sidebar.ts | 10 ++ packages/lakebase/package.json | 2 +- 8 files changed, 153 insertions(+), 215 deletions(-) create mode 100644 docs/docs/api/appkit/Function.createLakebasePoolManager.md create mode 100644 docs/docs/api/appkit/Interface.LakebasePoolManager.md diff --git a/apps/dev-playground/app.yaml b/apps/dev-playground/app.yaml index 7b57e4ff8..a185b4eca 100644 --- a/apps/dev-playground/app.yaml +++ b/apps/dev-playground/app.yaml @@ -1,31 +1,12 @@ env: - name: DATABRICKS_WAREHOUSE_ID valueFrom: sql-warehouse - - name: DATABRICKS_GENIE_SPACE_ID - valueFrom: genie-space - - name: DATABRICKS_SERVING_ENDPOINT_NAME - valueFrom: serving-endpoint - - name: DATABRICKS_JOB_ID - valueFrom: job - # Files plugin manifest declares a static DATABRICKS_VOLUME_FILES - # requirement; keep it bound so appkit's runtime validation passes - # even though the policy harness below uses its own keys. - - name: DATABRICKS_VOLUME_FILES - valueFrom: volume - # Policy test harness: seven logical volumes, all bound to the same - # underlying UC volume. Policy enforcement runs in-process, so the - # shared physical path is fine. - - name: DATABRICKS_VOLUME_ALLOW_ALL - valueFrom: volume - - name: DATABRICKS_VOLUME_PUBLIC_READ - valueFrom: volume - - name: DATABRICKS_VOLUME_DENY_ALL - valueFrom: volume - - name: DATABRICKS_VOLUME_SP_ONLY - valueFrom: volume - - name: DATABRICKS_VOLUME_ADMIN_ONLY - valueFrom: volume - - name: DATABRICKS_VOLUME_WRITE_ONLY - valueFrom: volume - - name: DATABRICKS_VOLUME_IMPLICIT - valueFrom: volume + # Lakebase database resource — these env vars are read by @databricks/lakebase + - name: PGHOST + valueFrom: database.PGHOST + - name: PGDATABASE + valueFrom: database.PGDATABASE + - name: PGUSER + valueFrom: database.PGUSER + - name: LAKEBASE_ENDPOINT + valueFrom: database.LAKEBASE_ENDPOINT diff --git a/apps/dev-playground/server/index.ts b/apps/dev-playground/server/index.ts index 91179dacd..c16bcdfe9 100644 --- a/apps/dev-playground/server/index.ts +++ b/apps/dev-playground/server/index.ts @@ -1,19 +1,6 @@ import "reflect-metadata"; -import { - analytics, - createApp, - type FilePolicy, - files, - genie, - jobs, - PolicyDeniedError, - server, - serving, - WRITE_ACTIONS, -} from "@databricks/appkit"; +import { analytics, createApp, server } from "@databricks/appkit"; import { WorkspaceClient } from "@databricks/sdk-experimental"; -// TODO: re-enable once vector-search is exported from @databricks/appkit -// import { vectorSearch } from "@databricks/appkit"; import { lakebaseExamples } from "./lakebase-examples-plugin"; import { reconnect } from "./reconnect-plugin"; import { telemetryExamples } from "./telemetry-example-plugin"; @@ -28,73 +15,13 @@ function createMockClient() { return client; } -/** - * Policy test harness. - * - * Each volume key below is backed by a `DATABRICKS_VOLUME_*` env var in - * `app.yaml` — all seven point at the same underlying UC volume path. - * The different policies are evaluated in-process, so the shared path - * is fine; the logical volume key is what drives enforcement. - * - * Exercises every policy shape the plugin ships with, plus the new - * "no policy configured" default (v0.21.0+). - */ -const ADMIN_USER_ID = process.env.ADMIN_USER_ID ?? ""; - -/** Writes allowed only for the configured admin user ID; reads open. */ -const adminOnly: FilePolicy = (action, _resource, user) => { - if (WRITE_ACTIONS.has(action)) { - return ADMIN_USER_ID !== "" && user.id === ADMIN_USER_ID; - } - return true; -}; - createApp({ plugins: [ server(), reconnect(), telemetryExamples(), analytics({}), - genie({ - spaces: { demo: process.env.DATABRICKS_GENIE_SPACE_ID ?? "placeholder" }, - }), lakebaseExamples(), - files({ - volumes: { - // baseline: everything allowed - allow_all: { policy: files.policy.allowAll() }, - // read-only: uploads/mkdir/delete return 403 - public_read: { policy: files.policy.publicRead() }, - // locked: every action returns 403 (yes, even list) - deny_all: { policy: files.policy.denyAll() }, - // SP can do everything, users can only read (docs example) - sp_only: { - policy: files.policy.any( - (_action, _resource, user) => !!user.isServicePrincipal, - files.policy.publicRead(), - ), - }, - // writes gated on ADMIN_USER_ID env var, reads open - admin_only: { policy: adminOnly }, - // drop-box: writes only, reads denied (not(publicRead)) - write_only: { policy: files.policy.not(files.policy.publicRead()) }, - // no explicit policy → falls back to publicRead() + startup warning - implicit: {}, - }, - }), - jobs(), - serving(), - // TODO: re-enable once vector-search is exported from @databricks/appkit - // vectorSearch({ - // indexes: { - // demo: { - // indexName: - // process.env.DATABRICKS_VS_INDEX_NAME ?? "catalog.schema.index", - // columns: ["id", "text", "title"], - // queryType: "hybrid", - // }, - // }, - // }), ], ...(process.env.APPKIT_E2E_TEST && { client: createMockClient() }), onPluginsReady(appkit) { @@ -141,91 +68,8 @@ createApp({ app.get("/whoami", (req, res) => { res.json({ xForwardedUser: req.header("x-forwarded-user") ?? null, - adminUserId: ADMIN_USER_ID || null, - isAdmin: - ADMIN_USER_ID !== "" && - req.header("x-forwarded-user") === ADMIN_USER_ID, - }); - }); - - /** - * Programmatic API smoke test — service principal path. - * - * All probes are read-only and deny-oriented, so nothing is - * written to the UC volume. Expected results: - * - `allow_all.list` → ok (real SDK call) - * - `deny_all.list` → PolicyDeniedError (deny wins even for SP) - * - `write_only.list` → PolicyDeniedError (reads denied) - * - * Confirms `isServicePrincipal: true` is set on the SP path. - */ - app.get("/policy/sp", async (_req, res) => { - const results = await runProbes([ - ["allow_all", "list", () => appkit.files("allow_all").list()], - ["deny_all", "list", () => appkit.files("deny_all").list()], - ["write_only", "list", () => appkit.files("write_only").list()], - ]); - res.json({ identity: "service_principal", results }); - }); - - /** - * Programmatic API smoke test — OBO (on-behalf-of user) path. - * - * All probes are read-only; no files are written. Expected: - * - `public_read.list` → ok (reads open) - * - `deny_all.list` → PolicyDeniedError - * - `sp_only.list` → ok (publicRead arm of `any()` allows reads) - */ - app.get("/policy/obo", async (req, res) => { - const results = await runProbes([ - [ - "public_read", - "list", - () => appkit.files("public_read").asUser(req).list(), - ], - [ - "deny_all", - "list", - () => appkit.files("deny_all").asUser(req).list(), - ], - ["sp_only", "list", () => appkit.files("sp_only").asUser(req).list()], - ]); - res.json({ - identity: "user", - xForwardedUser: req.header("x-forwarded-user") ?? null, - results, }); }); }); }, }).catch(console.error); - -type ProbeResult = { - volume: string; - action: string; - ok: boolean; - denied: boolean; - error?: string; -}; - -async function runProbes( - probes: Array<[string, string, () => Promise]>, -): Promise { - const out: ProbeResult[] = []; - for (const [volume, action, fn] of probes) { - try { - await fn(); - out.push({ volume, action, ok: true, denied: false }); - } catch (error) { - const denied = error instanceof PolicyDeniedError; - out.push({ - volume, - action, - ok: false, - denied, - error: error instanceof Error ? error.message : String(error), - }); - } - } - return out; -} diff --git a/apps/dev-playground/server/lakebase-examples-plugin.ts b/apps/dev-playground/server/lakebase-examples-plugin.ts index 57533af4a..59a8606dc 100644 --- a/apps/dev-playground/server/lakebase-examples-plugin.ts +++ b/apps/dev-playground/server/lakebase-examples-plugin.ts @@ -5,22 +5,13 @@ import { toPlugin, } from "@databricks/appkit"; import type { IAppRouter } from "shared"; -import * as drizzleExample from "./lakebase-examples/drizzle-example"; import * as rawExample from "./lakebase-examples/raw-driver-example"; -import * as sequelizeExample from "./lakebase-examples/sequelize-example"; -import * as typeormExample from "./lakebase-examples/typeorm-example"; /** * Lakebase Examples Plugin * - * Orchestrates four different approaches to database integration: - * 1. Raw pg.Pool driver - Direct SQL queries - * 2. Drizzle ORM - Type-safe schema definitions - * 3. TypeORM - Entity-based data access - * 4. Sequelize - Model-based ORM with intuitive API - * - * Each example is self-contained and can be used as a reference for - * implementing Lakebase integration in your own applications. + * Demonstrates raw pg.Pool driver with OBO (On-Behalf-Of) authentication + * and Row-Level Security (RLS). */ export class LakebaseExamplesPlugin extends Plugin { @@ -47,14 +38,7 @@ export class LakebaseExamplesPlugin extends Plugin { try { const user = await getUsernameWithApiLookup(); - - // Initialize all four examples in parallel - await Promise.all([ - rawExample.setup(user), - drizzleExample.setup(user), - typeormExample.setup(user), - sequelizeExample.setup(user), - ]); + await rawExample.setup(user); } catch (error) { console.error("Failed to initialize Lakebase examples:", error); // Don't throw - allow app to start even if Lakebase examples fail @@ -67,20 +51,11 @@ export class LakebaseExamplesPlugin extends Plugin { return; } - // Register routes for each example under /api/lakebase-examples/* rawExample.registerRoutes(router, "/raw"); - drizzleExample.registerRoutes(router, "/drizzle"); - typeormExample.registerRoutes(router, "/typeorm"); - sequelizeExample.registerRoutes(router, "/sequelize"); } async close() { - await Promise.all([ - rawExample.cleanup(), - drizzleExample.cleanup(), - typeormExample.cleanup(), - sequelizeExample.cleanup(), - ]); + await rawExample.cleanup(); } } diff --git a/docs/docs/api/appkit/Function.createLakebasePoolManager.md b/docs/docs/api/appkit/Function.createLakebasePoolManager.md new file mode 100644 index 000000000..64e4a1ff5 --- /dev/null +++ b/docs/docs/api/appkit/Function.createLakebasePoolManager.md @@ -0,0 +1,33 @@ +# Function: createLakebasePoolManager() + +```ts +function createLakebasePoolManager(baseConfig?: Partial): LakebasePoolManager; +``` + +Create a pool manager that maintains per-key Lakebase connection pools. + +Each pool is created via `createLakebasePool` with the base config merged +with per-pool overrides (e.g. a user's `workspaceClient` and `user`). + +## Parameters + +| Parameter | Type | +| ------ | ------ | +| `baseConfig?` | `Partial`\<[`LakebasePoolConfig`](Interface.LakebasePoolConfig.md)\> | + +## Returns + +[`LakebasePoolManager`](Interface.LakebasePoolManager.md) + +## Example + +```typescript +const poolManager = createLakebasePoolManager(); + +// In a route handler: +const userPool = poolManager.getPool(userName, { + workspaceClient: new WorkspaceClient({ token: userToken, host, authType: "pat" }), + user: userName, +}); +const result = await userPool.query("SELECT * FROM products"); +``` diff --git a/docs/docs/api/appkit/Interface.LakebasePoolManager.md b/docs/docs/api/appkit/Interface.LakebasePoolManager.md new file mode 100644 index 000000000..2f488d7f3 --- /dev/null +++ b/docs/docs/api/appkit/Interface.LakebasePoolManager.md @@ -0,0 +1,93 @@ +# Interface: LakebasePoolManager + +Manages multiple Lakebase connection pools keyed by an identifier (e.g. userId). + +Used for On-Behalf-Of (OBO) scenarios where each user needs their own pool +with their own OAuth token refresh, enabling features like Row-Level Security. + +## Properties + +### size + +```ts +readonly size: number; +``` + +Number of active pools. + +## Methods + +### closeAll() + +```ts +closeAll(): Promise; +``` + +Close all managed pools (for graceful shutdown). + +#### Returns + +`Promise`\<`void`\> + +*** + +### closePool() + +```ts +closePool(key: string): Promise; +``` + +Close and remove a specific pool. + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `key` | `string` | + +#### Returns + +`Promise`\<`void`\> + +*** + +### getPool() + +```ts +getPool(key: string, perPoolConfig: Partial): Pool; +``` + +Get an existing pool or create a new one for the given key. +When creating, merges `perPoolConfig` with the base config passed to the factory. +On subsequent calls with the same key, `perPoolConfig` is ignored and the cached pool is returned. + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `key` | `string` | +| `perPoolConfig` | `Partial`\<[`LakebasePoolConfig`](Interface.LakebasePoolConfig.md)\> | + +#### Returns + +`Pool` + +*** + +### hasPool() + +```ts +hasPool(key: string): boolean; +``` + +Check whether a pool exists for the given key. + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `key` | `string` | + +#### Returns + +`boolean` diff --git a/docs/docs/api/appkit/index.md b/docs/docs/api/appkit/index.md index 5a21e935f..8a4880250 100644 --- a/docs/docs/api/appkit/index.md +++ b/docs/docs/api/appkit/index.md @@ -44,6 +44,7 @@ plugin architecture, and React integration. | [JobConfig](Interface.JobConfig.md) | Per-job configuration options. | | [JobsConnectorConfig](Interface.JobsConnectorConfig.md) | - | | [LakebasePoolConfig](Interface.LakebasePoolConfig.md) | Configuration for creating a Lakebase connection pool | +| [LakebasePoolManager](Interface.LakebasePoolManager.md) | Manages multiple Lakebase connection pools keyed by an identifier (e.g. userId). | | [PluginManifest](Interface.PluginManifest.md) | Plugin manifest that declares metadata and resource requirements. Attached to plugin classes as a static property. Extends the shared PluginManifest with strict resource types. | | [RequestedClaims](Interface.RequestedClaims.md) | Optional claims for fine-grained Unity Catalog table permissions When specified, the returned token will be scoped to only the requested tables | | [RequestedResource](Interface.RequestedResource.md) | Resource to request permissions for in Unity Catalog | @@ -88,6 +89,7 @@ plugin architecture, and React integration. | [appKitTypesPlugin](Function.appKitTypesPlugin.md) | Vite plugin to generate types for AppKit queries. Calls generateFromEntryPoint under the hood. | | [createApp](Function.createApp.md) | Bootstraps AppKit with the provided configuration. | | [createLakebasePool](Function.createLakebasePool.md) | Create a Lakebase pool with appkit's logger integration. Telemetry automatically uses appkit's OpenTelemetry configuration via global registry. | +| [createLakebasePoolManager](Function.createLakebasePoolManager.md) | Create a pool manager that maintains per-key Lakebase connection pools. | | [extractServingEndpoints](Function.extractServingEndpoints.md) | Extract serving endpoint config from a server file by AST-parsing it. Looks for `serving({ endpoints: { alias: { env: "..." }, ... } })` calls and extracts the endpoint alias names and their environment variable mappings. | | [findServerFile](Function.findServerFile.md) | Find the server entry file by checking candidate paths in order. | | [generateDatabaseCredential](Function.generateDatabaseCredential.md) | Generate OAuth credentials for Postgres database connection using the proper Postgres API. | diff --git a/docs/docs/api/appkit/typedoc-sidebar.ts b/docs/docs/api/appkit/typedoc-sidebar.ts index 162c3e68b..3408cc11f 100644 --- a/docs/docs/api/appkit/typedoc-sidebar.ts +++ b/docs/docs/api/appkit/typedoc-sidebar.ts @@ -152,6 +152,11 @@ const typedocSidebar: SidebarsConfig = { id: "api/appkit/Interface.LakebasePoolConfig", label: "LakebasePoolConfig" }, + { + type: "doc", + id: "api/appkit/Interface.LakebasePoolManager", + label: "LakebasePoolManager" + }, { type: "doc", id: "api/appkit/Interface.PluginManifest", @@ -315,6 +320,11 @@ const typedocSidebar: SidebarsConfig = { id: "api/appkit/Function.createLakebasePool", label: "createLakebasePool" }, + { + type: "doc", + id: "api/appkit/Function.createLakebasePoolManager", + label: "createLakebasePoolManager" + }, { type: "doc", id: "api/appkit/Function.extractServingEndpoints", diff --git a/packages/lakebase/package.json b/packages/lakebase/package.json index 305c13670..b0c27a40f 100644 --- a/packages/lakebase/package.json +++ b/packages/lakebase/package.json @@ -1,7 +1,7 @@ { "name": "@databricks/lakebase", "type": "module", - "version": "0.3.0", + "version": "0.2.0", "description": "PostgreSQL driver for Databricks Lakebase Autoscaling with automatic OAuth token refresh", "main": "./dist/index.js", "types": "./dist/index.d.ts", From 6b025842ac43f3ed8c982346aad3daaf5fbf1ba1 Mon Sep 17 00:00:00 2001 From: Pawel Kosiec Date: Mon, 4 May 2026 20:59:20 +0200 Subject: [PATCH 04/13] feat(playground): switch to UUID, add GRANT PUBLIC, and OBO UI panel - Change id from SERIAL to UUID with gen_random_uuid() default to avoid sequence permission issues with OBO users - Grant schema/table access to PUBLIC so OBO users can SELECT/INSERT - Add OboProductsPanel component with full email visibility - Add debug-token endpoint and use x-forwarded-email header Signed-off-by: Pawel Kosiec --- apps/dev-playground/app.yaml | 9 +- .../components/lakebase/OboProductsPanel.tsx | 369 ++++++++++++++++++ .../client/src/components/lakebase/index.ts | 1 + .../client/src/routes/lakebase.route.tsx | 10 +- .../lakebase-examples/raw-driver-example.ts | 41 +- 5 files changed, 408 insertions(+), 22 deletions(-) create mode 100644 apps/dev-playground/client/src/components/lakebase/OboProductsPanel.tsx diff --git a/apps/dev-playground/app.yaml b/apps/dev-playground/app.yaml index a185b4eca..66fbf6c24 100644 --- a/apps/dev-playground/app.yaml +++ b/apps/dev-playground/app.yaml @@ -1,12 +1,5 @@ env: - name: DATABRICKS_WAREHOUSE_ID valueFrom: sql-warehouse - # Lakebase database resource — these env vars are read by @databricks/lakebase - - name: PGHOST - valueFrom: database.PGHOST - - name: PGDATABASE - valueFrom: database.PGDATABASE - - name: PGUSER - valueFrom: database.PGUSER - name: LAKEBASE_ENDPOINT - valueFrom: database.LAKEBASE_ENDPOINT + valueFrom: database diff --git a/apps/dev-playground/client/src/components/lakebase/OboProductsPanel.tsx b/apps/dev-playground/client/src/components/lakebase/OboProductsPanel.tsx new file mode 100644 index 000000000..0e29f5a78 --- /dev/null +++ b/apps/dev-playground/client/src/components/lakebase/OboProductsPanel.tsx @@ -0,0 +1,369 @@ +import { + Badge, + Button, + Card, + CardContent, + CardDescription, + CardHeader, + CardTitle, + Input, +} from "@databricks/appkit-ui/react"; +import { Loader2, Package, ShieldCheck } from "lucide-react"; +import { useId, useState } from "react"; +import { useLakebaseData, useLakebasePost } from "@/hooks/use-lakebase-data"; + +interface Product { + id: string; + name: string; + category: string; + price: number | string; + stock: number; + created_by: string | null; + created_at: string; +} + +interface MyProductsResponse { + user: string; + products: Product[]; +} + +interface CreateProductRequest { + name: string; + category: string; + price: number; + stock: number; +} + +export function OboProductsPanel() { + const nameId = useId(); + const categoryId = useId(); + const priceId = useId(); + const stockId = useId(); + + const { + data: myData, + loading: myLoading, + error: myError, + refetch: refetchMy, + } = useLakebaseData( + "/api/lakebase-examples/raw/my-products", + ); + + const { + data: allProducts, + loading: allLoading, + error: allError, + refetch: refetchAll, + } = useLakebaseData("/api/lakebase-examples/raw/products"); + + const { post, loading: creating } = useLakebasePost< + CreateProductRequest, + Product + >("/api/lakebase-examples/raw/my-products"); + + const generateRandomProduct = () => { + const products = [ + "Ergonomic Keyboard", + "Wireless Mouse", + "USB-C Hub", + "Laptop Stand", + "Monitor Arm", + "Mechanical Keyboard", + "Gaming Headset", + "Webcam HD", + ]; + const categories = ["Electronics", "Accessories", "Peripherals", "Office"]; + const price = (Math.random() * (199.99 - 29.99) + 29.99).toFixed(2); + const stock = Math.floor(Math.random() * (500 - 50) + 50); + + return { + name: products[Math.floor(Math.random() * products.length)], + category: categories[Math.floor(Math.random() * categories.length)], + price, + stock: String(stock), + }; + }; + + const [formData, setFormData] = useState(generateRandomProduct()); + + const handleSubmit = async (e: React.FormEvent) => { + e.preventDefault(); + const result = await post({ + name: formData.name, + category: formData.category, + price: Number(formData.price), + stock: Number(formData.stock), + }); + + if (result) { + setFormData(generateRandomProduct()); + refetchMy(); + refetchAll(); + } + }; + + const myProducts = myData?.products ?? []; + + return ( +
+ {/* Header */} + + +
+
+ +
+
+ Raw Driver — On-Behalf-Of (OBO) + + Per-user connection pool with Row-Level Security (RLS). Each + user gets their own pg.Pool authenticated with their Databricks + identity. The database filters rows based on{" "} + current_user. + +
+ {myData && ( + + User: {myData.user} + + )} +
+
+
+ + {/* Create product as user */} + + + + Create Product (as current user) + + + This product will have created_by set to your identity. + RLS will make it visible only to you. + + + +
+
+
+ + + setFormData({ ...formData, name: e.target.value }) + } + placeholder="Wireless Mouse" + required + /> +
+
+ + + setFormData({ ...formData, category: e.target.value }) + } + placeholder="Electronics" + required + /> +
+
+ + + setFormData({ ...formData, price: e.target.value }) + } + placeholder="29.99" + required + /> +
+
+ + + setFormData({ ...formData, stock: e.target.value }) + } + placeholder="100" + required + /> +
+
+ +
+
+
+ + {/* Side-by-side comparison */} +
+ {/* My products (OBO, RLS filtered) */} + + +
+
+ + My Products (OBO pool) + + + RLS-filtered: only your rows visible + +
+ +
+
+ + {myLoading && ( +
+
+ Loading... +
+ )} + {myError && ( +
+ {myError.message} +
+ )} + {!myLoading && myProducts.length === 0 && ( +
+ +

No products yet. Create one above.

+
+ )} + {myProducts.length > 0 && ( + + )} + + + + {/* All products (SP, bypasses RLS) */} + + +
+
+ + All Products (SP pool) + + + Service principal bypasses RLS + +
+ +
+
+ + {allLoading && ( +
+
+ Loading... +
+ )} + {allError && ( +
+ {allError.message} +
+ )} + {allProducts && allProducts.length > 0 && ( + + )} + + +
+
+ ); +} + +function ProductTable({ + products, + showCreatedBy, +}: { + products: Product[]; + showCreatedBy?: boolean; +}) { + return ( +
+ + + + + + + + {showCreatedBy && ( + + )} + + + + {products.map((p) => ( + + + + + + {showCreatedBy && ( + + )} + + ))} + +
+ ID + + Name + + Category + + Price + + Created By +
{p.id}{p.name} + {p.category} + + ${Number(p.price).toFixed(2)} + + {p.created_by ?? "—"} +
+
+ ); +} diff --git a/apps/dev-playground/client/src/components/lakebase/index.ts b/apps/dev-playground/client/src/components/lakebase/index.ts index 6ba528a63..c6600a533 100644 --- a/apps/dev-playground/client/src/components/lakebase/index.ts +++ b/apps/dev-playground/client/src/components/lakebase/index.ts @@ -1,4 +1,5 @@ export { ActivityLogsPanel } from "./ActivityLogsPanel"; +export { OboProductsPanel } from "./OboProductsPanel"; export { OrdersPanel } from "./OrdersPanel"; export { ProductsPanel } from "./ProductsPanel"; export { TasksPanel } from "./TasksPanel"; diff --git a/apps/dev-playground/client/src/routes/lakebase.route.tsx b/apps/dev-playground/client/src/routes/lakebase.route.tsx index 59694e248..d52901de4 100644 --- a/apps/dev-playground/client/src/routes/lakebase.route.tsx +++ b/apps/dev-playground/client/src/routes/lakebase.route.tsx @@ -7,6 +7,7 @@ import { import { createFileRoute, retainSearchParams } from "@tanstack/react-router"; import { ActivityLogsPanel, + OboProductsPanel, OrdersPanel, ProductsPanel, TasksPanel, @@ -34,14 +35,19 @@ function LakebaseRoute() {
{/* Tabs for different examples */} - - + + + Raw Driver OBO Raw Driver Drizzle ORM TypeORM Sequelize + + + + diff --git a/apps/dev-playground/server/lakebase-examples/raw-driver-example.ts b/apps/dev-playground/server/lakebase-examples/raw-driver-example.ts index e35af156c..9f131f606 100644 --- a/apps/dev-playground/server/lakebase-examples/raw-driver-example.ts +++ b/apps/dev-playground/server/lakebase-examples/raw-driver-example.ts @@ -24,7 +24,7 @@ let oboPoolManager: LakebasePoolManager; */ interface Product { - id: number; + id: string; name: string; category: string; price: number; @@ -40,12 +40,15 @@ export async function setup(user?: string) { // Create OBO pool manager for per-user pools oboPoolManager = createLakebasePoolManager(); - // Create schema and table (idempotent) + // Drop and recreate (SERIAL→UUID migration; safe for POC) + await pool.query(`DROP TABLE IF EXISTS raw_example.products CASCADE`); + + // Create schema and table await pool.query(` CREATE SCHEMA IF NOT EXISTS raw_example; CREATE TABLE IF NOT EXISTS raw_example.products ( - id SERIAL PRIMARY KEY, + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), name VARCHAR(255) NOT NULL, category VARCHAR(100), price DECIMAL(10, 2), @@ -80,6 +83,12 @@ export async function setup(user?: string) { $$; `); + // Grant schema/table access to PUBLIC so OBO users can SELECT/INSERT + await pool.query(` + GRANT USAGE ON SCHEMA raw_example TO PUBLIC; + GRANT ALL ON ALL TABLES IN SCHEMA raw_example TO PUBLIC; + `); + // Seed sample data if table is empty const { rows } = await pool.query<{ count: string }>( "SELECT COUNT(*) as count FROM raw_example.products", @@ -98,32 +107,32 @@ function getUserPool( fallbackPool: Pool, ): { pool: Pool; userName: string | null } { const userToken = req.header("x-forwarded-access-token"); - const userName = req.header("x-forwarded-user"); + const userEmail = req.header("x-forwarded-email"); - if (!userToken || !userName) { - console.log("[lakebase-obo] No user token/name — falling back to SP pool"); + if (!userToken || !userEmail) { + console.log("[lakebase-obo] No user token/email — falling back to SP pool"); return { pool: fallbackPool, userName: null }; } - const isNewPool = !oboPoolManager.hasPool(userName); - const userPool = oboPoolManager.getPool(userName, { + const isNewPool = !oboPoolManager.hasPool(userEmail); + const userPool = oboPoolManager.getPool(userEmail, { workspaceClient: new WorkspaceClient({ token: userToken, host: process.env.DATABRICKS_HOST, authType: "pat", }), - user: userName, + user: userEmail, }); if (isNewPool) { console.log( - `[lakebase-obo] Created new OBO pool for user "${userName}" (total pools: ${oboPoolManager.size})`, + `[lakebase-obo] Created new OBO pool for user "${userEmail}" (total pools: ${oboPoolManager.size})`, ); } else { - console.log(`[lakebase-obo] Reusing OBO pool for user "${userName}"`); + console.log(`[lakebase-obo] Reusing OBO pool for user "${userEmail}"`); } - return { pool: userPool, userName }; + return { pool: userPool, userName: userEmail }; } export function registerRoutes(router: IAppRouter, basePath: string) { @@ -184,6 +193,14 @@ export function registerRoutes(router: IAppRouter, basePath: string) { } }); + // GET /raw/debug-token - Show forwarded headers for debugging + router.get(`${basePath}/debug-token`, (req, res) => { + const allForwarded = Object.fromEntries( + Object.entries(req.headers).filter(([k]) => k.startsWith("x-forwarded")), + ); + res.json(allForwarded); + }); + // ── OBO routes (per-user pool, RLS enforced) ───────────────────── // GET /raw/my-products - List products visible to the current user (RLS-filtered) From 54ffb629b5bf07be14293b0367659215efee778e Mon Sep 17 00:00:00 2001 From: Pawel Kosiec Date: Tue, 5 May 2026 15:45:13 +0200 Subject: [PATCH 05/13] feat(lakebase): add asUser(req) OBO support to Lakebase plugin Integrate per-user connection pools into the core Lakebase plugin so users get the standard asUser(req) pattern for OBO queries. Each user gets a separate pg.Pool authenticated with their Databricks identity, enabling Row-Level Security via current_user. - Override asUser(req) on LakebasePlugin to create per-user pools - Add idle pool eviction to pool manager (5min cleanup interval) - Use current_user in SQL instead of parsing headers in route handlers - Update dev-playground to use plugin's asUser(req) for OBO routes - Add OBO documentation section to lakebase.md and execution-context.md - Update template: UUID ids, created_by column, asUser for INSERT Signed-off-by: Pawel Kosiec --- .../components/lakebase/OboProductsPanel.tsx | 31 +--- .../client/src/routes/lakebase.route.tsx | 16 +- apps/dev-playground/server/index.ts | 50 +++++- .../lakebase-examples/raw-driver-example.ts | 118 +------------ .../Function.createLakebasePoolManager.md | 3 + .../appkit/Interface.LakebasePoolManager.md | 2 +- docs/docs/plugins/execution-context.md | 6 + docs/docs/plugins/lakebase.md | 62 +++++++ .../src/connectors/lakebase/pool-manager.ts | 22 ++- .../appkit/src/plugins/lakebase/lakebase.ts | 156 +++++++++++++++++- .../src/pages/lakebase/LakebasePage.tsx | 20 ++- .../server/routes/lakebase/todo-routes.ts | 41 +++-- 12 files changed, 347 insertions(+), 180 deletions(-) diff --git a/apps/dev-playground/client/src/components/lakebase/OboProductsPanel.tsx b/apps/dev-playground/client/src/components/lakebase/OboProductsPanel.tsx index 0e29f5a78..673570a3a 100644 --- a/apps/dev-playground/client/src/components/lakebase/OboProductsPanel.tsx +++ b/apps/dev-playground/client/src/components/lakebase/OboProductsPanel.tsx @@ -22,11 +22,6 @@ interface Product { created_at: string; } -interface MyProductsResponse { - user: string; - products: Product[]; -} - interface CreateProductRequest { name: string; category: string; @@ -41,13 +36,11 @@ export function OboProductsPanel() { const stockId = useId(); const { - data: myData, + data: myProducts, loading: myLoading, error: myError, refetch: refetchMy, - } = useLakebaseData( - "/api/lakebase-examples/raw/my-products", - ); + } = useLakebaseData("/api/lakebase-examples/raw/my-products"); const { data: allProducts, @@ -102,7 +95,7 @@ export function OboProductsPanel() { } }; - const myProducts = myData?.products ?? []; + const myProductsList = myProducts ?? []; return (
@@ -122,11 +115,6 @@ export function OboProductsPanel() { current_user.
- {myData && ( - - User: {myData.user} - - )}
@@ -242,7 +230,8 @@ export function OboProductsPanel() { My Products (OBO pool) - RLS-filtered: only your rows visible + RLS-filtered via per-user pool. Users with{" "} + databricks_superuser role bypass RLS.