diff --git a/apps/dev-playground/app.yaml b/apps/dev-playground/app.yaml index 7b57e4ff8..2bfe81d9c 100644 --- a/apps/dev-playground/app.yaml +++ b/apps/dev-playground/app.yaml @@ -5,8 +5,6 @@ env: valueFrom: genie-space - name: DATABRICKS_SERVING_ENDPOINT_NAME valueFrom: serving-endpoint - - name: DATABRICKS_JOB_ID - valueFrom: job # Files plugin manifest declares a static DATABRICKS_VOLUME_FILES # requirement; keep it bound so appkit's runtime validation passes # even though the policy harness below uses its own keys. @@ -29,3 +27,11 @@ env: valueFrom: volume - name: DATABRICKS_VOLUME_IMPLICIT valueFrom: volume + # OBO demo: same physical volume; auth: "on-behalf-of-user" routes + # HTTP traffic through runInUserContext so SDK calls execute as the + # end user. + - name: DATABRICKS_VOLUME_OBO_DEMO + valueFrom: volume + # Lakebase database resource + - name: LAKEBASE_ENDPOINT + valueFrom: database diff --git a/apps/dev-playground/client/src/components/lakebase/OboProductsPanel.tsx b/apps/dev-playground/client/src/components/lakebase/OboProductsPanel.tsx new file mode 100644 index 000000000..673570a3a --- /dev/null +++ b/apps/dev-playground/client/src/components/lakebase/OboProductsPanel.tsx @@ -0,0 +1,354 @@ +import { + Badge, + Button, + Card, + CardContent, + CardDescription, + CardHeader, + CardTitle, + Input, +} from "@databricks/appkit-ui/react"; +import { Loader2, Package, ShieldCheck } from "lucide-react"; +import { useId, useState } from "react"; +import { useLakebaseData, useLakebasePost } from "@/hooks/use-lakebase-data"; + +interface Product { + id: string; + name: string; + category: string; + price: number | string; + stock: number; + created_by: string | null; + created_at: string; +} + +interface CreateProductRequest { + name: string; + category: string; + price: number; + stock: number; +} + +export function OboProductsPanel() { + const nameId = useId(); + const categoryId = useId(); + const priceId = useId(); + const stockId = useId(); + + const { 
+ data: myProducts, + loading: myLoading, + error: myError, + refetch: refetchMy, + } = useLakebaseData("/api/lakebase-examples/raw/my-products"); + + const { + data: allProducts, + loading: allLoading, + error: allError, + refetch: refetchAll, + } = useLakebaseData("/api/lakebase-examples/raw/products"); + + const { post, loading: creating } = useLakebasePost< + CreateProductRequest, + Product + >("/api/lakebase-examples/raw/my-products"); + + const generateRandomProduct = () => { + const products = [ + "Ergonomic Keyboard", + "Wireless Mouse", + "USB-C Hub", + "Laptop Stand", + "Monitor Arm", + "Mechanical Keyboard", + "Gaming Headset", + "Webcam HD", + ]; + const categories = ["Electronics", "Accessories", "Peripherals", "Office"]; + const price = (Math.random() * (199.99 - 29.99) + 29.99).toFixed(2); + const stock = Math.floor(Math.random() * (500 - 50) + 50); + + return { + name: products[Math.floor(Math.random() * products.length)], + category: categories[Math.floor(Math.random() * categories.length)], + price, + stock: String(stock), + }; + }; + + const [formData, setFormData] = useState(generateRandomProduct()); + + const handleSubmit = async (e: React.FormEvent) => { + e.preventDefault(); + const result = await post({ + name: formData.name, + category: formData.category, + price: Number(formData.price), + stock: Number(formData.stock), + }); + + if (result) { + setFormData(generateRandomProduct()); + refetchMy(); + refetchAll(); + } + }; + + const myProductsList = myProducts ?? []; + + return ( +
+ {/* Header */} + + +
+
+ +
+
+ Raw Driver — On-Behalf-Of (OBO) + + Per-user connection pool with Row-Level Security (RLS). Each + user gets their own pg.Pool authenticated with their Databricks + identity. The database filters rows based on{" "} + current_user. + +
+
+
+
+ + {/* Create product as user */} + + + + Create Product (as current user) + + + This product will have created_by set to your identity. + RLS will make it visible only to you. + + + +
+
+
+ + + setFormData({ ...formData, name: e.target.value }) + } + placeholder="Wireless Mouse" + required + /> +
+
+ + + setFormData({ ...formData, category: e.target.value }) + } + placeholder="Electronics" + required + /> +
+
+ + + setFormData({ ...formData, price: e.target.value }) + } + placeholder="29.99" + required + /> +
+
+ + + setFormData({ ...formData, stock: e.target.value }) + } + placeholder="100" + required + /> +
+
+ +
+
+
+ + {/* Side-by-side comparison */} +
+ {/* My products (OBO, RLS filtered) */} + + +
+
+ + My Products (OBO pool) + + + RLS-filtered via per-user pool. Users with{" "} + databricks_superuser role bypass RLS. + +
+ +
+
+ + {myLoading && ( +
+
+ Loading... +
+ )} + {myError && ( +
+ {myError.message} +
+ )} + {!myLoading && myProductsList.length === 0 && ( +
+ +

No products yet. Create one above.

+
+ )} + {myProductsList.length > 0 && ( + + )} + + + + {/* All products (SP, bypasses RLS) */} + + +
+
+ + All Products (SP pool) + + + Service principal bypasses RLS + +
+ +
+
+ + {allLoading && ( +
+
+ Loading... +
+ )} + {allError && ( +
+ {allError.message} +
+ )} + {allProducts && allProducts.length > 0 && ( + + )} + + +
+
+ ); +} + +function ProductTable({ + products, + showCreatedBy, +}: { + products: Product[]; + showCreatedBy?: boolean; +}) { + return ( +
+ + + + + + + {showCreatedBy && ( + + )} + + + + {products.map((p) => ( + + + + + {showCreatedBy && ( + + )} + + ))} + +
+ Name + + Category + + Price + + Created By +
{p.name} + {p.category} + + ${Number(p.price).toFixed(2)} + + {p.created_by ?? "—"} +
+
+ ); +} diff --git a/apps/dev-playground/client/src/components/lakebase/ProductsPanel.tsx b/apps/dev-playground/client/src/components/lakebase/ProductsPanel.tsx deleted file mode 100644 index d1b6c690a..000000000 --- a/apps/dev-playground/client/src/components/lakebase/ProductsPanel.tsx +++ /dev/null @@ -1,305 +0,0 @@ -import { - Badge, - Button, - Card, - CardContent, - CardDescription, - CardHeader, - CardTitle, - Input, -} from "@databricks/appkit-ui/react"; -import { Database, Loader2, Package } from "lucide-react"; -import { useId, useState } from "react"; -import { useLakebaseData, useLakebasePost } from "@/hooks/use-lakebase-data"; - -interface Product { - id: number; - name: string; - category: string; - price: number | string; // PostgreSQL DECIMAL returns as string - stock: number; - created_by?: string; - created_at: string; -} - -interface CreateProductRequest { - name: string; - category: string; - price: number; - stock: number; -} - -interface HealthStatus { - status: string; - connected: boolean; - message: string; -} - -export function ProductsPanel() { - const nameId = useId(); - const categoryId = useId(); - const priceId = useId(); - const stockId = useId(); - - const { - data: products, - loading: productsLoading, - error: productsError, - refetch, - } = useLakebaseData("/api/lakebase-examples/raw/products"); - - const { data: health } = useLakebaseData( - "/api/lakebase-examples/raw/health", - ); - - const { post, loading: creating } = useLakebasePost< - CreateProductRequest, - Product - >("/api/lakebase-examples/raw/products"); - - const generateRandomProduct = () => { - const products = [ - "Ergonomic Keyboard", - "Wireless Mouse", - "USB-C Hub", - "Laptop Stand", - "Monitor Arm", - "Mechanical Keyboard", - "Gaming Headset", - "Webcam HD", - ]; - const categories = ["Electronics", "Accessories", "Peripherals", "Office"]; - const price = (Math.random() * (199.99 - 29.99) + 29.99).toFixed(2); - const stock = Math.floor(Math.random() * (500 - 
50) + 50); - - return { - name: products[Math.floor(Math.random() * products.length)], - category: categories[Math.floor(Math.random() * categories.length)], - price, - stock: String(stock), - }; - }; - - const [formData, setFormData] = useState(generateRandomProduct()); - - const handleSubmit = async (e: React.FormEvent) => { - e.preventDefault(); - const result = await post({ - name: formData.name, - category: formData.category, - price: Number(formData.price), - stock: Number(formData.stock), - }); - - if (result) { - setFormData(generateRandomProduct()); - refetch(); - } - }; - - return ( -
- {/* Header with connection status */} - - -
-
-
- -
-
- Raw Driver Example - - Direct PostgreSQL connection using pg.Pool with automatic - OAuth token refresh - -
-
- {health && ( - - {health.connected ? "Connected" : "Disconnected"} - - )} -
-
-
- - {/* Create product form */} - - - Create Product - - -
-
-
- - - setFormData({ ...formData, name: e.target.value }) - } - placeholder="Wireless Mouse" - required - /> -
-
- - - setFormData({ ...formData, category: e.target.value }) - } - placeholder="Electronics" - required - /> -
-
- - - setFormData({ ...formData, price: e.target.value }) - } - placeholder="29.99" - required - /> -
-
- - - setFormData({ ...formData, stock: e.target.value }) - } - placeholder="100" - required - /> -
-
- -
-
-
- - {/* Products list */} - - -
- Products Catalog - -
-
- - {productsLoading && ( -
-
- Loading products... -
- )} - - {productsError && ( -
- Error:{" "} - {productsError.message} -
- )} - - {products && products.length === 0 && ( -
- -

No products available. Create your first product above.

-
- )} - - {products && products.length > 0 && ( -
- - - - - - - - - - - - {products.map((product) => ( - - - - - - - - ))} - -
- ID - - Name - - Category - - Price - - Stock -
{product.id}{product.name} - {product.category} - - ${Number(product.price).toFixed(2)} - - {product.stock} -
-
- )} - - -
- ); -} diff --git a/apps/dev-playground/client/src/components/lakebase/index.ts b/apps/dev-playground/client/src/components/lakebase/index.ts index 6ba528a63..d64d2e3d1 100644 --- a/apps/dev-playground/client/src/components/lakebase/index.ts +++ b/apps/dev-playground/client/src/components/lakebase/index.ts @@ -1,4 +1,4 @@ export { ActivityLogsPanel } from "./ActivityLogsPanel"; +export { OboProductsPanel } from "./OboProductsPanel"; export { OrdersPanel } from "./OrdersPanel"; -export { ProductsPanel } from "./ProductsPanel"; export { TasksPanel } from "./TasksPanel"; diff --git a/apps/dev-playground/client/src/routes/lakebase.route.tsx b/apps/dev-playground/client/src/routes/lakebase.route.tsx index 59694e248..9ea8117e6 100644 --- a/apps/dev-playground/client/src/routes/lakebase.route.tsx +++ b/apps/dev-playground/client/src/routes/lakebase.route.tsx @@ -7,8 +7,8 @@ import { import { createFileRoute, retainSearchParams } from "@tanstack/react-router"; import { ActivityLogsPanel, + OboProductsPanel, OrdersPanel, - ProductsPanel, TasksPanel, } from "@/components/lakebase"; @@ -23,7 +23,6 @@ function LakebaseRoute() { return (
- {/* Page header */}

Lakebase Examples

@@ -33,17 +32,16 @@ function LakebaseRoute() {

- {/* Tabs for different examples */} - Raw Driver + Raw Driver (OBO) Drizzle ORM TypeORM Sequelize - + diff --git a/apps/dev-playground/server/index.ts b/apps/dev-playground/server/index.ts index 91179dacd..4dc0a814c 100644 --- a/apps/dev-playground/server/index.ts +++ b/apps/dev-playground/server/index.ts @@ -6,6 +6,7 @@ import { files, genie, jobs, + lakebase, PolicyDeniedError, server, serving, @@ -49,6 +50,15 @@ const adminOnly: FilePolicy = (action, _resource, user) => { return true; }; +/** + * OBO demo policy: deny anything running as the SP (including the dev + * fallback when no `x-forwarded-access-token` is present). Only real + * end-users (`isServicePrincipal: false`) get through. + */ +const usersOnly: FilePolicy = (_action, _resource, user) => { + return user.isServicePrincipal !== true; +}; + createApp({ plugins: [ server(), @@ -58,6 +68,7 @@ createApp({ genie({ spaces: { demo: process.env.DATABRICKS_GENIE_SPACE_ID ?? "placeholder" }, }), + ...(process.env.LAKEBASE_ENDPOINT ? [lakebase()] : []), lakebaseExamples(), files({ volumes: { @@ -80,6 +91,14 @@ createApp({ write_only: { policy: files.policy.not(files.policy.publicRead()) }, // no explicit policy → falls back to publicRead() + startup warning implicit: {}, + // OBO demo volume — auth: "on-behalf-of-user" routes HTTP traffic + // through `runInUserContext` so SDK calls execute with the end + // user's access token. The `usersOnly` policy denies any traffic + // that wasn't authenticated via `x-forwarded-access-token`. 
+ obo_demo: { + auth: "on-behalf-of-user", + policy: usersOnly, + }, }, }), jobs(), @@ -99,6 +118,51 @@ createApp({ ...(process.env.APPKIT_E2E_TEST && { client: createMockClient() }), onPluginsReady(appkit) { appkit.server.extend((app) => { + // ── Lakebase OBO routes (per-user pool, RLS enforced) ────────── + + if ("lakebase" in appkit) { + // GET /api/lakebase-examples/raw/my-products — RLS-filtered list + app.get("/api/lakebase-examples/raw/my-products", async (req, res) => { + try { + const result = await appkit.lakebase + .asUser(req) + .query( + "SELECT * FROM raw_example.products ORDER BY created_at DESC", + ); + res.json(result.rows); + } catch (error: unknown) { + const err = error as Error; + res.status(500).json({ + error: "Failed to fetch user products", + message: err.message, + }); + } + }); + + // POST /api/lakebase-examples/raw/my-products — create as user + // created_by is set to current_user by the per-user pool's identity + app.post("/api/lakebase-examples/raw/my-products", async (req, res) => { + try { + const { name, category, price, stock } = req.body; + + const result = await appkit.lakebase.asUser(req).query( + `INSERT INTO raw_example.products (name, category, price, stock, created_by) + VALUES ($1, $2, $3, $4, current_user) RETURNING *`, + [name, category, Number(price), Number(stock)], + ); + res.json(result.rows[0]); + } catch (error: unknown) { + const err = error as Error; + res.status(500).json({ + error: "Failed to create product", + message: err.message, + }); + } + }); + } + + // ── Analytics examples ────────── + app.get("/sp", (_req, res) => { appkit.analytics .query("SELECT * FROM samples.nyctaxi.trips;") @@ -196,6 +260,43 @@ createApp({ results, }); }); + + /** + * Per-volume OBO mode demo. Hits the `obo_demo` volume — configured + * with `auth: "on-behalf-of-user"` — to confirm: + * + * 1. With a forwarded user identity, HTTP routes execute the SDK + * call as the end user (request goes through `runInUserContext`). + * 2. 
Without `x-forwarded-access-token`, production returns 401; + * development falls back to the SP and the `usersOnly` policy + * rejects with 403. + * 3. Programmatic `appkit.files("obo_demo").asUser(req).list()` runs + * inside the same user context. + * + * Returns the HTTP status, body, and the user identity the server + * observes — so the policy-matrix client can render a clear + * pass/fail panel. + */ + app.get("/policy/obo-volume", async (req, res) => { + const xForwardedUser = req.header("x-forwarded-user") ?? null; + const xForwardedToken = + (req.header("x-forwarded-access-token")?.length ?? 0) > 0; + + const programmatic: ProbeResult[] = await runProbes([ + [ + "obo_demo", + "list", + () => appkit.files("obo_demo").asUser(req).list(), + ], + ]); + + res.json({ + mode: "on-behalf-of-user", + xForwardedUser, + xForwardedAccessTokenPresent: xForwardedToken, + programmatic, + }); + }); }); }, }).catch(console.error); diff --git a/apps/dev-playground/server/lakebase-examples/raw-driver-example.ts b/apps/dev-playground/server/lakebase-examples/raw-driver-example.ts index 43b2ca3b2..327b5a601 100644 --- a/apps/dev-playground/server/lakebase-examples/raw-driver-example.ts +++ b/apps/dev-playground/server/lakebase-examples/raw-driver-example.ts @@ -11,22 +11,25 @@ let pool: Pool; * - Direct pg.Pool usage without ORM abstraction * - Manual SQL query writing with parameterized queries * - Schema and table creation (idempotent) - * - Basic CRUD operations - * - Connection health checking + * - Row-Level Security (RLS) setup + * - Basic CRUD operations (SP pool) + * + * OBO routes are registered separately in index.ts via the Lakebase plugin's + * `asUser(req)` pattern — see `onPluginsReady`. 
*/ interface Product { - id: number; + id: string; name: string; category: string; price: number; stock: number; - created_by?: string; + created_by: string | null; created_at: Date; } export async function setup(user?: string) { - // Create pool with automatic OAuth token refresh + // Create service principal pool with automatic OAuth token refresh pool = createLakebasePool({ user }); // Create schema and table (idempotent) @@ -34,15 +37,47 @@ export async function setup(user?: string) { CREATE SCHEMA IF NOT EXISTS raw_example; CREATE TABLE IF NOT EXISTS raw_example.products ( - id SERIAL PRIMARY KEY, + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), name VARCHAR(255) NOT NULL, category VARCHAR(100), price DECIMAL(10, 2), stock INTEGER DEFAULT 0, + created_by VARCHAR(255) DEFAULT current_user, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); `); + // Enable Row-Level Security (idempotent) + await pool.query(` + ALTER TABLE raw_example.products ENABLE ROW LEVEL SECURITY; + `); + + // Create RLS policy (idempotent via IF NOT EXISTS-like pattern) + // Users see only rows they created (or rows with NULL created_by for seed data). + // The table owner (service principal) bypasses RLS automatically. 
+ await pool.query(` + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_policies + WHERE schemaname = 'raw_example' + AND tablename = 'products' + AND policyname = 'user_products_policy' + ) THEN + CREATE POLICY user_products_policy ON raw_example.products + FOR ALL TO PUBLIC + USING (created_by = current_user OR created_by IS NULL); + END IF; + END + $$; + `); + + // Grant schema/table access to PUBLIC so OBO users can SELECT/INSERT + await pool.query(` + GRANT USAGE ON SCHEMA raw_example TO PUBLIC; + GRANT ALL ON ALL TABLES IN SCHEMA raw_example TO PUBLIC; + `); + // Seed sample data if table is empty const { rows } = await pool.query<{ count: string }>( "SELECT COUNT(*) as count FROM raw_example.products", @@ -53,7 +88,9 @@ export async function setup(user?: string) { } export function registerRoutes(router: IAppRouter, basePath: string) { - // GET /api/lakebase-examples/raw/products - List all products + // ── Service principal routes (bypass RLS as table owner) ────────── + + // GET /raw/products - List ALL products (SP pool, bypasses RLS) router.get(`${basePath}/products`, async (_req, res) => { try { const result = await pool.query( @@ -69,7 +106,7 @@ export function registerRoutes(router: IAppRouter, basePath: string) { } }); - // POST /api/lakebase-examples/raw/products - Create new product + // POST /raw/products - Create product as SP (no created_by) router.post(`${basePath}/products`, async (req, res) => { try { const { name, category, price, stock } = req.body; @@ -89,7 +126,7 @@ export function registerRoutes(router: IAppRouter, basePath: string) { } }); - // GET /api/lakebase-examples/raw/health - Connection health check + // GET /raw/health - Connection health check router.get(`${basePath}/health`, async (_req, res) => { try { await pool.query("SELECT 1"); diff --git a/docs/docs/api/appkit/Function.createLakebasePoolManager.md b/docs/docs/api/appkit/Function.createLakebasePoolManager.md new file mode 100644 index 000000000..9bcf25ef1 --- /dev/null 
+++ b/docs/docs/api/appkit/Function.createLakebasePoolManager.md @@ -0,0 +1,36 @@ +# Function: createLakebasePoolManager() + +```ts +function createLakebasePoolManager(baseConfig?: Partial): LakebasePoolManager; +``` + +Create a pool manager that maintains per-key Lakebase connection pools. + +Each pool is created via `createLakebasePool` with the base config merged +with per-pool overrides (e.g. a user's `workspaceClient` and `user`). + +A periodic cleanup removes empty Pool objects (where all connections have +been closed by pg's built-in `idleTimeoutMillis`) from the internal Map. + +## Parameters + +| Parameter | Type | +| ------ | ------ | +| `baseConfig?` | `Partial`\<[`LakebasePoolConfig`](Interface.LakebasePoolConfig.md)\> | + +## Returns + +[`LakebasePoolManager`](Interface.LakebasePoolManager.md) + +## Example + +```typescript +const poolManager = createLakebasePoolManager(); + +// In a route handler: +const userPool = poolManager.getPool(userName, { + workspaceClient: new WorkspaceClient({ token: userToken, host, authType: "pat" }), + user: userName, +}); +const result = await userPool.query("SELECT * FROM products"); +``` diff --git a/docs/docs/api/appkit/Interface.LakebasePoolManager.md b/docs/docs/api/appkit/Interface.LakebasePoolManager.md new file mode 100644 index 000000000..8f251436c --- /dev/null +++ b/docs/docs/api/appkit/Interface.LakebasePoolManager.md @@ -0,0 +1,93 @@ +# Interface: LakebasePoolManager + +Manages multiple Lakebase connection pools keyed by an identifier (e.g. userId). + +Used for On-Behalf-Of (OBO) scenarios where each user needs their own pool +with their own OAuth token refresh, enabling features like Row-Level Security. + +## Properties + +### size + +```ts +readonly size: number; +``` + +Number of active pools. + +## Methods + +### closeAll() + +```ts +closeAll(): Promise; +``` + +Close all managed pools and stop cleanup (for graceful shutdown). 
+ +#### Returns + +`Promise`\<`void`\> + +*** + +### closePool() + +```ts +closePool(key: string): Promise; +``` + +Close and remove a specific pool. + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `key` | `string` | + +#### Returns + +`Promise`\<`void`\> + +*** + +### getPool() + +```ts +getPool(key: string, perPoolConfig: Partial): Pool; +``` + +Get an existing pool or create a new one for the given key. +When creating, merges `perPoolConfig` with the base config passed to the factory. +On subsequent calls with the same key, `perPoolConfig` is ignored and the cached pool is returned. + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `key` | `string` | +| `perPoolConfig` | `Partial`\<[`LakebasePoolConfig`](Interface.LakebasePoolConfig.md)\> | + +#### Returns + +`Pool` + +*** + +### hasPool() + +```ts +hasPool(key: string): boolean; +``` + +Check whether a pool exists for the given key. + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `key` | `string` | + +#### Returns + +`boolean` diff --git a/docs/docs/api/appkit/index.md b/docs/docs/api/appkit/index.md index 5a21e935f..8a4880250 100644 --- a/docs/docs/api/appkit/index.md +++ b/docs/docs/api/appkit/index.md @@ -44,6 +44,7 @@ plugin architecture, and React integration. | [JobConfig](Interface.JobConfig.md) | Per-job configuration options. | | [JobsConnectorConfig](Interface.JobsConnectorConfig.md) | - | | [LakebasePoolConfig](Interface.LakebasePoolConfig.md) | Configuration for creating a Lakebase connection pool | +| [LakebasePoolManager](Interface.LakebasePoolManager.md) | Manages multiple Lakebase connection pools keyed by an identifier (e.g. userId). | | [PluginManifest](Interface.PluginManifest.md) | Plugin manifest that declares metadata and resource requirements. Attached to plugin classes as a static property. Extends the shared PluginManifest with strict resource types. 
| | [RequestedClaims](Interface.RequestedClaims.md) | Optional claims for fine-grained Unity Catalog table permissions When specified, the returned token will be scoped to only the requested tables | | [RequestedResource](Interface.RequestedResource.md) | Resource to request permissions for in Unity Catalog | @@ -88,6 +89,7 @@ plugin architecture, and React integration. | [appKitTypesPlugin](Function.appKitTypesPlugin.md) | Vite plugin to generate types for AppKit queries. Calls generateFromEntryPoint under the hood. | | [createApp](Function.createApp.md) | Bootstraps AppKit with the provided configuration. | | [createLakebasePool](Function.createLakebasePool.md) | Create a Lakebase pool with appkit's logger integration. Telemetry automatically uses appkit's OpenTelemetry configuration via global registry. | +| [createLakebasePoolManager](Function.createLakebasePoolManager.md) | Create a pool manager that maintains per-key Lakebase connection pools. | | [extractServingEndpoints](Function.extractServingEndpoints.md) | Extract serving endpoint config from a server file by AST-parsing it. Looks for `serving({ endpoints: { alias: { env: "..." }, ... } })` calls and extracts the endpoint alias names and their environment variable mappings. | | [findServerFile](Function.findServerFile.md) | Find the server entry file by checking candidate paths in order. | | [generateDatabaseCredential](Function.generateDatabaseCredential.md) | Generate OAuth credentials for Postgres database connection using the proper Postgres API. 
| diff --git a/docs/docs/api/appkit/typedoc-sidebar.ts b/docs/docs/api/appkit/typedoc-sidebar.ts index 162c3e68b..3408cc11f 100644 --- a/docs/docs/api/appkit/typedoc-sidebar.ts +++ b/docs/docs/api/appkit/typedoc-sidebar.ts @@ -152,6 +152,11 @@ const typedocSidebar: SidebarsConfig = { id: "api/appkit/Interface.LakebasePoolConfig", label: "LakebasePoolConfig" }, + { + type: "doc", + id: "api/appkit/Interface.LakebasePoolManager", + label: "LakebasePoolManager" + }, { type: "doc", id: "api/appkit/Interface.PluginManifest", @@ -315,6 +320,11 @@ const typedocSidebar: SidebarsConfig = { id: "api/appkit/Function.createLakebasePool", label: "createLakebasePool" }, + { + type: "doc", + id: "api/appkit/Function.createLakebasePoolManager", + label: "createLakebasePoolManager" + }, { type: "doc", id: "api/appkit/Function.extractServingEndpoints", diff --git a/docs/docs/plugins/execution-context.md b/docs/docs/plugins/execution-context.md index 98d2815bd..bb2400492 100644 --- a/docs/docs/plugins/execution-context.md +++ b/docs/docs/plugins/execution-context.md @@ -54,6 +54,12 @@ The `plugin.execute` span created by the execution interceptor chain includes th These attributes are automatically added when your plugin uses `execute()` or `executeStream()`. All built-in plugins use these methods for their OBO operations. Custom plugins should do the same to get automatic telemetry instrumentation. +## Lakebase per-user connections + +The Lakebase plugin uses a different mechanism for `asUser(req)`: instead of swapping the `WorkspaceClient` via AsyncLocalStorage, it creates a **separate `pg.Pool` per user**, each with its own OAuth token refresh. This is necessary because PostgreSQL connections are authenticated at connection time — the pool itself is the authentication boundary. + +See [Lakebase plugin — per-user connections](./lakebase.md#on-behalf-of-obo--per-user-connections) for details. 
+ ## Development mode behavior In local development (`NODE_ENV=development`), if `asUser(req)` is called without a user token, it logs a warning and skips user impersonation — the operation runs with the default credentials configured for the app instead. The telemetry span will show `execution.context: "service"` with `execution.obo_dev_fallback: true` to distinguish these from regular service principal calls. diff --git a/docs/docs/plugins/lakebase.md b/docs/docs/plugins/lakebase.md index 768da3c2f..24d1b6a3d 100644 --- a/docs/docs/plugins/lakebase.md +++ b/docs/docs/plugins/lakebase.md @@ -113,6 +113,88 @@ await createApp({ }); ``` +## On-Behalf-Of (OBO) — per-user connections + +When your app needs Row-Level Security (RLS) or per-user data isolation, use `asUser(req)` to execute queries using a per-user Lakebase connection pool. Each user's pool is authenticated with their Databricks identity, so PostgreSQL's `current_user` reflects the actual user. + +### Prerequisites + +1. **Enable user authorization** in your Databricks App with the **`postgres`** scope. See [User authorization](https://docs.databricks.com/aws/en/dev-tools/databricks-apps/auth#user-authorization) for setup instructions. In your `databricks.yml`: + ```yaml + resources: + apps: + app: + user_api_scopes: + - postgres + ``` + Apps scaffolded with `databricks apps init` and the Lakebase plugin include this automatically. + +2. Each app user needs a **Postgres role** in Lakebase. Create one with the Databricks CLI: + + ```bash + databricks postgres create-role "projects/{project_id}/branches/{branch_id}" \ + --json '{"spec": {"identity_type": "USER", "postgres_role": "user@example.com"}}' + ``` + + Alternatively, create roles in the Lakebase UI under **Branch Overview** → **Add role**. + + :::note + Do not grant `databricks_superuser` to OBO users — superusers bypass RLS. Use [fine-grained grants](#fine-grained-permissions) instead. 
+ ::: + +### Usage + +No configuration needed — just call `asUser(req)`: + +```ts +const AppKit = await createApp({ + plugins: [server(), lakebase()], +}); + +// Service principal query (default — bypasses RLS as table owner) +const all = await AppKit.lakebase.query("SELECT * FROM app.orders"); + +// User-scoped query (per-user pool, RLS enforced) +app.get("/api/my-orders", async (req, res) => { + const result = await AppKit.lakebase + .asUser(req) + .query("SELECT * FROM app.orders ORDER BY created_at DESC"); + res.json(result.rows); +}); +``` + +When `asUser(req)` is called: +1. The user's token and identity are extracted from `x-forwarded-access-token` and `x-forwarded-email` headers (set automatically by Databricks Apps). +2. A per-user `pg.Pool` is created (or reused) with the user's OAuth credentials. +3. `query()` and `pool` use the user's pool — `current_user` in PostgreSQL reflects the user's identity. + +### Row-Level Security example + +```sql +-- As the service principal (during app setup): +ALTER TABLE app.orders ENABLE ROW LEVEL SECURITY; + +CREATE POLICY user_orders ON app.orders + FOR ALL TO PUBLIC + USING (owner = current_user); + +-- Grant access so OBO users can query +GRANT USAGE ON SCHEMA app TO PUBLIC; +GRANT SELECT, INSERT ON ALL TABLES IN SCHEMA app TO PUBLIC; +``` + +### How it works + +- The **service principal pool** (`AppKit.lakebase.pool`) is always created and used for DDL operations, seeding, and admin queries. +- **Per-user pools** are created on the first `asUser(req)` call and cached by user identity. Each pool has its own OAuth token refresh cycle. +- Idle connections within per-user pools close automatically (30s idle timeout). Empty pool objects are cleaned up periodically. +- On shutdown, all pools (SP + user) are closed gracefully. +- In development mode (`NODE_ENV=development`), if no user token is available, `asUser(req)` falls back to the SP pool with a warning. 
+ +:::caution[RLS and superusers] +PostgreSQL superusers bypass Row-Level Security entirely. Users with the `databricks_superuser` role will see all rows regardless of RLS policies. For RLS enforcement, use [fine-grained grants](#fine-grained-permissions) instead of the superuser role. +::: + ## Database Permissions When you create the app with the Lakebase resource using the [Getting started](#getting-started-with-the-lakebase) guide, the Service Principal is automatically granted `CONNECT_AND_CREATE` permission on the `postgres` resource. This lets the Service Principal connect to the database and create new objects, but **not access any existing schemas or tables.** @@ -123,14 +205,28 @@ To develop locally against a deployed Lakebase database: 1. **Deploy the app first.** The Service Principal creates the database schema and tables on first deploy. Apps generated from `databricks apps init` handle this automatically - they check if tables exist on startup and skip creation if they do. -2. **Grant `databricks_superuser` via the Lakebase UI:** - 1. Open the Lakebase Autoscaling UI and navigate to your project's **Branch Overview** page. - 2. Click **Add role** (or **Edit role** if your OAuth role already exists). - 3. Select your Databricks identity as the principal and check the **`databricks_superuser`** system role. +2. 
**Grant `databricks_superuser`** (skip if you are the Lakebase project owner — you already have full access): + + ```bash + # Create a new role with databricks_superuser + databricks postgres create-role "projects/{project_id}/branches/{branch_id}" \ + --json '{"spec": {"identity_type": "USER", "postgres_role": "user@example.com", "membership_roles": ["DATABRICKS_SUPERUSER"]}}' + ``` + + To grant superuser to an existing role, use [`update-role`](https://docs.databricks.com/aws/en/dev-tools/cli/reference/postgres-commands#databricks-postgres-update-role): + + ```bash + databricks postgres update-role \ + "projects/{project_id}/branches/{branch_id}/roles/{role_id}" \ + "spec.membership_roles" \ + --json '{"spec": {"membership_roles": ["DATABRICKS_SUPERUSER"]}}' + ``` + + Alternatively, you can manage roles in the Lakebase Autoscaling UI under your project's **Branch Overview** page → **Add role** / **Edit role**. 3. **Run locally** - your Databricks user identity (email) is used for OAuth authentication. The `databricks_superuser` role gives full **DML access** (read/write data) but **not DDL** (creating schemas or tables) - that's why deploying first matters (see note below). -For other users, use the same **Add role** flow in the Lakebase UI to create an OAuth role with `databricks_superuser` for each user. +For other users, repeat step 2 to create an OAuth role with `databricks_superuser` for each user. :::tip [Postgres password authentication](https://docs.databricks.com/aws/en/oltp/projects/authentication#overview) is a simpler alternative that avoids OAuth role permission complexity. However, it requires you to set up a password for the user in the **Branch Overview** page in the Lakebase Autoscaling UI. 
diff --git a/packages/appkit/src/connectors/lakebase/index.ts b/packages/appkit/src/connectors/lakebase/index.ts index c58b7a8cb..e2798a21e 100644 --- a/packages/appkit/src/connectors/lakebase/index.ts +++ b/packages/appkit/src/connectors/lakebase/index.ts @@ -35,3 +35,8 @@ export { RequestedClaimsPermissionSet, type RequestedResource, } from "@databricks/lakebase"; + +export { + createLakebasePoolManager, + type LakebasePoolManager, +} from "./pool-manager"; diff --git a/packages/appkit/src/connectors/lakebase/pool-manager.ts b/packages/appkit/src/connectors/lakebase/pool-manager.ts new file mode 100644 index 000000000..e21ed51d3 --- /dev/null +++ b/packages/appkit/src/connectors/lakebase/pool-manager.ts @@ -0,0 +1,107 @@ +import type { LakebasePoolConfig } from "@databricks/lakebase"; +import type { Pool } from "pg"; +import { createLakebasePool } from "./index"; + +/** Interval for removing empty (connectionless) pools from the Map. */ +const CLEANUP_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes + +/** + * Manages multiple Lakebase connection pools keyed by an identifier (e.g. userId). + * + * Used for On-Behalf-Of (OBO) scenarios where each user needs their own pool + * with their own OAuth token refresh, enabling features like Row-Level Security. + */ +export interface LakebasePoolManager { + /** + * Get an existing pool or create a new one for the given key. + * When creating, merges `perPoolConfig` with the base config passed to the factory. + * On subsequent calls with the same key, `perPoolConfig` is ignored and the cached pool is returned. + */ + getPool(key: string, perPoolConfig: Partial): Pool; + + /** Check whether a pool exists for the given key. */ + hasPool(key: string): boolean; + + /** Close and remove a specific pool. */ + closePool(key: string): Promise; + + /** Close all managed pools and stop cleanup (for graceful shutdown). */ + closeAll(): Promise; + + /** Number of active pools. 
*/ + readonly size: number; +} + +/** + * Create a pool manager that maintains per-key Lakebase connection pools. + * + * Each pool is created via `createLakebasePool` with the base config merged + * with per-pool overrides (e.g. a user's `workspaceClient` and `user`). + * + * A periodic cleanup removes empty Pool objects (where all connections have + * been closed by pg's built-in `idleTimeoutMillis`) from the internal Map. + * + * @example OBO usage + * ```typescript + * const poolManager = createLakebasePoolManager(); + * + * // In a route handler: + * const userPool = poolManager.getPool(userName, { + * workspaceClient: new WorkspaceClient({ token: userToken, host, authType: "pat" }), + * user: userName, + * }); + * const result = await userPool.query("SELECT * FROM products"); + * ``` + */ +export function createLakebasePoolManager( + baseConfig?: Partial, +): LakebasePoolManager { + const pools = new Map(); + + // Periodically remove empty Pool objects from the Map. + // pg.Pool's idleTimeoutMillis closes idle connections automatically; + // this just cleans up the Map entries once all connections are gone. 
+ const cleanupTimer = setInterval(() => { + for (const [key, pool] of pools) { + if (pool.totalCount === 0) { + pool.end().catch(() => {}); + pools.delete(key); + } + } + }, CLEANUP_INTERVAL_MS); + cleanupTimer.unref(); + + return { + getPool(key: string, perPoolConfig: Partial): Pool { + const existing = pools.get(key); + if (existing) return existing; + + const pool = createLakebasePool({ ...baseConfig, ...perPoolConfig }); + pools.set(key, pool); + return pool; + }, + + hasPool(key: string): boolean { + return pools.has(key); + }, + + async closePool(key: string): Promise { + const pool = pools.get(key); + if (pool) { + await pool.end(); + pools.delete(key); + } + }, + + async closeAll(): Promise { + clearInterval(cleanupTimer); + const endPromises = [...pools.values()].map((p) => p.end()); + await Promise.all(endPromises); + pools.clear(); + }, + + get size() { + return pools.size; + }, + }; +} diff --git a/packages/appkit/src/index.ts b/packages/appkit/src/index.ts index 00fd6ff86..ac746bd25 100644 --- a/packages/appkit/src/index.ts +++ b/packages/appkit/src/index.ts @@ -20,12 +20,14 @@ export type { DatabaseCredential, GenerateDatabaseCredentialRequest, LakebasePoolConfig, + LakebasePoolManager, RequestedClaims, RequestedResource, } from "./connectors/lakebase"; // Lakebase Autoscaling connector export { createLakebasePool, + createLakebasePoolManager, generateDatabaseCredential, getLakebaseOrmConfig, getLakebasePgConfig, diff --git a/packages/appkit/src/plugins/lakebase/lakebase.ts b/packages/appkit/src/plugins/lakebase/lakebase.ts index 36c64c5ae..397539099 100644 --- a/packages/appkit/src/plugins/lakebase/lakebase.ts +++ b/packages/appkit/src/plugins/lakebase/lakebase.ts @@ -1,19 +1,24 @@ +import { WorkspaceClient } from "@databricks/sdk-experimental"; +import type express from "express"; import type { Pool, QueryResult, QueryResultRow } from "pg"; import type { AgentToolDefinition, ToolProvider } from "shared"; import { z } from "zod"; import { 
createLakebasePool, + createLakebasePoolManager, getLakebaseOrmConfig, getLakebasePgConfig, getUsernameWithApiLookup, + type LakebasePoolManager, } from "../../connectors/lakebase"; import { buildToolkitEntries } from "../../core/agent/build-toolkit"; import { defineTool, - executeFromRegistry, toolsFromRegistry, } from "../../core/agent/tools/define-tool"; import { assertReadOnlySql } from "../../core/agent/tools/sql-policy"; +import { formatZodError } from "../../core/agent/tools/tool"; +import { AuthenticationError } from "../../errors"; import { createLogger } from "../../logging/logger"; import { Plugin, toPlugin } from "../../plugin"; import type { PluginManifest } from "../../registry"; @@ -22,12 +27,23 @@ import type { ILakebaseConfig } from "./types"; const logger = createLogger("lakebase"); +/** Default pool settings for per-user OBO pools. */ +const OBO_POOL_DEFAULTS = { + max: 3, + allowExitOnIdle: true, + idleTimeoutMillis: 30_000, +}; + /** * AppKit plugin for Databricks Lakebase Autoscaling. * * Wraps `@databricks/lakebase` to provide a standard `pg.Pool` with automatic * OAuth token refresh, integrated with AppKit's logger and OpenTelemetry setup. * + * Supports On-Behalf-Of (OBO) via `asUser(req)` — each user gets a separate + * `pg.Pool` authenticated with their Databricks identity, enabling features + * like Row-Level Security (RLS). 
+ * * @example * ```ts * import { createApp, lakebase, server } from "@databricks/appkit"; @@ -36,7 +52,11 @@ const logger = createLogger("lakebase"); * plugins: [server(), lakebase()], * }); * + * // Service principal query * const result = await AppKit.lakebase.query("SELECT * FROM users WHERE id = $1", [userId]); + * + * // User-scoped query (per-user pool, RLS enforced) + * const mine = await AppKit.lakebase.asUser(req).query("SELECT * FROM my_data"); * ``` */ export class LakebasePlugin extends Plugin implements ToolProvider { @@ -45,9 +65,10 @@ export class LakebasePlugin extends Plugin implements ToolProvider { protected declare config: ILakebaseConfig; private pool: Pool | null = null; + private oboPoolManager: LakebasePoolManager | null = null; /** - * Initializes the Lakebase connection pool. + * Initializes the Lakebase connection pool and OBO pool manager. * Called automatically by AppKit during the plugin setup phase. * * Resolves the PostgreSQL username via {@link getUsernameWithApiLookup}, @@ -58,6 +79,12 @@ export class LakebasePlugin extends Plugin implements ToolProvider { const user = await getUsernameWithApiLookup(poolConfig); this.pool = createLakebasePool({ ...poolConfig, user }); logger.info("Lakebase pool initialized"); + + this.oboPoolManager = createLakebasePoolManager({ + ...poolConfig, + ...OBO_POOL_DEFAULTS, + }); + logger.info("Lakebase OBO pool manager initialized"); } /** @@ -117,27 +144,156 @@ export class LakebasePlugin extends Plugin implements ToolProvider { } /** - * Gracefully drains and closes the connection pool. + * Returns a user-scoped version of the plugin that uses a per-user + * connection pool authenticated with the user's Databricks identity. + * + * Overrides the base `Plugin.asUser()` because Lakebase needs entirely + * separate `pg.Pool` instances per user (each with their own OAuth token + * refresh), rather than the standard AsyncLocalStorage context swap. 
+ * + * @param req - Express request containing `x-forwarded-access-token` and `x-forwarded-email` headers + * @returns A proxied plugin instance where `query()` and `pool` use the user's pool + */ + asUser(req: express.Request): this { + const token = req.header("x-forwarded-access-token"); + const userEmail = req.header("x-forwarded-email"); + const isDev = process.env.NODE_ENV === "development"; + + // In dev mode without token, delegate to the base class dev fallback + // which uses the SP pool with DEV_OBO_FALLBACK_KEY in OTel context + if (!token && isDev) { + return super.asUser(req); + } + + if (!token) { + throw AuthenticationError.missingToken("user token"); + } + + if (!userEmail && !isDev) { + throw AuthenticationError.missingToken( + "x-forwarded-email header (required for Lakebase per-user pools)", + ); + } + + // Lakebase OAuth roles use email as postgres_role + const effectiveUser = userEmail || "local-dev-user"; + + // Get or create a per-user pool + // biome-ignore lint/style/noNonNullAssertion: oboPoolManager is guaranteed non-null after setup() + const isNew = !this.oboPoolManager!.hasPool(effectiveUser); + // biome-ignore lint/style/noNonNullAssertion: oboPoolManager is guaranteed non-null after setup() + const userPool = this.oboPoolManager!.getPool(effectiveUser, { + workspaceClient: new WorkspaceClient({ + token, + host: process.env.DATABRICKS_HOST, + authType: "pat", + }), + user: effectiveUser, + }); + + if (isNew) { + logger.info( + 'Created OBO pool for user "%s" (total: %d)', + effectiveUser, + // biome-ignore lint/style/noNonNullAssertion: oboPoolManager is guaranteed non-null after setup() + this.oboPoolManager!.size, + ); + } + + const pluginConfig = this.config; + + // Return a proxy that intercepts pool-related methods and exports. + // The `pool` intercept is critical for agent tools: executeAgentTool + // calls this.runReadOnlyStatement() which accesses this.pool.connect(). 
+ // When `this` is the proxy, the get trap routes to the user pool. + return new Proxy(this, { + get(target, prop, receiver) { + if (prop === "pool") { + return userPool; + } + + if (prop === "query") { + return ( + text: string, + values?: unknown[], + ): Promise<QueryResult> => userPool.query(text, values); + } + + if (prop === "exports") { + return () => ({ + pool: userPool, + query: ( + text: string, + values?: unknown[], + ) => userPool.query(text, values), + getOrmConfig: () => + getLakebaseOrmConfig({ + ...pluginConfig.pool, + workspaceClient: new WorkspaceClient({ + token, + host: process.env.DATABRICKS_HOST, + authType: "pat", + }), + user: effectiveUser, + }), + getPgConfig: () => + getLakebasePgConfig({ + ...pluginConfig.pool, + workspaceClient: new WorkspaceClient({ + token, + host: process.env.DATABRICKS_HOST, + authType: "pat", + }), + user: effectiveUser, + }), + }); + } + + if (prop === "asUser") { + return () => { + throw new Error("asUser() cannot be chained"); + }; + } + + return Reflect.get(target, prop, receiver); + }, + }) as this; + } + + /** + * Gracefully drains and closes all connection pools (SP + OBO). * Called automatically by AppKit during shutdown. */ abortActiveOperations(): void { super.abortActiveOperations(); if (this.pool) { - logger.info("Closing Lakebase pool"); + logger.info("Closing Lakebase SP pool"); this.pool.end().catch((err) => { - logger.error("Error closing Lakebase pool: %O", err); + logger.error("Error closing Lakebase SP pool: %O", err); }); this.pool = null; } + if (this.oboPoolManager) { + logger.info( + "Closing all Lakebase OBO pools (%d)", + this.oboPoolManager.size, + ); + this.oboPoolManager.closeAll().catch((err) => { + logger.error("Error closing Lakebase OBO pools: %O", err); + }); + this.oboPoolManager = null; + } } /** * Returns the plugin's public API, accessible via `AppKit.lakebase`. 
* - * - `pool` — The raw `pg.Pool` instance, for use with ORMs or advanced scenarios + * - `pool` — The raw `pg.Pool` instance (service principal), for use with ORMs or advanced scenarios * - `query` — Convenience method for executing parameterized SQL queries * - `getOrmConfig()` — Returns a config object compatible with Drizzle, TypeORM, Sequelize, etc. * - `getPgConfig()` — Returns a `pg.PoolConfig` object for manual pool construction + * + * Use `AppKit.lakebase.asUser(req)` to get the same API backed by a per-user pool. */ /** @@ -165,8 +321,8 @@ export class LakebasePlugin extends Plugin implements ToolProvider { const readOnly = opt.readOnly !== false; return defineTool({ description: readOnly - ? "Execute a read-only SQL query against the Lakebase PostgreSQL database. Only SELECT, WITH, SHOW, EXPLAIN, and DESCRIBE statements are accepted. Use $1, $2, etc. as placeholders and pass values separately. Runs as the application's service principal." - : "Execute a parameterized SQL statement against the Lakebase PostgreSQL database. Use $1, $2, etc. as placeholders and pass values separately. Runs as the application's service principal. This tool can modify data; every invocation requires explicit human approval.", + ? "Execute a read-only SQL query against the Lakebase PostgreSQL database. Only SELECT, WITH, SHOW, EXPLAIN, and DESCRIBE statements are accepted. Use $1, $2, etc. as placeholders and pass values separately." + : "Execute a parameterized SQL statement against the Lakebase PostgreSQL database. Use $1, $2, etc. as placeholders and pass values separately. 
This tool can modify data; every invocation requires explicit human approval.", schema: z.object({ text: z .string() @@ -182,6 +338,7 @@ export class LakebasePlugin extends Plugin implements ToolProvider { readOnly, destructive: !readOnly, idempotent: false, + requiresUserContext: true, }, handler: async (args) => { if (readOnly) { @@ -198,12 +355,37 @@ export class LakebasePlugin extends Plugin implements ToolProvider { return toolsFromRegistry(this.tools); } + /** + * Executes a registered agent tool by name. + * + * This method intentionally inlines the tool dispatch instead of + * delegating to `executeFromRegistry`, so that `this.query()` and + * `this.runReadOnlyStatement()` resolve through the Proxy returned + * by `asUser(req)`. The Proxy intercepts `query` and `pool` to + * route SQL to the per-user connection pool, enabling OBO execution + * for agent tools. + */ async executeAgentTool( name: string, args: unknown, - signal?: AbortSignal, + _signal?: AbortSignal, ): Promise { - return executeFromRegistry(this.tools, name, args, signal); + const entry = this.tools[name]; + if (!entry) { + throw new Error(`Unknown tool: ${name}`); + } + const parsed = entry.schema.safeParse(args); + if (!parsed.success) { + return formatZodError(parsed.error, name); + } + + const { text, values } = parsed.data; + if (entry.annotations?.readOnly) { + assertReadOnlySql(text); + return this.runReadOnlyStatement(text, values); + } + const result = await this.query(text, values); + return result.rows; } toolkit(opts?: import("../../core/agent/types").ToolkitOptions) { diff --git a/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts b/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts index 855ce1012..a7b77730a 100644 --- a/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts +++ b/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts @@ -48,6 +48,12 @@ vi.mock("../../../connectors/lakebase", () => ({ }), 
end: vi.fn(), })), + createLakebasePoolManager: vi.fn(() => ({ + getPool: vi.fn(), + hasPool: vi.fn(() => false), + closeAll: vi.fn(async () => {}), + size: 0, + })), getLakebaseOrmConfig: vi.fn(() => ({})), getLakebasePgConfig: vi.fn(() => ({})), getUsernameWithApiLookup: vi.fn(async () => "test-user"), @@ -84,6 +90,7 @@ describe("LakebasePlugin — agent tool opt-in", () => { readOnly: true, destructive: false, idempotent: false, + requiresUserContext: true, }); }); @@ -96,6 +103,7 @@ describe("LakebasePlugin — agent tool opt-in", () => { readOnly: false, destructive: true, idempotent: false, + requiresUserContext: true, }); }); }); @@ -224,3 +232,109 @@ describe("LakebasePlugin — destructive mode", () => { ); }); }); + +describe("LakebasePlugin — OBO agent tool execution", () => { + // Tracks calls made to the user pool (vs SP pool) so we can assert + // that the proxy correctly routes agent tool SQL to the OBO pool. + const userPoolQueries: Array<{ text: string; values?: unknown[] }> = []; + const userClientQueries: Array<{ text: string; values?: unknown[] }> = []; + const userClientReleases: number[] = []; + + const fakeReq = { + header: (name: string) => { + const headers: Record = { + "x-forwarded-access-token": "user-token-123", + "x-forwarded-email": "alice@example.com", + }; + return headers[name]; + }, + } as unknown as import("express").Request; + + function makeUserPool() { + return { + query: vi.fn(async (text: string, values?: unknown[]) => { + userPoolQueries.push({ text, values }); + return { rows: [{ from: "user-pool" }] }; + }), + connect: vi.fn(async () => ({ + query: vi.fn(async (text: string, values?: unknown[]) => { + userClientQueries.push({ text, values }); + return { rows: [{ from: "user-pool-client" }] }; + }), + release: vi.fn(() => { + userClientReleases.push(userClientReleases.length + 1); + }), + })), + end: vi.fn(), + }; + } + + beforeEach(async () => { + userPoolQueries.length = 0; + userClientQueries.length = 0; + 
userClientReleases.length = 0; + clientQueries.length = 0; + + const { createLakebasePoolManager } = await import( + "../../../connectors/lakebase" + ); + vi.mocked(createLakebasePoolManager).mockReturnValue({ + getPool: vi.fn(() => makeUserPool() as unknown as Pool), + hasPool: vi.fn(() => false), + closeAll: vi.fn(async () => {}), + get size() { + return 1; + }, + }); + }); + + test("read-only query via OBO uses user pool, not SP pool", async () => { + const plugin = makePlugin({ exposeAsAgentTool: {} }); + await plugin.setup(); + + const result = await plugin + .asUser(fakeReq) + .executeAgentTool("query", { text: "SELECT 1" }); + + expect(result).toEqual([{ from: "user-pool-client" }]); + expect(userClientQueries.map((c) => c.text)).toEqual([ + "BEGIN READ ONLY", + "SELECT 1", + "ROLLBACK", + ]); + // SP pool should NOT have been touched + expect(clientQueries).toHaveLength(0); + }); + + test("destructive query via OBO uses user pool", async () => { + const plugin = makePlugin({ exposeAsAgentTool: { readOnly: false } }); + await plugin.setup(); + + const result = await plugin.asUser(fakeReq).executeAgentTool("query", { + text: "UPDATE t SET x=1", + values: [42], + }); + + expect(result).toEqual([{ from: "user-pool" }]); + expect(userPoolQueries).toEqual([ + { text: "UPDATE t SET x=1", values: [42] }, + ]); + // SP pool should NOT have been touched + expect(clientQueries).toHaveLength(0); + }); + + test("read-only policy still enforced via OBO", async () => { + const plugin = makePlugin({ exposeAsAgentTool: {} }); + await plugin.setup(); + + await expect( + plugin.asUser(fakeReq).executeAgentTool("query", { + text: "DROP TABLE users", + }), + ).rejects.toThrow(/read-only policy violation/i); + + // No pool should have been touched + expect(userClientQueries).toHaveLength(0); + expect(clientQueries).toHaveLength(0); + }); +}); diff --git a/template/client/src/pages/lakebase/LakebasePage.tsx b/template/client/src/pages/lakebase/LakebasePage.tsx index 
06c02922d..36c180c94 100644 --- a/template/client/src/pages/lakebase/LakebasePage.tsx +++ b/template/client/src/pages/lakebase/LakebasePage.tsx @@ -12,9 +12,10 @@ import { useState, useEffect } from 'react'; import { Check, X } from 'lucide-react'; interface Todo { - id: number; + id: string; title: string; completed: boolean; + created_by: string | null; created_at: string; } @@ -27,8 +28,11 @@ export function LakebasePage() { useEffect(() => { fetch('/api/lakebase/todos') - .then((res) => { - if (!res.ok) throw new Error(`Failed to fetch todos: ${res.statusText}`); + .then(async (res) => { + if (!res.ok) { + const body = await res.json().catch(() => null); + throw new Error(body?.error ?? `Failed to fetch todos: ${res.statusText}`); + } return res.json() as Promise; }) .then(setTodos) @@ -48,7 +52,10 @@ export function LakebasePage() { headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ title }), }); - if (!res.ok) throw new Error(`Failed to create todo: ${res.statusText}`); + if (!res.ok) { + const body = await res.json().catch(() => null); + throw new Error(body?.error ?? `Failed to create todo: ${res.statusText}`); + } const created = (await res.json()) as Todo; setTodos((prev) => [created, ...prev]); setNewTitle(''); @@ -59,10 +66,13 @@ export function LakebasePage() { } }; - const toggleTodo = async (id: number) => { + const toggleTodo = async (id: string) => { try { const res = await fetch(`/api/lakebase/todos/${id}`, { method: 'PATCH' }); - if (!res.ok) throw new Error(`Failed to update todo: ${res.statusText}`); + if (!res.ok) { + const body = await res.json().catch(() => null); + throw new Error(body?.error ?? `Failed to update todo: ${res.statusText}`); + } const updated = (await res.json()) as Todo; setTodos((prev) => prev.map((t) => (t.id === id ? 
updated : t))); } catch (err) { @@ -70,10 +80,13 @@ export function LakebasePage() { } }; - const deleteTodo = async (id: number) => { + const deleteTodo = async (id: string) => { try { const res = await fetch(`/api/lakebase/todos/${id}`, { method: 'DELETE' }); - if (!res.ok) throw new Error(`Failed to delete todo: ${res.statusText}`); + if (!res.ok) { + const body = await res.json().catch(() => null); + throw new Error(body?.error ?? `Failed to delete todo: ${res.statusText}`); + } setTodos((prev) => prev.filter((t) => t.id !== id)); } catch (err) { setError(err instanceof Error ? err.message : 'Failed to delete todo'); @@ -149,9 +162,16 @@ export function LakebasePage() { {todo.completed && } - - {todo.title} - +
+ + {todo.title} + + {todo.created_by && ( + + by {todo.created_by} + + )} +