diff --git a/packages/appkit/src/database/index.ts b/packages/appkit/src/database/index.ts index 55d54b94..65705ed1 100644 --- a/packages/appkit/src/database/index.ts +++ b/packages/appkit/src/database/index.ts @@ -1 +1,10 @@ +export type { + CountOptions, + DataPath, + IncludeSpec, + OrderSpec, + SelectOptions, + WhereSpec, +} from "./runtime"; +export { createDrizzleDataPath, createUserScopedDataPath } from "./runtime"; export * from "./schema-builder"; diff --git a/packages/appkit/src/database/runtime/data-path.ts b/packages/appkit/src/database/runtime/data-path.ts new file mode 100644 index 00000000..cabb1666 --- /dev/null +++ b/packages/appkit/src/database/runtime/data-path.ts @@ -0,0 +1,175 @@ +import type { AppKitTable } from "../schema-builder/types"; + +/** Generic row shape returned by every read terminator. */ +export type Row = Record; + +/** Sort direction accepted by `OrderSpec`. */ +type Direction = "asc" | "desc"; + +/** + * Operator vocabulary supported by `WhereSpec`. Names match Drizzle helpers + * one-for-one; the runtime translates each into the matching helper (`eq`, + * `ne`, `gt`, …) when the query is built. + */ +type WhereOperator = + | "eq" + | "neq" + | "gt" + | "gte" + | "lt" + | "lte" + | "like" + | "ilike" + | "in" + | "is"; + +/** + * Per-column predicate. Bare value is shorthand for equality; an array is + * shorthand for `IN`; an object selects one or more operators. + */ +type WhereValue = + | string + | number + | boolean + | null + | readonly (string | number | boolean | null)[] + | { [K in WhereOperator]?: unknown }; + +/** Filter map: column name → predicate. */ +export type WhereSpec = Record; + +/** Order map: column name → direction (default: `asc`). */ +export type OrderSpec = Record; + +/** + * Per-relation include options. `select` projects related columns; `where` + * narrows them; `limit` and `order` paginate them. Nested includes are + * intentionally out of MVP scope. + */ +export interface IncludeOptions { + /** Restrict the related row's columns. Defaults to all columns. */ + select?: ReadonlyArray; + /** Cap the related rows fetched per parent. */ + limit?: number; + /** Order the related rows. */ + order?: OrderSpec; + /** Filter the related rows. */ + where?: WhereSpec; +} + +/** + * Eager-load shape: relation name → either `true` (all default) or an options + * bag. The runtime resolves relation names against the parent table's + * `$relations` metadata; unknown names throw at query time. + */ +export type IncludeSpec = Record; + +/** Options accepted by `DataPath.select`. */ +export interface SelectOptions { + where?: WhereSpec; + order?: OrderSpec; + limit?: number; + offset?: number; + /** Project specific columns. Defaults to `*`. */ + columns?: ReadonlyArray; + /** Eager-load related entities. */ + include?: IncludeSpec; + /** + * Reserved. `node-postgres` does not honor `AbortSignal` at the query level + * today — runaway queries are bounded server-side by Postgres + * `statement_timeout` (set by the plugin on every pool connection). The + * AppKit timeout interceptor still rejects the JS promise when fired. + */ + signal?: AbortSignal; +} + +/** Options accepted by `DataPath.findOne`. */ +export interface FindOneOptions { + columns?: ReadonlyArray; + include?: IncludeSpec; + signal?: AbortSignal; +} + +/** Options accepted by `DataPath.count`. */ +export interface CountOptions { + where?: WhereSpec; + signal?: AbortSignal; +} + +/** + * AppKit-shaped abstraction over the runtime data path. + * + * The entity proxy and route layer talk to this interface only. The + * implementation in `drizzle-runtime.ts` is the *only* AppKit file that + * imports `drizzle-orm` for query execution. Swapping Drizzle for Kysely, + * Knex, or raw SQL means rewriting one file. + * + * Identity, OBO, telemetry, hook dispatch, and validation all live above this + * interface — `DataPath` is plain "execute these reads/writes against this + * pool". Pool selection (SP vs per-user) happens in `entity-wiring.ts`. + */ +export interface DataPath { + /** Run a SELECT and return rows (with optional eager joins). */ + select(table: AppKitTable, opts: SelectOptions): Promise; + + /** Find one row by primary key, or `null` when no row matches. */ + findOne( + table: AppKitTable, + pkColumn: string, + id: string | number, + opts?: FindOneOptions, + ): Promise; + + /** Count rows matching `where`. */ + count(table: AppKitTable, opts: CountOptions): Promise; + + /** INSERT one row and return the inserted row (with server-generated columns). */ + insert(table: AppKitTable, data: Row, signal?: AbortSignal): Promise; + + /** + * UPDATE one row by primary key. Returns the updated row, or `null` when + * no row matches. Hook dispatch and Zod validation happen above this layer. + */ + update( + table: AppKitTable, + pkColumn: string, + id: string | number, + patch: Row, + signal?: AbortSignal, + ): Promise; + + /** + * INSERT … ON CONFLICT (`onConflict`) DO UPDATE. Returns the resulting row. + * `onConflict` is a column name in the table (single-column unique constraint). + */ + upsert( + table: AppKitTable, + data: Row, + options: { onConflict: string }, + signal?: AbortSignal, + ): Promise; + + /** DELETE one row by primary key. No-op when no row matches. */ + delete( + table: AppKitTable, + pkColumn: string, + id: string | number, + signal?: AbortSignal, + ): Promise; + + /** + * Run `fn` inside a database transaction. The nested `DataPath` shares the + * same surface; rollbacks happen on throw, commits on resolution. + */ + transaction(fn: (tx: DataPath) => Promise): Promise; + + /** + * Tagged-template SQL escape hatch. Values are bound as parameters; column + * and identifier interpolation is intentionally not supported here — use + * `getDrizzle()` from the plugin's exports for that case. + */ + raw( + strings: TemplateStringsArray, + ...values: unknown[] + ): Promise; +} diff --git a/packages/appkit/src/database/runtime/drizzle-runtime.ts b/packages/appkit/src/database/runtime/drizzle-runtime.ts new file mode 100644 index 00000000..53fba223 --- /dev/null +++ b/packages/appkit/src/database/runtime/drizzle-runtime.ts @@ -0,0 +1,695 @@ +import { + and, + asc, + count as countFn, + desc, + eq, + gt, + gte, + ilike, + inArray, + isNull, + like, + lt, + lte, + ne, + type SQL, + sql, +} from "drizzle-orm"; +import { drizzle, type NodePgDatabase } from "drizzle-orm/node-postgres"; +import type { Pool } from "pg"; +import { nonPrivateColumnNames } from "../schema-builder/private"; +import type { AppKitTable, Schema } from "../schema-builder/types"; +import type { + DataPath, + IncludeOptions, + IncludeSpec, + OrderSpec, + Row, + SelectOptions, + WhereSpec, +} from "./data-path"; + +// Drizzle column type lives in `pg-core` internals — keep opaque so the rest +// of the file stays free of Drizzle generics. +type DrizzleColumn = unknown; + +// Drizzle table — never narrowed; queries cast where Drizzle wants generics. +type DrizzleTable = Record; + +/** + * `manyToOne` — parent holds the FK (e.g. `user.team_id`). + * `oneToMany` — related holds the FK back (e.g. `post.author_id`). + */ +type ResolvedRelation = + | { + kind: "manyToOne"; + relatedTable: AppKitTable; + /** Column on the *parent* table holding the FK. */ + parentFk: string; + /** Column on the *related* table being referenced (usually `id`). */ + relatedKey: string; + } + | { + kind: "oneToMany"; + relatedTable: AppKitTable; + /** Column on the *related* table holding the FK back to the parent. */ + relatedFk: string; + /** Column on the *parent* table being referenced (usually `id`). */ + parentKey: string; + }; + +/** + * Build a `DataPath` backed by `drizzle-orm/node-postgres`. + * + * Sole `drizzle-orm` import site (decision #30) — swapping query builders + * means rewriting only this file. `schema` resolves eager-loading relations + * via a two-query pattern (parent + IN(ids)), avoiding N+1 without needing + * Drizzle's `relations()` API. + */ +export function createDrizzleDataPath(pool: Pool, schema: Schema): DataPath { + const db = drizzle(pool); + return makeDataPath(db, schema); +} + +/** + * User-scoped `DataPath`: each op runs in a txn with `SET LOCAL app.user_id`. + * + * The txn is the security boundary — the GUC is txn-scoped, so a connection + * returned to the pool can't leak identity to the next checkout. RLS policies + * reading `current_setting('app.user_id')` resolve to the OBO user. + * + * One SP pool services everyone (no per-user pools, OAuth refresh, or LRU). + * Cost: one BEGIN+COMMIT per op; amortize via `transaction(fn)` for multi-step. + */ +export function createUserScopedDataPath( + pool: Pool, + schema: Schema, + context: { userId: string }, +): DataPath { + const db = drizzle(pool); + + // Each op opens its own txn + GUC. `transaction(fn)` shares one + // BEGIN/SET LOCAL/COMMIT across nested ops. + async function withUserContext( + fn: (tx: DataPath) => Promise, + ): Promise { + // biome-ignore lint/suspicious/noExplicitAny: Drizzle's transaction generic is opaque at this boundary. + return await (db as any).transaction(async (tx: NodePgDatabase) => { + // `set_config(_, _, true)` = parameterized SET LOCAL; cleared on COMMIT/ROLLBACK. + await tx.execute( + sql`SELECT set_config('app.user_id', ${context.userId}, true)`, + ); + return await fn(makeDataPath(tx, schema)); + }); + } + + const path: DataPath = { + async select(table, opts) { + return await withUserContext((tx) => tx.select(table, opts)); + }, + async findOne(table, pkColumn, id, opts) { + return await withUserContext((tx) => + tx.findOne(table, pkColumn, id, opts), + ); + }, + async count(table, opts) { + return await withUserContext((tx) => tx.count(table, opts)); + }, + async insert(table, data, signal) { + return await withUserContext((tx) => tx.insert(table, data, signal)); + }, + async update(table, pkColumn, id, patch, signal) { + return await withUserContext((tx) => + tx.update(table, pkColumn, id, patch, signal), + ); + }, + async upsert(table, data, options, signal) { + return await withUserContext((tx) => + tx.upsert(table, data, options, signal), + ); + }, + async delete(table, pkColumn, id, signal) { + return await withUserContext((tx) => + tx.delete(table, pkColumn, id, signal), + ); + }, + async transaction(fn) { + return await withUserContext(fn); + }, + async raw(strings, ...values) { + return await withUserContext((tx) => tx.raw(strings, ...values)); + }, + }; + return path; +} + +function makeDataPath( + db: NodePgDatabase | NodePgDatabase>, + schema: Schema, +): DataPath { + const path: DataPath = { + async select(table, opts) { + const rows = await runSelect(db, table, opts); + if (opts.include && rows.length > 0) { + await applyIncludes(db, schema, table, rows, opts.include); + } + return rows; + }, + + async findOne(table, pkColumn, id, opts) { + const pk = getColumn(table, pkColumn); + const drizzleTable = table.$drizzle as DrizzleTable; + const projection = projectColumns(table, opts?.columns); + + const builder = db + .select(projection as never) + // biome-ignore lint/suspicious/noExplicitAny: Drizzle's table generic is opaque at this boundary. + .from(drizzleTable as any) + .where(eq(pk as never, id)); + + const rows = (await maybeAbort(builder, opts?.signal)) as Row[]; + const row = rows[0] ?? null; + + if (row && opts?.include) { + await applyIncludes(db, schema, table, [row], opts.include); + } + return row; + }, + + async count(table, opts) { + const drizzleTable = table.$drizzle as DrizzleTable; + // biome-ignore lint/suspicious/noExplicitAny: Drizzle's table generic is opaque at this boundary. + const builder = db.select({ value: countFn() }).from(drizzleTable as any); + const where = buildWhere(table, opts.where); + const final = where ? builder.where(where) : builder; + const result = (await maybeAbort(final, opts.signal)) as Array<{ + value: number; + }>; + return Number(result[0]?.value ?? 0); + }, + + async insert(table, data, signal) { + const drizzleTable = table.$drizzle as DrizzleTable; + const returning = projectColumns(table, undefined); + const builder = db + // biome-ignore lint/suspicious/noExplicitAny: Drizzle's table generic is opaque at this boundary. + .insert(drizzleTable as any) + .values(data as never) + .returning(returning as never); + const rows = (await maybeAbort(builder, signal)) as Row[]; + const row = rows[0]; + if (!row) { + throw new Error(`insert into ${table.name} did not return a row`); + } + return row; + }, + + async update(table, pkColumn, id, patch, signal) { + const drizzleTable = table.$drizzle as DrizzleTable; + const pk = getColumn(table, pkColumn); + const returning = projectColumns(table, undefined); + const builder = db + // biome-ignore lint/suspicious/noExplicitAny: Drizzle's table generic is opaque at this boundary. + .update(drizzleTable as any) + .set(patch as never) + .where(eq(pk as never, id)) + .returning(returning as never); + const rows = (await maybeAbort(builder, signal)) as Row[]; + return rows[0] ?? null; + }, + + async upsert(table, data, options, signal) { + const drizzleTable = table.$drizzle as DrizzleTable; + const conflictCol = getColumn(table, options.onConflict); + const returning = projectColumns(table, undefined); + const builder = db + // biome-ignore lint/suspicious/noExplicitAny: Drizzle's table generic is opaque at this boundary. + .insert(drizzleTable as any) + .values(data as never) + .onConflictDoUpdate({ + target: conflictCol as never, + set: data as never, + }) + .returning(returning as never); + const rows = (await maybeAbort(builder, signal)) as Row[]; + const row = rows[0]; + if (!row) { + throw new Error(`upsert on ${table.name} did not return a row`); + } + return row; + }, + + async delete(table, pkColumn, id, signal) { + const drizzleTable = table.$drizzle as DrizzleTable; + const pk = getColumn(table, pkColumn); + const builder = db + // biome-ignore lint/suspicious/noExplicitAny: Drizzle's table generic is opaque at this boundary. + .delete(drizzleTable as any) + .where(eq(pk as never, id)); + await maybeAbort(builder, signal); + }, + + async transaction(fn) { + // Client-side cap on the whole callback. `statement_timeout` only bounds + // individual queries — a workflow holding the txn open between queries + // (e.g. awaiting an external API) wouldn't trip it. 30s catches stuck + // callers without surprising healthy ones. + const TRANSACTION_TIMEOUT_MS = 30_000; + // biome-ignore lint/suspicious/noExplicitAny: tx shares the NodePgDatabase shape. + const txnPromise = (db as any).transaction(async (tx: NodePgDatabase) => { + return await fn(makeDataPath(tx, schema)); + }) as Promise; + + let timer: NodeJS.Timeout | undefined; + const timeout = new Promise((_, reject) => { + timer = setTimeout( + () => + reject( + new Error( + `transaction(fn) exceeded ${TRANSACTION_TIMEOUT_MS}ms client-side cap`, + ), + ), + TRANSACTION_TIMEOUT_MS, + ); + timer.unref?.(); + }); + try { + return (await Promise.race([txnPromise, timeout])) as never; + } finally { + if (timer) clearTimeout(timer); + } + }, + + async raw(strings, ...values) { + // Hand off to Drizzle's `sql` tag — it parameterizes `values`, no string + // concat happens here. + const query = sql( + Object.assign([...strings], { + raw: [...strings], + }) as TemplateStringsArray, + ...values, + ); + // biome-ignore lint/suspicious/noExplicitAny: execute() return shape varies by driver; rows is always present for node-postgres. + const result = (await db.execute(query)) as { rows: unknown[] }; + return result.rows as never[]; + }, + }; + return path; +} + +/* -------------------------------------------------------------------------- * + * SELECT * + * -------------------------------------------------------------------------- */ + +async function runSelect( + db: NodePgDatabase | NodePgDatabase>, + table: AppKitTable, + opts: SelectOptions, +): Promise { + const drizzleTable = table.$drizzle as DrizzleTable; + const projection = projectColumns(table, opts.columns); + + let builder = db + .select(projection as never) + // biome-ignore lint/suspicious/noExplicitAny: Drizzle's table generic is opaque at this boundary. + .from(drizzleTable as any) as any; + + const where = buildWhere(table, opts.where); + if (where) builder = builder.where(where); + + const orderClauses = buildOrder(table, opts.order); + if (orderClauses.length > 0) builder = builder.orderBy(...orderClauses); + + if (opts.limit !== undefined) builder = builder.limit(opts.limit); + if (opts.offset !== undefined) builder = builder.offset(opts.offset); + + const rows = (await maybeAbort(builder, opts.signal)) as Row[]; + return rows; +} + +// Always project explicitly — bare `select()` would `SELECT *` and leak +// `.private()` columns. Reused by `.returning()` on writes. +function projectColumns( + table: AppKitTable, + columns: ReadonlyArray | undefined, +): Record { + const names = + columns && columns.length > 0 ? columns : nonPrivateColumnNames(table); + const out: Record = {}; + for (const name of names) { + out[name] = getColumn(table, name); + } + return out; +} + +/* -------------------------------------------------------------------------- * + * WHERE * + * -------------------------------------------------------------------------- */ + +/** + * @internal Exported for snapshot tests that lock in operator → SQL fragment + * mapping. Not part of the public surface — go through `DataPath` instead. + */ +export function buildWhere( + table: AppKitTable, + spec: WhereSpec | undefined, +): SQL | undefined { + if (!spec) return undefined; + + const conditions: SQL[] = []; + for (const [columnName, value] of Object.entries(spec)) { + const col = getColumn(table, columnName); + + if (Array.isArray(value)) { + conditions.push(inArray(col as never, value as never[])); + continue; + } + + if (value === null) { + // `= NULL` is never true in SQL — use IS NULL. + conditions.push(isNull(col as never)); + continue; + } + + if (typeof value !== "object") { + conditions.push(eq(col as never, value as never)); + continue; + } + + for (const op of Object.keys(value)) { + if (!Object.hasOwn(value, op)) continue; + const condition = buildOperator( + col, + op, + (value as Record)[op], + ); + if (condition) conditions.push(condition); + } + } + + if (conditions.length === 0) return undefined; + if (conditions.length === 1) return conditions[0]; + return and(...conditions); +} + +function buildOperator( + col: DrizzleColumn, + op: string, + value: unknown, +): SQL | undefined { + switch (op) { + case "eq": + return eq(col as never, value as never); + case "neq": + return ne(col as never, value as never); + case "gt": + return gt(col as never, value as never); + case "gte": + return gte(col as never, value as never); + case "lt": + return lt(col as never, value as never); + case "lte": + return lte(col as never, value as never); + case "like": + return like(col as never, String(value)); + case "ilike": + return ilike(col as never, String(value)); + case "in": + return inArray(col as never, value as never[]); + case "is": + return value === null + ? isNull(col as never) + : eq(col as never, value as never); + default: + throw new Error(`Unsupported where operator: ${op}`); + } +} + +/* -------------------------------------------------------------------------- * + * ORDER * + * -------------------------------------------------------------------------- */ + +/** @internal Exported for snapshot tests; same caveat as `buildWhere`. */ +export function buildOrder( + table: AppKitTable, + spec: OrderSpec | undefined, +): SQL[] { + if (!spec) return []; + const out: SQL[] = []; + for (const [columnName, direction] of Object.entries(spec)) { + const col = getColumn(table, columnName); + out.push(direction === "desc" ? desc(col as never) : asc(col as never)); + } + return out; +} + +/* -------------------------------------------------------------------------- * + * INCLUDE (joins via two-query pattern) * + * -------------------------------------------------------------------------- */ + +async function applyIncludes( + db: NodePgDatabase | NodePgDatabase>, + schema: Schema, + parentTable: AppKitTable, + parentRows: Row[], + spec: IncludeSpec, +): Promise { + // Two queries per relation (parent + related-rows-by-IN(ids)) avoids N+1 + // without Drizzle's relations API. Mutates parent rows in place. + for (const [relationName, options] of Object.entries(spec)) { + if (options === undefined) continue; + const opts: IncludeOptions = options === true ? {} : options; + const resolved = resolveRelation(schema, parentTable, relationName); + + if (resolved.kind === "manyToOne") { + await applyManyToOne(db, parentRows, relationName, resolved, opts); + } else { + await applyOneToMany(db, parentRows, relationName, resolved, opts); + } + } +} + +async function applyManyToOne( + db: NodePgDatabase | NodePgDatabase>, + parentRows: Row[], + relationName: string, + resolved: Extract, + opts: IncludeOptions, +): Promise { + const parentFkValues = uniqueDefined( + parentRows.map((r) => r[resolved.parentFk]), + ); + + if (parentFkValues.length === 0) { + for (const row of parentRows) row[relationName] = null; + return; + } + + const relatedTable = resolved.relatedTable; + const relatedDrizzle = relatedTable.$drizzle as DrizzleTable; + const relatedKey = getColumn(relatedTable, resolved.relatedKey); + const projection = projectColumns(relatedTable, opts.select); + + // Drizzle `.where()` replaces on re-call — combine here, or the IN(parent_ids) + // predicate gets dropped and the include leaks rows across parents. + const inClause = inArray(relatedKey as never, parentFkValues as never[]); + const userWhere = buildWhere(relatedTable, opts.where); + const combined = userWhere ? and(inClause, userWhere) : inClause; + + let builder = db + .select(projection as never) + // biome-ignore lint/suspicious/noExplicitAny: Drizzle's table generic is opaque. + .from(relatedDrizzle as any) + .where(combined as never) as any; + + const order = buildOrder(relatedTable, opts.order); + if (order.length > 0) builder = builder.orderBy(...order); + + const relatedRows = (await builder) as Row[]; + + // Index by PK for O(1) parent assignment. + const byKey = new Map(); + for (const row of relatedRows) { + byKey.set(row[resolved.relatedKey], row); + } + + for (const row of parentRows) { + const fk = row[resolved.parentFk]; + row[relationName] = fk == null ? null : (byKey.get(fk) ?? null); + } +} + +async function applyOneToMany( + db: NodePgDatabase | NodePgDatabase>, + parentRows: Row[], + relationName: string, + resolved: Extract, + opts: IncludeOptions, +): Promise { + const parentKeyValues = uniqueDefined( + parentRows.map((r) => r[resolved.parentKey]), + ); + + if (parentKeyValues.length === 0) { + for (const row of parentRows) row[relationName] = []; + return; + } + + const relatedTable = resolved.relatedTable; + const relatedDrizzle = relatedTable.$drizzle as DrizzleTable; + const relatedFk = getColumn(relatedTable, resolved.relatedFk); + const projection = projectColumns(relatedTable, opts.select); + + // Combine before `.where()` — see applyManyToOne. + const inClause = inArray(relatedFk as never, parentKeyValues as never[]); + const userWhere = buildWhere(relatedTable, opts.where); + const combined = userWhere ? and(inClause, userWhere) : inClause; + + let builder = db + .select(projection as never) + // biome-ignore lint/suspicious/noExplicitAny: Drizzle's table generic is opaque. + .from(relatedDrizzle as any) + .where(combined as never) as any; + + const order = buildOrder(relatedTable, opts.order); + if (order.length > 0) builder = builder.orderBy(...order); + + // `.limit(n)` would cap the whole query, not per-parent — apply per-parent + // after fetch instead. Fine for small N; large fan-out should paginate + // the parent query and call again. + const relatedRows = (await builder) as Row[]; + + const grouped = new Map(); + for (const row of relatedRows) { + const key = row[resolved.relatedFk]; + const list = grouped.get(key); + if (list) { + list.push(row); + } else { + grouped.set(key, [row]); + } + } + + for (const row of parentRows) { + const key = row[resolved.parentKey]; + const list = grouped.get(key) ?? []; + row[relationName] = opts.limit ? list.slice(0, opts.limit) : list; + } +} + +/** + * Resolve a relation by name. Lookup is lenient: by related table name + * (`{ posts: true }`), FK column (`{ author_id: true }`), or entity key. + */ +function resolveRelation( + schema: Schema, + parentTable: AppKitTable, + relationName: string, +): ResolvedRelation { + // Many-to-one: this table has a FK that matches the relation name. + for (const rel of parentTable.$relations) { + if ( + rel.fromColumn === relationName || + rel.toTable === relationName || + relationName === entityKeyOf(schema, rel.toTable) + ) { + const relatedTable = findTableByName(schema, rel.toTable); + if (!relatedTable) { + throw new Error( + `Relation "${relationName}" on "${parentTable.name}" references unknown table "${rel.toTable}"`, + ); + } + return { + kind: "manyToOne", + relatedTable, + parentFk: rel.fromColumn, + relatedKey: rel.toColumn, + }; + } + } + + // One-to-many: another table has a FK back to this table. + for (const [otherKey, otherTable] of Object.entries(schema.$tables)) { + if (otherTable.name === parentTable.name) continue; + for (const rel of otherTable.$relations) { + if (rel.toTable !== parentTable.name) continue; + if (relationName === otherKey || relationName === otherTable.name) { + return { + kind: "oneToMany", + relatedTable: otherTable, + relatedFk: rel.fromColumn, + parentKey: rel.toColumn, + }; + } + } + } + + throw new Error( + `Unknown relation "${relationName}" on table "${parentTable.name}". ` + + `Expected a foreign-key column on this table or another table referencing it.`, + ); +} + +function findTableByName( + schema: Schema, + tableName: string, +): AppKitTable | undefined { + for (const t of Object.values(schema.$tables)) { + if (t.name === tableName) return t; + } + return undefined; +} + +function entityKeyOf(schema: Schema, tableName: string): string | undefined { + for (const [key, t] of Object.entries(schema.$tables)) { + if (t.name === tableName) return key; + } + return undefined; +} + +/* -------------------------------------------------------------------------- * + * Helpers * + * -------------------------------------------------------------------------- */ + +// Gate on `$columns` so untrusted input can't reach prototype keys or Drizzle internals. +function getColumn(table: AppKitTable, columnName: string): DrizzleColumn { + if (typeof columnName !== "string" || columnName.length === 0) { + throw new Error( + `Invalid column reference on table "${table.name}": expected non-empty string`, + ); + } + if (!Object.hasOwn(table.$columns, columnName)) { + throw new Error(`Unknown column "${columnName}" on table "${table.name}"`); + } + const drizzleTable = table.$drizzle as DrizzleTable; + const col = drizzleTable[columnName]; + if (col === undefined || col === null) { + throw new Error( + `Column "${columnName}" missing from drizzle table "${table.name}" — schema/runtime out of sync`, + ); + } + return col; +} + +// Placeholder for AbortSignal support — node-postgres won't cancel in-flight +// queries. `statement_timeout` (set on every pool connect) bounds runaway +// reads; AppKit's timeout interceptor still rejects the awaited promise. +async function maybeAbort( + builder: Promise | { then: PromiseLike["then"] }, + _signal?: AbortSignal, +): Promise { + return await (builder as Promise); +} + +function uniqueDefined(values: ReadonlyArray): T[] { + const seen = new Set(); + const out: T[] = []; + for (const v of values) { + if (v == null) continue; + if (seen.has(v)) continue; + seen.add(v); + out.push(v); + } + return out; +} diff --git a/packages/appkit/src/database/runtime/index.ts b/packages/appkit/src/database/runtime/index.ts new file mode 100644 index 00000000..1587f53a --- /dev/null +++ b/packages/appkit/src/database/runtime/index.ts @@ -0,0 +1,12 @@ +export type { + CountOptions, + DataPath, + IncludeSpec, + OrderSpec, + SelectOptions, + WhereSpec, +} from "./data-path"; +export { + createDrizzleDataPath, + createUserScopedDataPath, +} from "./drizzle-runtime"; diff --git a/packages/appkit/src/database/runtime/tests/drizzle-runtime.test.ts b/packages/appkit/src/database/runtime/tests/drizzle-runtime.test.ts new file mode 100644 index 00000000..f44b7885 --- /dev/null +++ b/packages/appkit/src/database/runtime/tests/drizzle-runtime.test.ts @@ -0,0 +1,86 @@ +import { PgDialect } from "drizzle-orm/pg-core"; +import { describe, expect, test } from "vitest"; +import { + defineSchema, + id, + integer, + text, + timestamp, +} from "../../schema-builder"; +import { buildOrder, buildWhere } from "../drizzle-runtime"; + +const schema = defineSchema(({ table }) => ({ + user: table("user", { + id: id(), + email: text().notNull(), + role: text(), + age: integer(), + createdAt: timestamp(), + }), +})); + +const dialect = new PgDialect(); + +describe("drizzle-runtime buildWhere → SQL", () => { + test("scalar shorthand becomes col = $1", () => { + const sql = buildWhere(schema.user, { email: "alice@x" }); + expect(sql).toBeDefined(); + if (sql) { + const out = dialect.sqlToQuery(sql); + expect(out.sql).toMatch(/"email" = \$1/); + expect(out.params).toEqual(["alice@x"]); + } + }); + + test("array value becomes col IN ($1, $2)", () => { + const sql = buildWhere(schema.user, { role: ["admin", "owner"] }); + if (sql) { + const out = dialect.sqlToQuery(sql); + expect(out.sql).toMatch(/"role" in \(\$1, \$2\)/); + expect(out.params).toEqual(["admin", "owner"]); + } + }); + + test("operator object renders gte / lte / ilike / is null", () => { + const sql = buildWhere(schema.user, { + age: { gte: 18, lte: 65 }, + email: { ilike: "%@example.com" }, + role: { is: null }, + }); + expect(sql).toBeDefined(); + if (sql) { + const out = dialect.sqlToQuery(sql); + expect(out.sql).toContain('"age" >= '); + expect(out.sql).toContain('"age" <= '); + expect(out.sql).toContain('"email" ilike '); + expect(out.sql).toContain('"role" is null'); + expect(out.params).toEqual([18, 65, "%@example.com"]); + } + }); + + test("undefined spec returns undefined (no WHERE clause)", () => { + expect(buildWhere(schema.user, undefined)).toBeUndefined(); + }); + + test("empty spec returns undefined (no WHERE clause)", () => { + expect(buildWhere(schema.user, {})).toBeUndefined(); + }); +}); + +describe("drizzle-runtime buildOrder → SQL", () => { + test("returns empty list for missing or empty spec", () => { + expect(buildOrder(schema.user, undefined)).toEqual([]); + expect(buildOrder(schema.user, {})).toEqual([]); + }); + + test("preserves the declared order of clauses", () => { + const out = buildOrder(schema.user, { email: "asc", createdAt: "desc" }); + expect(out).toHaveLength(2); + const first = dialect.sqlToQuery(out[0]); + expect(first.sql).toContain('"email"'); + expect(first.sql.toLowerCase()).toContain("asc"); + const second = dialect.sqlToQuery(out[1]); + expect(second.sql).toContain('"createdAt"'); + expect(second.sql.toLowerCase()).toContain("desc"); + }); +}); diff --git a/packages/appkit/src/plugins/database/database.ts b/packages/appkit/src/plugins/database/database.ts index 1cc527d5..9fd87c99 100644 --- a/packages/appkit/src/plugins/database/database.ts +++ b/packages/appkit/src/plugins/database/database.ts @@ -1,7 +1,8 @@ import type { Pool } from "pg"; +import type { IAppRouter } from "shared"; import { Plugin, toPlugin } from "@/plugin"; import { createLakebasePool } from "../../connectors/lakebase"; -import type { Schema } from "../../database"; +import type { DataPath, Schema } from "../../database"; import { ConfigurationError } from "../../errors"; import { createLogger } from "../../logging/logger"; import type { PluginManifest } from "../../registry"; @@ -11,11 +12,33 @@ import { POOL_DEFAULTS, STATEMENT_TIMEOUT_DEFAULT_MS, } from "./defaults"; +import type { EntityClient, ExecutorFn } from "./entity-proxy"; +import { type UserPoolRegistry, wireEntities } from "./entity-wiring"; import manifest from "./manifest.json"; -import type { IDatabaseConfig } from "./types"; +import { RouteGenerator } from "./route-generator"; +import type { HttpAccess, IDatabaseConfig } from "./types"; const logger = createLogger("database"); +type TransactionFn = (tx: DataPath) => Promise; + +type DatabaseExports = { + [entity: string]: + | EntityClient + | (() => Pool) + | ((fn: TransactionFn) => Promise) + | (( + strings: TemplateStringsArray, + ...values: unknown[] + ) => Promise); + getPool: () => Pool; + transaction: (fn: TransactionFn) => Promise; + sql: ( + strings: TemplateStringsArray, + ...values: unknown[] + ) => Promise; +}; + class DatabasePlugin extends Plugin { static manifest = manifest as PluginManifest<"database">; @@ -23,6 +46,9 @@ class DatabasePlugin extends Plugin { protected pool: Pool | null = null; protected schema: Schema | null = null; protected schemaPath: string | null = null; + protected entities: Record = {}; + protected dataPath: DataPath | null = null; + protected userPools: UserPoolRegistry | null = null; constructor(config: IDatabaseConfig = {}) { super(config); @@ -30,21 +56,23 @@ class DatabasePlugin extends Plugin { } async setup() { + // Service-principal pool. Same factory the standalone `lakebase` plugin + // uses — Lakebase OAuth refresh is built in. Dev = current user OAuth, + // prod = SP OAuth, both transparent. this.pool = createLakebasePool({ ...POOL_DEFAULTS, ...this.config.connection, }); attachSessionDefaults(this.pool, this.config.statementTimeoutMs); - if (process.env.APPKIT_DEBUG_POOL || process.env.DEBUG_POOL) { + if (process.env.DEBUG_POOL) startPoolStatsLog(this.pool, "service-principal"); - } logger.info("Database plugin pool initialized"); try { const loaded = await loadSchemaByConvention(); if (!loaded) { logger.warn( - "Database plugin did not find config/database/schema.ts, using empty schema", + "Database plugin did not find config/database/schema.ts; running with no entities", ); return; } @@ -56,48 +84,144 @@ class DatabasePlugin extends Plugin { loaded.schemaPath, Object.keys(loaded.schema.$tables).length, ); + + // Wiring builds an EntityClient per table on top of the SP pool, plus a + // per-user pool registry used by `EntityClient.asUser(req)` for OBO. + const executor: ExecutorFn = async (fn, options) => { + const result = await this.execute(fn, options); + if (!result.ok) { + // Preserve the interceptor's status (already scrubbed for prod by + // Plugin#execute) so the route can echo the right HTTP code. + throw new DatabaseRouteError(result.status, result.message); + } + return result.data; + }; + + const wired = wireEntities({ + schema: this.schema, + config: this.config, + servicePool: this.pool, + executor, + }); + + this.entities = wired.entities; + this.dataPath = wired.dataPath; + this.userPools = wired.userPools; + logger.info( + "Database entity API wired for: %s", + Object.keys(this.entities).join(", "), + ); } catch (err) { + // A throwing schema-load otherwise cascades through Promise.all in core + // and crashes every plugin's boot. Decorate the error with the + // convention path so the operator can find it, then re-raise unless the + // caller opted into tolerant boot. const message = err instanceof Error ? err.message : String(err); logger.error( "Database schema load failed (config/database/schema.ts): %s", message, ); - if (!this.config.tolerateSetupFailure) { - const stalePool = this.pool; - this.pool = null; - if (stalePool) { - await stalePool.end().catch((endErr) => { - logger.error( - "Error draining stale pool after schema-load failure: %O", - endErr, - ); - }); - } - throw err; - } + if (!this.config.tolerateSetupFailure) throw err; } } + injectRoutes(router: IAppRouter): void { + if (!this.schema) return; + + new RouteGenerator({ + schema: this.schema, + config: this.config, + getSurface: (req, access) => this.getSurface(req, access), + getServicePool: () => this.requirePool(), + route: (target, config) => this.route(target, config), + }).injectAll(router); + } + + asUser(req: import("express").Request): this { + const baseProxy = super.asUser(req); + + // No registry means setup() ran without a schema. Nothing to scope. + if (!this.userPools) return baseProxy; + + const identity = this.userPools.resolveIdentity(req); + // Dev fallback: identity is null when OBO headers are absent in dev. + // resolveIdentity already logged the warning; throws in prod. + if (!identity) return baseProxy; + + const userPool = this.userPools.getOrCreate(identity); + const userDataPath = this.userPools.getOrCreateDataPath(identity); + + const userEntities: Record = {}; + for (const [name, client] of Object.entries(this.entities)) { + userEntities[name] = client.asUser(req); + } + + const userExports = (): DatabaseExports => ({ + ...userEntities, + getPool: () => userPool, + transaction: (fn: TransactionFn) => userDataPath.transaction(fn), + sql: (strings, ...values) => userDataPath.raw(strings, ...values), + }); + + return new Proxy(baseProxy, { + get: (target, prop, receiver) => { + if (prop === "exports") return userExports; + if (typeof prop === "string" && prop in userEntities) { + return userEntities[prop]; + } + return Reflect.get(target, prop, receiver); + }, + }) as this; + } + async abortActiveOperations(): Promise { super.abortActiveOperations(); - if (!this.pool) return; - logger.info("Closing database pool"); - const draining = this.pool.end(); - this.pool = null; - try { - await draining; - } catch (err) { - logger.error("Error closing database pool: %O", err); + // Drain the SP pool first, then any per-user pools built by asUser(). + const drains: Array> = []; + if (this.pool) { + logger.info("Closing database pool"); + const draining = this.pool.end().catch((err) => { + logger.error("Error closing database pool: %O", err); + }); + this.pool = null; + drains.push(draining); } + + if (this.userPools) { + const pools = this.userPools; + this.userPools = null; + drains.push( + pools.closeAll().catch((err) => { + logger.error("Error closing per-user database pools: %O", err); + }), + ); + } + + await Promise.all(drains); } - exports() { + exports(): DatabaseExports { return { + ...this.entities, getPool: () => this.requirePool(), + transaction: (fn: TransactionFn) => + this.requireDataPath().transaction(fn), + sql: (strings, ...values) => + this.requireDataPath().raw(strings, ...values), }; } + protected requireDataPath(): DataPath { + if (!this.dataPath) { + throw ConfigurationError.resourceNotFound( + "Database", + "Database runtime not initialized — declare config/database/schema.ts before calling transaction() or sql``.", + ); + } + return this.dataPath; + } + protected requirePool(): Pool { if (!this.pool) { throw ConfigurationError.resourceNotFound( @@ -107,64 +231,68 @@ class DatabasePlugin extends Plugin { } return this.pool; } + + private getSurface( + req: import("express").Request, + access: HttpAccess, + ): Record { + if (access === "obo") { + return this.asUser(req).exports() as Record; + } + return this.exports() as Record; + } } export const database = toPlugin(DatabasePlugin); /** - * Attach a `connect` listener that sets per-session defaults on - * every new Postgres session checked out of the pool - * @param pool - * @param override + * Carries the interceptor-derived HTTP status from the executor up to the + * route handler so 4xx classifications survive the throw. The route layer + * checks `instanceof DatabaseRouteError` to echo `statusCode`; everything + * else falls back to 500 with a scrubbed message in production. + */ +export class DatabaseRouteError extends Error { + readonly statusCode: number; + constructor(statusCode: number, message: string) { + super(message); + this.name = "DatabaseRouteError"; + this.statusCode = statusCode; + } +} + +/** + * Attach a `connect` listener that sets per-session defaults on every new + * Postgres session checked out of the pool: `statement_timeout` (caps runaway + * queries even when the client signal is dropped) and `application_name` (so + * the connection is attributable in `pg_stat_activity`). */ function attachSessionDefaults(pool: Pool, override?: number): void { const ms = override ?? STATEMENT_TIMEOUT_DEFAULT_MS; - const applicationName = applicationNameForSession(); pool.on("connect", (client) => { - let destroyed = false; - const destroy = (label: string, err: unknown) => { - if (destroyed) return; - destroyed = true; - logger.error( - "Failed to set %s on pool connection; destroying client to prevent unguarded use: %O", - label, - err, - ); - // `release(true)` removes the client from the pool entirely. pg will - // build a fresh connection on next acquire and re-fire `connect`. - const maybeRelease = ( - client as unknown as { release?: (destroy?: boolean) => void } - ).release; - try { - maybeRelease?.call(client, true); - } catch (releaseErr) { - logger.error("Failed to destroy pool client: %O", releaseErr); - } - }; client - .query(`SET application_name = '${applicationName}'`) - .catch((err) => destroy("application_name", err)); + .query(`SET application_name = '${APPLICATION_NAME}'`) + .catch((err) => { + logger.error( + "Failed to set application_name on pool connection: %O", + err, + ); + }); if (Number.isFinite(ms) && ms > 0) { - client - .query(`SET statement_timeout = ${Math.floor(ms)}`) - .catch((err) => destroy("statement_timeout", err)); + client.query(`SET statement_timeout = ${Math.floor(ms)}`).catch((err) => { + logger.error( + "Failed to set statement_timeout on pool connection: %O", + err, + ); + }); } }); } /** - * Build a per-session `application_name` string. + * When `DEBUG_POOL=1` is set, periodically log the pool's + * total/idle/waiting connection counts so operators can observe saturation. + * The interval is unrefed so it never blocks shutdown. */ -function applicationNameForSession(): string { - const appName = process.env.DATABRICKS_APP_NAME; - // Sanitize: only allow common identifier characters in the discriminator. - const safeAppName = appName?.replace(/[^A-Za-z0-9._-]/g, "_") ?? ""; - const composed = safeAppName - ? `${APPLICATION_NAME}:${safeAppName}` - : APPLICATION_NAME; - return composed.slice(0, 60); -} - function startPoolStatsLog(pool: Pool, label: string): void { const intervalMs = 30_000; const handle = setInterval(() => { diff --git a/packages/appkit/src/plugins/database/defaults.ts b/packages/appkit/src/plugins/database/defaults.ts index ab45c90c..2d9252d9 100644 --- a/packages/appkit/src/plugins/database/defaults.ts +++ b/packages/appkit/src/plugins/database/defaults.ts @@ -1,3 +1,5 @@ +import type { PluginExecuteConfig } from "shared"; + /** * Connection pool defaults for the service-principal pool. * 10 connections in the pool at maximum @@ -24,3 +26,27 @@ export const STATEMENT_TIMEOUT_DEFAULT_MS = 15_000; * connections back to AppKit. */ export const APPLICATION_NAME = "appkit:database"; + +/** + * Per-user (OBO) pool defaults. The plugin builds one pool per OBO user, so + * each pool stays small. Fan-out is `(1 + oboPoolMax) × max`; with the + * defaults that caps at `(1 + 25) × 4 + 10 = 114` connections per instance. + */ +export const OBO_POOL_DEFAULTS = { + ...POOL_DEFAULTS, + max: 4, +}; + +export const readDefaults: PluginExecuteConfig = { + timeout: 30_000, + retry: { enabled: false }, + cache: { enabled: false }, + telemetryInterceptor: { spanName: "database.read" }, +}; + +export const writeDefaults: PluginExecuteConfig = { + timeout: 30_000, + retry: { enabled: false }, + cache: { enabled: false }, + telemetryInterceptor: { spanName: "database.write" }, +}; diff --git a/packages/appkit/src/plugins/database/entity-proxy.ts b/packages/appkit/src/plugins/database/entity-proxy.ts new file mode 100644 index 00000000..bcbb875e --- /dev/null +++ b/packages/appkit/src/plugins/database/entity-proxy.ts @@ -0,0 +1,565 @@ +import type { PluginExecuteConfig, PluginExecutionSettings } from "shared"; +import type { + AppKitTable, + DataPath, + IncludeSpec, + OrderSpec, + WhereSpec, +} from "@/database"; +import { createLogger } from "@/logging/logger"; +import { readDefaults, writeDefaults } from "./defaults"; +import type { CacheSettings, EntityHooks, HookContext } from "./types"; + +// RFC 5321 §4.5.3.1.3 caps email at 320 octets. +const MAX_EMAIL_LEN = 320; + +/** Trim, lowercase, length-cap. Returns `null` for missing/empty/oversize. */ +export function normalizeOboEmail(raw: string | undefined): string | null { + if (!raw) return null; + const trimmed = raw.trim().toLowerCase(); + if (trimmed.length === 0 || trimmed.length > MAX_EMAIL_LEN) return null; + return trimmed; +} + +const logger = createLogger("database:entity"); +type Row = Record; +const MAX_LIMIT = 500; + +// Default read projection — `.private()` columns never leak via +// `appkit.database.` or generated routes unless `.select()`-ed in. +function publicColumnNames(table: AppKitTable): string[] { + return Object.entries(table.$columns) + .filter(([, meta]) => meta.private !== true) + .map(([name]) => name); +} + +type DatabaseAction = + | "list" + | "find" + | "count" + | "create" + | "update" + | "upsert" + | "delete"; + +/** + * Bound `Plugin#execute` wrapper from `entity-wiring.ts`. Wiring unwraps the + * `ExecutionResult` union and rethrows on failure so entity terminators + * see a flat `Promise` contract. + */ +export type ExecutorFn = ( + fn: (signal?: AbortSignal) => Promise, + options: PluginExecutionSettings, +) => Promise; + +/** + * Predicate accepted by `where`. A bare value is shorthand for equality; an + * array is shorthand for `IN`; an object selects one or more operators. + */ +type WhereOperator = { + eq?: T; + neq?: T; + gt?: T; + gte?: T; + lt?: T; + lte?: T; + like?: string; + ilike?: string; + in?: T[]; + is?: T | null; +}; + +export type WhereInput = { + [K in keyof TRow]?: TRow[K] | WhereOperator; +}; + +export type OrderInput = { + [K in keyof TRow]?: "asc" | "desc"; +}; + +type RelatedRow< + TIncludes, + K extends keyof TIncludes, +> = TIncludes[K] extends Array<{ + row: infer R; +}> + ? Extract + : TIncludes[K] extends { row: infer R } + ? Extract + : Row; + +export type IncludeInput = { + [K in keyof TIncludes]?: + | true + | { + select?: ReadonlyArray>; + limit?: number; + order?: OrderInput>; + where?: WhereInput>; + }; +}; + +/** + * Project the runtime shape of `.include({ K: ... })`. One-to-many includes + * declare `Array<{ row: R }>` and resolve to `R[]`; one-to-one includes + * declare `{ row: R }` and resolve to `R | null`. + */ +export type ApplyIncludes = { + [K in keyof I & keyof TIncludes]: TIncludes[K] extends Array<{ row: infer R }> + ? Extract[] + : TIncludes[K] extends { row: infer R } + ? Extract | null + : never; +}; + +/** + * Public server-side entity facade — `appkit.database.`. + * + * Chain methods are immutable; terminators run via the bound executor. + * Filter/order operators map 1:1 to `DataPath` — SQL translation is the + * runtime's job, not AppKit's. + */ +export interface EntityClient< + TRow extends Row = Row, + TInsert = TRow, + TUpdate = Partial, + TIncludes = Record, +> { + where( + predicate: WhereInput, + ): EntityClient; + order( + input: OrderInput, + ): EntityClient; + limit(n: number): EntityClient; + offset(n: number): EntityClient; + + /** + * Opt out of `MAX_LIMIT` for `toArray()`. Background jobs only — request + * handlers should page or `limit()`. `statement_timeout` still bounds runaway. + */ + unbounded(): EntityClient; + + select( + ...cols: K[] + ): EntityClient & Row, TInsert, TUpdate, TIncludes>; + + include>( + input: I, + ): EntityClient< + TRow & ApplyIncludes, + TInsert, + TUpdate, + TIncludes + >; + + toArray(): Promise; + first(): Promise; + find(id: string | number): Promise; + count(): Promise; + create(data: TInsert): Promise; + update(id: string | number, patch: TUpdate): Promise; + upsert(data: TInsert, options: { onConflict: keyof TRow }): Promise; + delete(id: string | number): Promise; + + /** + * Per-request OBO clone — resolves identity from `x-forwarded-email` and + * swaps in a per-user DataPath. Without the header in dev, returns the SP + * client so the dev loop stays unbroken. + */ + asUser( + req: import("express").Request, + ): EntityClient; +} + +interface EntityClientDeps { + /** Table metadata and Zod validators produced by `defineSchema`. */ + table: AppKitTable; + /** Logical entity key used for cache keys, hook context, and telemetry. */ + entity: string; + /** Service-principal data path. Default for non-OBO calls. */ + dataPath: DataPath; + /** Single-column primary key. Defaults to `"id"` when schema metadata absent. */ + pkColumn?: string; + hooks?: EntityHooks; + /** Late-bound so `asUser(req)` can attach req/user info to hooks. */ + hookContext: () => HookContext; + /** Bound `Plugin#execute` wrapper. Every terminator must go through this. */ + execute: ExecutorFn; + /** Build per-user DataPath. Returns `null` only in dev when OBO headers are absent. */ + makeUserDataPath: (req: import("express").Request) => DataPath | null; + cache?: CacheSettings; +} + +/** + * Internal chain state. Pagination is tracked separately so `offset/limit` + * can be resolved against `MAX_LIMIT` at terminator time. + */ +interface EntityClientState { + where?: WhereSpec; + order?: OrderSpec; + limit?: number; + offset?: number; + columns?: string[]; + include?: IncludeSpec; + /** Opt-out flag set by `.unbounded()` — bypasses the default `MAX_LIMIT` cap. */ + unbounded?: boolean; + cacheKey: (string | number | object)[]; +} + +export function makeEntityClient< + TRow extends Row = Row, + TInsert = TRow, + TUpdate = Partial, + TIncludes = Record, +>(deps: EntityClientDeps): EntityClient { + return new EntityClientImpl(deps, { + cacheKey: [deps.entity], + }) as unknown as EntityClient; +} + +/** + * Thin immutable wrapper around `DataPath`. Terminators go through + * `this.run(action, fn)` → `Plugin#execute`, so telemetry, retry, cache, + * and timeout flow consistently per action. + */ +class EntityClientImpl< + TRow extends Row = Row, + TInsert = TRow, + TUpdate = Partial, + TIncludes = Record, +> { + constructor( + private readonly deps: EntityClientDeps, + private readonly state: EntityClientState, + ) {} + + where(predicate: WhereInput) { + // Successive .where() calls shallow-merge into AND. Matches Supabase/Drizzle + // intuition; no clobbering on distinct columns. + return this.chain( + { + where: { ...this.state.where, ...(predicate as WhereSpec) }, + }, + { where: predicate }, + ); + } + + order(input: OrderInput) { + return this.chain( + { order: { ...this.state.order, ...(input as OrderSpec) } }, + { order: input }, + ); + } + + limit(n: number) { + const requested = Math.max(0, Math.floor(n)); + if (this.state.unbounded) { + return this.chain({ limit: requested }, { limit: requested }); + } + const limit = Math.min(MAX_LIMIT, requested); + if (requested > MAX_LIMIT) { + logger.warn( + "limit(%d) on %s clamped to MAX_LIMIT=%d. Use .unbounded() for full scans.", + requested, + this.deps.entity, + MAX_LIMIT, + ); + } + return this.chain({ limit }, { limit }); + } + + offset(n: number) { + const offset = Math.max(0, Math.floor(n)); + return this.chain({ offset }, { offset }); + } + + unbounded() { + return this.chain({ unbounded: true }, { unbounded: true }); + } + + select(...cols: K[]) { + const allowed = new Set(publicColumnNames(this.deps.table)); + const requested = cols.map(String); + const dropped = requested.filter((c) => !allowed.has(c)); + if (dropped.length > 0) { + logger.debug( + "Dropped private/unknown column(s) from select on %s: %s", + this.deps.entity, + dropped.join(","), + ); + } + const filtered = requested.filter((c) => allowed.has(c)); + return this.chain({ columns: filtered }, { select: filtered.join(",") }); + } + + include>(input: I) { + return this.chain( + { include: { ...this.state.include, ...(input as IncludeSpec) } }, + { include: input }, + ) as never; + } + + asUser(req: import("express").Request) { + // Dev fallback: no OBO header → return self so routes don't 401. In prod + // `makeUserDataPath` throws instead of silently falling back. + const userDataPath = this.deps.makeUserDataPath(req); + if (!userDataPath) return this; + + const email = normalizeOboEmail(req.header("x-forwarded-email")); + const userDeps: EntityClientDeps = { + ...this.deps, + dataPath: userDataPath, + hookContext: () => ({ + ...this.deps.hookContext(), + req, + userId: email ?? undefined, + }), + }; + + // Identity is in the cache key so SP and OBO don't share a slot. Dev + // fallback uses req.id so unrelated requests don't collide on a shared + // `"unknown"` slot. + const reqId = (req as { id?: unknown }).id; + const identityKey = + email ?? (typeof reqId === "string" ? `unknown:${reqId}` : "unknown"); + return new EntityClientImpl(userDeps, { + ...this.state, + cacheKey: [ + this.deps.entity, + "asUser", + identityKey, + ...this.state.cacheKey.slice(1), + ], + }); + } + + /* ----------------------------------------------------------------- * + * Read terminators * + * ----------------------------------------------------------------- */ + + toArray(): Promise { + return this.run("list", async (signal) => { + const rows = await this.deps.dataPath.select(this.deps.table, { + where: this.state.where, + order: this.state.order, + ...this.resolvePagination(), + columns: this.state.columns ?? publicColumnNames(this.deps.table), + include: this.state.include, + signal, + }); + return rows as TRow[]; + }); + } + + first(): Promise { + return this.run("find", async (signal) => { + const rows = await this.deps.dataPath.select(this.deps.table, { + where: this.state.where, + order: this.state.order, + limit: 1, + offset: this.state.offset, + columns: this.state.columns ?? publicColumnNames(this.deps.table), + include: this.state.include, + signal, + }); + return ((rows[0] as TRow) ?? null) as TRow | null; + }); + } + + find(id: string | number): Promise { + return this.run("find", async (signal) => { + const row = await this.deps.dataPath.findOne( + this.deps.table, + this.pk(), + id, + { + columns: this.state.columns ?? publicColumnNames(this.deps.table), + include: this.state.include, + signal, + }, + ); + return (row as TRow) ?? null; + }); + } + + count(): Promise { + return this.run("count", async (signal) => { + return await this.deps.dataPath.count(this.deps.table, { + where: this.state.where, + signal, + }); + }); + } + + /* ----------------------------------------------------------------- * + * Write terminators * + * ----------------------------------------------------------------- */ + + create(data: TInsert): Promise { + return this.run("create", async (signal) => { + const ctx = this.deps.hookContext(); + const before = await this.deps.hooks?.beforeCreate?.( + data as Record, + ctx, + ); + const payload = before ?? data; + const validated = this.deps.table.$insertSchema.parse(payload); + const row = await this.deps.dataPath.insert( + this.deps.table, + validated as Row, + signal, + ); + await this.deps.hooks?.afterCreate?.(row, ctx); + return row as TRow; + }); + } + + update(id: string | number, patch: TUpdate): Promise { + return this.run("update", async (signal) => { + const ctx = this.deps.hookContext(); + const before = await this.deps.hooks?.beforeUpdate?.( + id, + patch as Record, + ctx, + ); + const payload = before ?? patch; + const validated = this.deps.table.$updateSchema.parse(payload); + const row = await this.deps.dataPath.update( + this.deps.table, + this.pk(), + id, + validated as Row, + signal, + ); + if (!row) { + // id may be user-supplied — strip control chars + length-cap before logging. + const safeId = String(id) + // biome-ignore lint/suspicious/noControlCharactersInRegex: deliberate + .replace(/[\x00-\x1f\x7f]/g, "?") + .slice(0, 64); + throw new Error( + `update: ${this.deps.table.name} not found (id=${safeId})`, + ); + } + await this.deps.hooks?.afterUpdate?.(row, ctx); + return row as TRow; + }); + } + + upsert(data: TInsert, options: { onConflict: keyof TRow }): Promise { + return this.run("upsert", async (signal) => { + const ctx = this.deps.hookContext(); + const before = await this.deps.hooks?.beforeUpsert?.( + data as Record, + ctx, + ); + const payload = before ?? data; + const validated = this.deps.table.$insertSchema.parse(payload); + const row = await this.deps.dataPath.upsert( + this.deps.table, + validated as Row, + { onConflict: String(options.onConflict) }, + signal, + ); + await this.deps.hooks?.afterUpsert?.(row, ctx); + return row as TRow; + }); + } + + delete(id: string | number): Promise { + return this.run("delete", async (signal) => { + const ctx = this.deps.hookContext(); + await this.deps.hooks?.beforeDelete?.(id, ctx); + await this.deps.dataPath.delete(this.deps.table, this.pk(), id, signal); + await this.deps.hooks?.afterDelete?.(id, ctx); + }); + } + + /* ----------------------------------------------------------------- * + * Internals * + * ----------------------------------------------------------------- */ + + private chain( + patch: Partial>, + cachePart?: string | number | object, + ) { + return new EntityClientImpl(this.deps, { + ...this.state, + ...patch, + cacheKey: + cachePart === undefined + ? this.state.cacheKey + : [...this.state.cacheKey, cachePart], + }); + } + + private pk(): string { + return this.deps.pkColumn ?? "id"; + } + + /** + * Default cap is `MAX_LIMIT` so reads stay bounded; opt out via `.unbounded()`. + * Throws when offset is set without limit. + */ + private resolvePagination(): { limit?: number; offset?: number } { + if (this.state.offset !== undefined && this.state.limit === undefined) { + throw new Error("offset() requires limit()"); + } + const limit = + this.state.limit ?? (this.state.unbounded ? undefined : MAX_LIMIT); + return { + limit, + offset: this.state.offset, + }; + } + + private run( + action: DatabaseAction, + fn: (signal?: AbortSignal) => Promise, + ) { + const isRead = action === "list" || action === "find" || action === "count"; + const baseDefaults: PluginExecuteConfig = isRead + ? readDefaults + : writeDefaults; + + const ttl = + action === "list" + ? this.deps.cache?.list?.ttl + : action === "find" + ? this.deps.cache?.find?.ttl + : action === "count" + ? this.deps.cache?.count?.ttl + : undefined; + + const cache = + ttl && ttl > 0 + ? { + ...baseDefaults.cache, + enabled: true, + ttl, + cacheKey: [ + "database", + this.deps.entity, + action, + ...this.state.cacheKey, + ], + } + : baseDefaults.cache; + + return this.deps.execute(fn, { + default: { + ...baseDefaults, + cache, + telemetryInterceptor: { + spanName: `database.${this.deps.entity}.${action}`, + attributes: { + "database.entity": this.deps.entity, + "database.action": action, + "database.channel": "drizzle-pool", + }, + }, + }, + }); + } +} diff --git a/packages/appkit/src/plugins/database/entity-wiring.ts b/packages/appkit/src/plugins/database/entity-wiring.ts new file mode 100644 index 00000000..992294e5 --- /dev/null +++ b/packages/appkit/src/plugins/database/entity-wiring.ts @@ -0,0 +1,294 @@ +import { createHash } from "node:crypto"; +import { WorkspaceClient } from "@databricks/sdk-experimental"; +import type { Pool } from "pg"; +import { createLakebasePool } from "@/connectors"; +import { ServiceContext } from "@/context"; +import { + type AppKitTable, + createDrizzleDataPath, + type DataPath, + type Schema, +} from "@/database"; +import { AuthenticationError, ConfigurationError } from "@/errors"; +import { createLogger } from "@/logging/logger"; +import { OBO_POOL_DEFAULTS, STATEMENT_TIMEOUT_DEFAULT_MS } from "./defaults"; +import { + type EntityClient, + type ExecutorFn, + makeEntityClient, + normalizeOboEmail, +} from "./entity-proxy"; +import type { IDatabaseConfig } from "./types"; + +/** + * Hashed log tag — keeps per-user lines correlatable without logging the raw + * email (PII). Format: `#`. + */ +function logTagForEmail(email: string): string { + const lower = email.toLowerCase(); + const visible = lower.slice(0, Math.min(lower.length, 3)); + const digest = createHash("sha256").update(lower).digest("hex").slice(0, 8); + return `${visible}#${digest}`; +} + +const logger = createLogger("database:wiring"); + +interface WireEntitiesArgs { + schema: Schema; + config: IDatabaseConfig; + /** Service-principal pool, already created by the plugin in `setup()`. */ + servicePool: Pool; + /** Bound `Plugin#execute` wrapper threaded into every entity. */ + executor: ExecutorFn; +} + +interface WireEntitiesResult { + entities: Record; + /** Service-principal DataPath. Backs `appkit.database.transaction` and `sql\`\``. */ + dataPath: DataPath; + /** Per-user pool registry. The plugin owns its lifecycle on shutdown. */ + userPools: UserPoolRegistry; +} + +/** + * Per-user pool registry. `getOrCreate(identity)` returns a `pg.Pool` whose + * Lakebase OAuth resolves to the given user. Bounded LRU keyed by email; + * evicted pools drain in the background so `closeAll()` awaits in-flight queries. + */ +export interface UserPoolRegistry { + resolveIdentity(req: import("express").Request): UserPoolIdentity | null; + getOrCreate(identity: UserPoolIdentity): Pool; + getOrCreateDataPath(identity: UserPoolIdentity): DataPath; + closeAll(): Promise; +} + +interface UserPoolIdentity { + email: string; + token: string; +} + +/** + * Wire one EntityClient per table on the SP pool, plus a per-user pool factory + * for `EntityClient.asUser(req)`. Wiring never touches Drizzle — only + * `DataPath` instances flow through. + */ +export function wireEntities(args: WireEntitiesArgs): WireEntitiesResult { + const serviceDataPath = createDrizzleDataPath(args.servicePool, args.schema); + const userPools = makeUserPoolRegistry(args.config, args.schema); + + const makeUserDataPath = ( + req: import("express").Request, + ): DataPath | null => { + const identity = userPools.resolveIdentity(req); + if (!identity) return null; + return userPools.getOrCreateDataPath(identity); + }; + + const entities: Record = {}; + for (const [entity, table] of Object.entries(args.schema.$tables)) { + entities[entity] = makeEntityClient({ + entity, + table, + dataPath: serviceDataPath, + pkColumn: derivePkColumn(table), + hooks: args.config.hooks?.[entity], + hookContext: () => ({ entity }), + execute: args.executor, + makeUserDataPath, + cache: args.config.cache, + }); + } + + // Plugin's `setup()` already logs the boot summary; don't duplicate. + return { entities, dataPath: serviceDataPath, userPools }; +} + +/** + * Resolve PK column name; defaults to `"id"`. Composite PKs are rejected at + * schema-builder time, so a single name always suffices. + */ +function derivePkColumn(table: AppKitTable): string { + for (const [name, meta] of Object.entries(table.$columns)) { + if (meta.primaryKey) return name; + } + return "id"; +} + +/** + * Per-user pool registry. Lazy create on first `getOrCreate(email)`, reused + * for the same identity. Built via `createLakebasePool` with a workspace + * client bound to the forwarded user token; OAuth refresh happens inside + * the pool. This layer only selects identity and bounds open-pool count. + */ +function makeUserPoolRegistry( + config: IDatabaseConfig, + schema: Schema, +): UserPoolRegistry { + const pools = new Map(); + // Evicted pools held here so `closeAll()` awaits graceful drain. + const draining = new Set(); + const dataPaths = new WeakMap(); + const maxPools = normalizePoolMax(config.oboPoolMax); + + function buildDataPath(pool: Pool): DataPath { + let dp = dataPaths.get(pool); + if (!dp) { + dp = createDrizzleDataPath(pool, schema); + dataPaths.set(pool, dp); + } + return dp; + } + + function getOrCreate(identity: UserPoolIdentity): Pool { + const key = identity.email.toLowerCase(); + const tag = logTagForEmail(identity.email); + const existing = pools.get(key); + if (existing) { + pools.delete(key); + pools.set(key, existing); + return existing; + } + + // Small per-user pool (default max=4); `config.connection` overrides win. + const pool = createLakebasePool({ + ...OBO_POOL_DEFAULTS, + ...config.connection, + user: identity.email, + workspaceClient: createUserWorkspaceClient(identity.token), + }); + // Session-local `app.user_id` so `current_user_id()` RLS helpers resolve + // to the OBO user — safe at session scope since identity is invariant in + // this per-user pool. `statement_timeout` set here too so OBO queries get + // the same server-side cap as SP ones. + const statementTimeoutMs = + config.statementTimeoutMs ?? STATEMENT_TIMEOUT_DEFAULT_MS; + pool.on("connect", (client) => { + client + .query("SELECT set_config('app.user_id', $1, false)", [identity.email]) + .catch((err) => { + logger.error( + "Failed to set app.user_id on user pool connection for %s: %O", + tag, + err, + ); + }); + if (Number.isFinite(statementTimeoutMs) && statementTimeoutMs > 0) { + client + .query(`SET statement_timeout = ${Math.floor(statementTimeoutMs)}`) + .catch((err) => { + logger.error( + "Failed to set statement_timeout on user pool connection for %s: %O", + tag, + err, + ); + }); + } + }); + pools.set(key, pool); + evictOldestIfNeeded(pools, draining, maxPools); + logger.debug("Created per-user pool for %s", tag); + return pool; + } + + return { + resolveIdentity(req) { + return resolveUserPoolIdentity(req); + }, + getOrCreate, + getOrCreateDataPath(identity) { + return buildDataPath(getOrCreate(identity)); + }, + async closeAll() { + const entries = Array.from(pools.entries()); + const drainingPools = Array.from(draining); + pools.clear(); + draining.clear(); + await Promise.all([ + ...entries.map(async ([email, pool]) => { + const tag = logTagForEmail(email); + try { + await pool.end(); + logger.debug("Closed per-user pool for %s", tag); + } catch (err) { + logger.error("Error closing per-user pool for %s: %O", tag, err); + } + }), + ...drainingPools.map(async (pool) => { + try { + await pool.end(); + } catch (err) { + logger.error("Error draining evicted per-user pool: %O", err); + } + }), + ]); + }, + }; +} + +function resolveUserPoolIdentity( + req: import("express").Request, +): UserPoolIdentity | null { + const isDev = process.env.NODE_ENV === "development"; + const email = normalizeOboEmail(req.header("x-forwarded-email")); + const token = req.header("x-forwarded-access-token"); + + if (email && token) return { email, token }; + + if (isDev) { + logger.warn( + "Database OBO requested without x-forwarded-email/x-forwarded-access-token; falling back to service pool in development.", + ); + return null; + } + + if (!token) throw AuthenticationError.missingToken("user token"); + throw new AuthenticationError( + "Missing x-forwarded-email header. Cannot create a user-scoped database pool.", + ); +} + +function createUserWorkspaceClient(token: string): WorkspaceClient { + const host = process.env.DATABRICKS_HOST; + if (!host) throw ConfigurationError.missingEnvVar("DATABRICKS_HOST"); + return new WorkspaceClient( + { + host, + token, + authType: "pat", + }, + ServiceContext.getClientOptions(), + ); +} + +function normalizePoolMax(value: number | undefined): number { + // Default 25 keeps fan-out tractable on Lakebase tiers ((1+25)×4 + SP(10) + // ≈ 114 conns). Hot-OBO apps should raise explicitly after sizing the tier. + if (!Number.isFinite(value) || value === undefined) return 25; + return Math.max(1, Math.floor(value)); +} + +function evictOldestIfNeeded( + pools: Map, + draining: Set, + maxPools: number, +): void { + while (pools.size > maxPools) { + const oldest = pools.entries().next().value as + | [email: string, pool: Pool] + | undefined; + if (!oldest) return; + const [email, pool] = oldest; + const tag = logTagForEmail(email); + pools.delete(email); + draining.add(pool); + pool + .end() + .catch((err) => { + logger.error("Error evicting per-user pool for %s: %O", tag, err); + }) + .finally(() => { + draining.delete(pool); + }); + logger.debug("Evicted per-user pool for %s", tag); + } +} diff --git a/packages/appkit/src/plugins/database/manifest.json b/packages/appkit/src/plugins/database/manifest.json index fb8c9170..c93ac80f 100644 --- a/packages/appkit/src/plugins/database/manifest.json +++ b/packages/appkit/src/plugins/database/manifest.json @@ -2,7 +2,7 @@ "$schema": "https://databricks.github.io/appkit/schemas/plugin-manifest.schema.json", "name": "database", "displayName": "Database", - "description": "Lakebase Postgres pool + schema declaration via defineSchema. CRUD/OBO/RLS surface ships incrementally in subsequent stack layers; this layer provides the pool, schema convention loader, and column metadata.", + "description": "Application database with schema-driven CRUD, type generation, OBO, and RLS", "hidden": false, "stability": "beta", "resources": { @@ -61,25 +61,14 @@ "type": "object", "additionalProperties": false, "properties": { - "connection": { - "type": "object", - "additionalProperties": true, - "description": "Optional pg.Pool overrides forwarded to createLakebasePool. Avoid setting `password`/`user` here — Lakebase uses OAuth." - }, - "statementTimeoutMs": { - "type": "number", - "description": "Server-side `statement_timeout` (ms) applied per pool connection. Defaults to 15_000." - }, - "tolerateSetupFailure": { - "type": "boolean", - "description": "If true, plugin boot continues with an empty schema when config/database/schema.ts fails to load. Off by default." - }, + "http": { "type": "object", "additionalProperties": true }, + "hooks": { "type": "object", "additionalProperties": true }, + "checkDrift": { "type": "boolean", "default": true }, "oboPoolMax": { "type": "number", - "description": "Max number of distinct OBO pools held in the LRU. Worst-case fan-out is oboPoolMax × OBO_POOL_DEFAULTS.max + POOL_DEFAULTS.max connections per app instance." + "default": 50, + "description": "Maximum number of per-user OBO pools to keep open." }, - "http": { "type": "object", "additionalProperties": true }, - "hooks": { "type": "object", "additionalProperties": true }, "cache": { "type": "object", "properties": { @@ -100,5 +89,5 @@ } } }, - "onSetupMessage": "Database plugin installed. Configure your schema in config/database/schema.ts via defineSchema(). The plugin currently exposes pool access (appkit.database.getPool()); CRUD, OBO, and RLS surfaces ship in subsequent stack layers." + "onSetupMessage": "Database plugin installed. Next: npx appkit db init\n - For a new database: define entities in config/database/schema.ts (and optional demo rows in config/database/seed.sql) before running init.\n - For an existing Lakebase: just run init; it will introspect the schema for you.\nThe init command picks the Databricks profile and Lakebase project, creates a per-user dev branch, writes .env, and runs the right setup or introspection." } diff --git a/packages/appkit/src/plugins/database/route-generator.ts b/packages/appkit/src/plugins/database/route-generator.ts new file mode 100644 index 00000000..ba54bd38 --- /dev/null +++ b/packages/appkit/src/plugins/database/route-generator.ts @@ -0,0 +1,717 @@ +import type express from "express"; +import type { IAppRouter, RouteConfig } from "shared"; +import { ZodError } from "zod"; +import type { AppKitTable, Schema } from "@/database"; +import { AppKitError } from "@/errors"; +import { createLogger } from "@/logging/logger"; +import { DatabaseRouteError } from "./database"; +import type { EntityClient, WhereInput } from "./entity-proxy"; +import type { HttpAccess, HttpEntityOverride, IDatabaseConfig } from "./types"; + +type ColumnKind = + | "text" + | "number" + | "boolean" + | "date" + | "json" + | "uuid" + | "unknown"; + +/** + * Read the Drizzle `columnType` off a table's `$drizzle` value to classify a + * column as text/number/boolean/etc. Used to keep `coerceFilterValue` and + * `coerceId` from over-eagerly coercing strings on text/uuid columns. + * + * Hand-rolled (rather than importing Drizzle's types) so this file stays out + * of the drizzle-orm import graph — `$drizzle` is `unknown` at the AppKit + * boundary, but the runtime layer already reads the same property names off + * it (see `database/runtime/drizzle-runtime.ts:getColumn`). + */ +function inferColumnKind(table: AppKitTable, name: string): ColumnKind { + const drizzleTable = table.$drizzle as + | Record + | undefined; + const ct = drizzleTable?.[name]?.columnType ?? ""; + if (ct === "PgText" || ct === "PgVarchar") return "text"; + if ( + ct === "PgInteger" || + ct === "PgSerial" || + ct === "PgBigInt" || + ct === "PgBigInt53" + ) { + return "number"; + } + if (ct === "PgBoolean") return "boolean"; + if (ct === "PgTimestamp") return "date"; + if (ct === "PgJsonb" || ct === "PgJson") return "json"; + if (ct === "PgUuid") return "uuid"; + return "unknown"; +} + +function buildColumnKindMap( + table: AppKitTable, + cols: ReadonlySet, +): Map { + const out = new Map(); + for (const name of cols) out.set(name, inferColumnKind(table, name)); + return out; +} + +const logger = createLogger("database:routes"); + +type Verb = "list" | "find" | "count" | "create" | "update" | "delete"; +type DatabaseExecutionSurface = Record; + +// Zero-trust default: every generated route runs OBO unless the app author +// explicitly opts that verb into service/public mode or disables it. +const DEFAULT_ACCESS: Record = { + list: "obo", + find: "obo", + count: "obo", + create: "obo", + update: "obo", + delete: "obo", +}; + +const DEFAULT_LIMIT = 50; +const MAX_LIMIT = 500; + +// These query params control the shape of the read. Everything else is treated +// as a potential column filter, but only if it matches a declared schema column. +const RESERVED_QUERY_KEYS = new Set([ + "select", + "order", + "limit", + "offset", + "include", + "on_conflict", +]); + +// Keep the HTTP dialect intentionally identical to PostGREST's builder methods: +// `?age=gte.18`, `?name=ilike.%foo%`, `?id=in.(1,2,3)`. +const ALLOWED_OPS = new Set([ + "eq", + "neq", + "gt", + "gte", + "lt", + "lte", + "like", + "ilike", + "in", + "is", +]); + +interface RouteGeneratorOptions { + schema: Schema; + config: IDatabaseConfig; + /** + * DatabasePlugin owns identity selection. The route generator only says which + * access mode was configured for the verb and receives the right entity map. + */ + getSurface: ( + req: express.Request, + access: HttpAccess, + ) => DatabaseExecutionSurface; + /** + * Service-principal pool used for the `_healthz` `SELECT 1` probe. Optional + * because some tests exercise the route generator without a real pool. + */ + getServicePool?: () => import("pg").Pool; + /** Bound wrapper around Plugin#route so endpoint registration stays central. */ + route: (router: IAppRouter, config: RouteConfig) => void; +} + +/** + * Generates the HTTP layer for every schema table. + * + * This class deliberately does not know about PostGREST clients, pg pools, or + * auth internals. It translates Express requests into the L3 EntityClient API; + * the entity client then handles validation, hooks, execute wrapping, retries, + * cache, telemetry, and DataPath calls. + */ +export class RouteGenerator { + constructor(private readonly options: RouteGeneratorOptions) {} + + injectAll(router: IAppRouter): void { + this.bindHealth(router); + for (const [name, table] of Object.entries(this.options.schema.$tables)) { + this.injectEntity(router, name, table); + } + } + + /** + * Mount `GET /api/database/_healthz`. Runs a `SELECT 1` against the SP + * pool and returns `{ ok, poolStats }` so a load balancer can wait for the + * database side of the plugin to come up before routing traffic. + * + * The route is always public — readiness checks come from k8s/LB + * components that don't carry user auth headers. + */ + private bindHealth(router: IAppRouter): void { + if (this.options.config.healthCheck === false) return; + const getPool = this.options.getServicePool; + if (!getPool) return; + this.options.route(router, { + name: "database._healthz", + method: "get", + path: "/_healthz", + handler: async (_req, res) => { + try { + const pool = getPool(); + await pool.query("SELECT 1"); + res.json({ + ok: true, + poolStats: { + total: pool.totalCount, + idle: pool.idleCount, + waiting: pool.waitingCount, + }, + }); + } catch (error) { + logger.warn("database health check failed: %O", error); + res.status(503).json({ ok: false }); + } + }, + }); + } + + private injectEntity( + router: IAppRouter, + name: string, + table: AppKitTable, + ): void { + const access = resolveAccess(this.options.config.http?.[name]); + // Private columns are excluded from the HTTP-addressable surface entirely: + // not filterable, not selectable. The entity client enforces the same + // policy for default reads; this set protects the parsing layer. + const cols = new Set( + Object.entries(table.$columns) + .filter(([, meta]) => meta.private !== true) + .map(([colName]) => colName), + ); + const kinds = buildColumnKindMap(table, cols); + const pkColumn = derivePkColumnName(table); + const pkKind = pkColumn ? (kinds.get(pkColumn) ?? "unknown") : "unknown"; + + // Six conventional routes per entity. A verb set to `false` is skipped + // entirely so disabled endpoints are not present in Express at all. + if (access.list !== false) + this.bindList(router, name, cols, kinds, access.list); + if (access.count !== false) + this.bindCount(router, name, cols, kinds, access.count); + if (access.find !== false) + this.bindFind(router, name, cols, kinds, pkKind, access.find); + if (access.create !== false) this.bindCreate(router, name, access.create); + if (access.update !== false) + this.bindUpdate(router, name, pkKind, access.update); + if (access.delete !== false) + this.bindDelete(router, name, pkKind, access.delete); + } + private bindList( + router: IAppRouter, + name: string, + cols: ReadonlySet, + kinds: ReadonlyMap, + access: HttpAccess, + ): void { + this.bind(router, name, "list", "get", `/${name}`, async (req, res) => { + let q = this.entity(req, access, name); + q = applyFilters(q, req.query, cols, kinds); + q = applySelect(q, req.query.select, cols); + q = applyInclude( + q, + req.query.include, + this.options.schema, + this.options.config, + ); + if (typeof req.query.order === "string") { + q = applyOrder(q, req.query.order, cols); + } + q = q.limit( + typeof req.query.limit === "string" + ? clampLimit(Number(req.query.limit)) + : DEFAULT_LIMIT, + ); + if (typeof req.query.offset === "string") { + const offset = Number(req.query.offset); + if (Number.isFinite(offset) && offset >= 0) { + q = q.offset(offset); + } + } + res.json(await q.toArray()); + }); + } + private bindCount( + router: IAppRouter, + name: string, + cols: ReadonlySet, + kinds: ReadonlyMap, + access: HttpAccess, + ): void { + this.bind( + router, + name, + "count", + "get", + `/${name}/count`, + async (req, res) => { + // Count supports the same column filters as list, but intentionally + // ignores pagination and select/order shape controls. + const q = applyFilters( + this.entity(req, access, name), + req.query, + cols, + kinds, + ); + res.json({ count: await q.count() }); + }, + ); + } + private bindFind( + router: IAppRouter, + name: string, + cols: ReadonlySet, + _kinds: ReadonlyMap, + pkKind: ColumnKind, + access: HttpAccess, + ): void { + this.bind(router, name, "find", "get", `/${name}/:id`, async (req, res) => { + let q = this.entity(req, access, name); + q = applySelect(q, req.query.select, cols); + q = applyInclude( + q, + req.query.include, + this.options.schema, + this.options.config, + ); + const row = await q.find(coerceId(req.params.id, pkKind)); + if (!row) { + res.status(404).json({ error: `${name} not found` }); + return; + } + res.json(row); + }); + } + private bindCreate( + router: IAppRouter, + name: string, + access: HttpAccess, + ): void { + this.bind(router, name, "create", "post", `/${name}`, async (req, res) => { + // PostgREST-compatible upsert: a POST carrying `Prefer: + // resolution=merge-duplicates` plus `?on_conflict=` is treated as + // INSERT ... ON CONFLICT DO UPDATE. Lets the browser client share one + // verb (POST) for both create and upsert. + const prefer = String(req.header("prefer") ?? "").toLowerCase(); + const onConflict = req.query.on_conflict; + if ( + prefer.includes("resolution=merge-duplicates") && + typeof onConflict === "string" && + onConflict + ) { + const row = await this.entity(req, access, name).upsert( + req.body as Record, + { onConflict }, + ); + res.status(200).json(row); + return; + } + + const row = await this.entity(req, access, name).create( + req.body as Record, + ); + res.status(201).json(row); + }); + } + private bindUpdate( + router: IAppRouter, + name: string, + pkKind: ColumnKind, + access: HttpAccess, + ): void { + this.bind( + router, + name, + "update", + "patch", + `/${name}/:id`, + async (req, res) => { + const row = await this.entity(req, access, name).update( + coerceId(req.params.id, pkKind), + req.body as Partial>, + ); + res.json(row); + }, + ); + } + private bindDelete( + router: IAppRouter, + name: string, + pkKind: ColumnKind, + access: HttpAccess, + ): void { + this.bind( + router, + name, + "delete", + "delete", + `/${name}/:id`, + async (req, res) => { + await this.entity(req, access, name).delete( + coerceId(req.params.id, pkKind), + ); + res.status(204).end(); + }, + ); + } + private entity( + req: express.Request, + access: HttpAccess, + name: string, + ): EntityClient { + // `public` and `service` both resolve to the SP entity surface today. The + // distinction is still kept in config so future policy/logging can treat + // them differently without changing route registration. + const entity = this.options.getSurface(req, access)[name]; + if (!entity) { + throw new Error(`Database entity "${name}" is not available`); + } + return entity; + } + private bind( + router: IAppRouter, + entity: string, + verb: Verb, + method: "get" | "post" | "patch" | "delete", + path: string, + handler: (req: express.Request, res: express.Response) => Promise, + ): void { + // Central route wrapper: Plugin#route handles endpoint registration, while + // this wrapper keeps generated handlers from leaking raw exceptions. + this.options.route(router, { + name: `${entity}.${verb}`, + method, + path, + handler: async (req, res) => { + try { + await handler(req, res); + } catch (error) { + logger.error("database route %s %s failed: %O", method, path, error); + if (error instanceof ZodError) { + res.status(400).json({ errors: error.format() }); + return; + } + // AppKitError messages are author-controlled (404/409/etc) and safe. + // DatabaseRouteError carries the status from Plugin#execute (already + // scrubbed for prod). Anything else is a raw thrown Error — show its + // message in dev, scrub to "Server error" in prod to avoid leaking + // stack/internal hints. + if (error instanceof AppKitError) { + res.status(error.statusCode).json({ error: error.message }); + return; + } + if (error instanceof DatabaseRouteError) { + res.status(error.statusCode).json({ error: error.message }); + return; + } + const fallback = + process.env.NODE_ENV === "production" + ? "Server error" + : error instanceof Error + ? error.message + : "Server error"; + res.status(500).json({ error: fallback }); + } + }, + }); + } +} +function resolveAccess( + override?: HttpEntityOverride, +): Record { + // Missing config is intentionally not "public". App authors must opt into + // non-OBO HTTP exposure verb by verb. + return { ...DEFAULT_ACCESS, ...override }; +} + +function applyFilters( + q: EntityClient, + query: express.Request["query"], + cols: ReadonlySet, + kinds: ReadonlyMap, +): EntityClient { + let next = q; + + // Query params follow `column=operator.value`. Params for undeclared columns + // are ignored rather than forwarded, which avoids accidentally exposing + // hidden columns as filterable HTTP surface. + for (const [key, raw] of Object.entries(query)) { + if (RESERVED_QUERY_KEYS.has(key) || !cols.has(key)) continue; + const kind = kinds.get(key) ?? "unknown"; + const values = Array.isArray(raw) ? raw : [raw]; + const decoded: unknown[] = []; + for (const v of values) { + const value = String(v); + const dot = value.indexOf("."); + if (dot < 0) { + decoded.push(coerceScalarTyped(value, kind)); + continue; + } + const op = value.slice(0, dot); + if (!ALLOWED_OPS.has(op)) continue; + decoded.push({ [op]: coerceFilterValue(op, value.slice(dot + 1), kind) }); + } + if (decoded.length === 0) continue; + if (decoded.length === 1) { + const only = decoded[0]; + next = next.where({ + [key]: + typeof only === "object" && only !== null && !Array.isArray(only) + ? (only as Record) + : (only as unknown), + } as WhereInput>); + continue; + } + // Multiple values for the same key: prefer to AND them together by merging + // every decoded predicate (`?col=eq.a&col=neq.b` means both). When every + // entry is a bare scalar, treat it as `IN (...)` for the natural duplicate + // shape used by HTML forms (`col=a&col=b`). + const allScalars = decoded.every( + (d) => typeof d !== "object" || d === null || Array.isArray(d), + ); + if (allScalars) { + next = next.where({ + [key]: { in: decoded }, + } as WhereInput>); + continue; + } + const merged: Record = {}; + for (const entry of decoded) { + if ( + typeof entry === "object" && + entry !== null && + !Array.isArray(entry) + ) { + Object.assign(merged, entry); + } + } + next = next.where({ [key]: merged } as WhereInput>); + } + return next; +} +/** + * Validate `?select=col1,col2` against the schema's columns and project. + * Unknown columns are dropped silently — same posture as `applyFilters` so + * undeclared columns never become HTTP-addressable. + */ +function applySelect( + q: EntityClient, + raw: unknown, + cols: ReadonlySet, +): EntityClient { + if (typeof raw !== "string" || raw.length === 0) return q; + const picked = raw + .split(",") + .map((c) => c.trim()) + .filter((c) => cols.has(c)); + return picked.length > 0 ? q.select(...picked) : q; +} + +/** + * Parse `?include=posts,author` (or `?include=posts(id,title),author(name)`) + * and forward to `entity.include({ ... })`. The runtime resolves relation + * names against the schema's `$relations` metadata; unknown names throw at + * query time, so this parser intentionally trusts the caller. + */ +function applyInclude( + q: EntityClient, + raw: unknown, + schema: Schema, + config: IDatabaseConfig, +): EntityClient { + if (typeof raw !== "string" || raw.length === 0) return q; + const include = parseIncludeSpec(raw); + + // Strip select columns that don't exist on the related table or are + // private. Keeps `?include=author(password_hash)` from leaking secrets. + // Unknown relation names are passed through — the runtime layer is + // authoritative about what relations exist and rejects them at query time. + for (const [relation, spec] of Object.entries(include)) { + if (spec === true) continue; + const relatedTable = schema.$tables[relation]; + if (!relatedTable) continue; + const allow = new Set( + Object.entries(relatedTable.$columns) + .filter( + ([, meta]) => + meta.private !== true && config.http?.[relation]?.list !== false, + ) + .map(([colName]) => colName), + ); + spec.select = spec.select.filter((c) => allow.has(c)); + } + + return Object.keys(include).length > 0 + ? (q.include(include) as EntityClient) + : q; +} + +/** + * Tokenise an `?include=` value into `{ relation: true | { select: [...] } }`. + * Splits on top-level commas (paren-aware) and parses each `name(col,col)` + * fragment. Whitespace is trimmed everywhere; empty fragments are skipped. + */ +function parseIncludeSpec( + raw: string, +): Record { + const out: Record = {}; + let depth = 0; + let buf = ""; + const fragments: string[] = []; + for (const ch of raw) { + if (ch === "(") depth++; + if (ch === ")") depth--; + if (ch === "," && depth === 0) { + fragments.push(buf); + buf = ""; + continue; + } + buf += ch; + } + if (buf) fragments.push(buf); + + for (const fragment of fragments) { + const trimmed = fragment.trim(); + if (!trimmed) continue; + const open = trimmed.indexOf("("); + if (open < 0) { + out[trimmed] = true; + continue; + } + const close = trimmed.lastIndexOf(")"); + const relation = trimmed.slice(0, open).trim(); + if (!relation) continue; + const inner = close > open ? trimmed.slice(open + 1, close) : ""; + const select = inner + .split(",") + .map((c) => c.trim()) + .filter(Boolean); + out[relation] = select.length > 0 ? { select } : true; + } + return out; +} + +function applyOrder( + q: EntityClient, + raw: string, + cols: ReadonlySet, +): EntityClient { + const order: Record = {}; + + // PostGREST-style order list: `?order=email.desc,createdAt.asc`. + // Unknown columns are skipped for the same reason filters are constrained. + for (const clause of raw.split(",")) { + const [column, direction] = clause.split("."); + if (!cols.has(column)) continue; + order[column] = direction === "desc" ? "desc" : "asc"; + } + return Object.keys(order).length ? q.order(order) : q; +} +function clampLimit(value: number): number { + // Hard clamp keeps accidental large browser reads from turning into expensive + // table scans. Callers that need more should page explicitly. + if (!Number.isFinite(value) || value <= 0) return DEFAULT_LIMIT; + return Math.min(MAX_LIMIT, Math.floor(value)); +} +function coerceFilterValue( + op: string, + value: string, + kind: ColumnKind, +): unknown { + if (op === "in") { + // Support both `in.(a,b)` and `in.a,b` shapes; the former matches PostGREST + // URLs while the latter is a little easier to type by hand. + const body = + value.startsWith("(") && value.endsWith(")") ? value.slice(1, -1) : value; + return splitList(body).map((part) => coerceScalarTyped(part, kind)); + } + if (op === "like" || op === "ilike") return value; + return coerceScalarTyped(value, kind); +} +/** + * Type-aware scalar coercion for filter values pulled out of the query string. + * + * Text/uuid/json columns get the raw string back so a value like `"true"`, + * `"null"`, or `"42"` is filtered as the literal string the user typed. + * Number/boolean/date columns get the same heuristic as before so + * `?count=eq.42` still works. + */ +function coerceScalarTyped(value: string, kind: ColumnKind): unknown { + if (value.length >= 2 && value.startsWith('"') && value.endsWith('"')) { + return value.slice(1, -1).replace(/\\"/g, '"'); + } + if (kind === "text" || kind === "uuid" || kind === "json") return value; + if (value === "true") return true; + if (value === "false") return false; + if (value === "null") return null; + if ( + (kind === "number" || kind === "unknown") && + value !== "" && + !Number.isNaN(Number(value)) + ) { + return Number(value); + } + return value; +} +function coerceScalar(value: string): unknown { + // Kept for callers that have no column-kind context. Behaves like the old + // heuristic — prefer `coerceScalarTyped(value, kind)` when the column kind + // is available so we don't reinterpret strings on text columns. + return coerceScalarTyped(value, "unknown"); +} +function splitList(value: string): string[] { + const out: string[] = []; + let buf = ""; + let inQuotes = false; + let escaped = false; + + for (const ch of value) { + if (escaped) { + buf += ch; + escaped = false; + continue; + } + if (ch === "\\" && inQuotes) { + buf += ch; + escaped = true; + continue; + } + if (ch === '"') inQuotes = !inQuotes; + if (ch === "," && !inQuotes) { + out.push(buf); + buf = ""; + continue; + } + buf += ch; + } + + out.push(buf); + return out; +} +function coerceId(raw: string, kind: ColumnKind): string | number { + // Honor the declared PK type. Text/uuid PKs that happen to look numeric + // (`"123"`) used to be silently turned into numbers — now they pass through. + if (kind === "text" || kind === "uuid") return raw; + const numberValue = Number(raw); + return Number.isFinite(numberValue) && String(numberValue) === raw + ? numberValue + : raw; +} + +function derivePkColumnName(table: AppKitTable): string | null { + for (const [name, meta] of Object.entries(table.$columns)) { + if (meta.primaryKey) return name; + } + return Object.keys(table.$columns).includes("id") ? "id" : null; +} diff --git a/packages/appkit/src/plugins/database/tests/entity-proxy.test.ts b/packages/appkit/src/plugins/database/tests/entity-proxy.test.ts new file mode 100644 index 00000000..956d4485 --- /dev/null +++ b/packages/appkit/src/plugins/database/tests/entity-proxy.test.ts @@ -0,0 +1,358 @@ +import { describe, expect, test, vi } from "vitest"; +import type { DataPath } from "@/database"; +import { defineSchema, id, text } from "../../../database"; +import { type ExecutorFn, makeEntityClient } from "../entity-proxy"; + +const schema = defineSchema(({ table }) => ({ + user: table("user", { + id: id(), + email: text().notNull(), + role: text().default("member"), + }), +})); + +function makeExecutor() { + return vi.fn(async (fn: (signal?: AbortSignal) => Promise) => + fn(new AbortController().signal), + ) as unknown as ExecutorFn & ReturnType; +} + +/** + * Build a fake `DataPath` whose every method records its call args. Each + * method returns the supplied fixture row (or rows). This is much smaller + * than the previous postgrest-builder mock because the surface is flat — no + * chain to fake. + */ +function fakeDataPath( + rows: Record[] = [{ id: 1, email: "a@x" }], +) { + const calls: Array<{ method: keyof DataPath; args: unknown[] }> = []; + const path: DataPath = { + select: vi.fn(async (...args) => { + calls.push({ method: "select", args }); + return rows; + }), + findOne: vi.fn(async (...args) => { + calls.push({ method: "findOne", args }); + return rows[0] ?? null; + }), + count: vi.fn(async (...args) => { + calls.push({ method: "count", args }); + return rows.length; + }), + insert: vi.fn(async (...args) => { + calls.push({ method: "insert", args }); + return rows[0] ?? { id: 1 }; + }), + update: vi.fn(async (...args) => { + calls.push({ method: "update", args }); + return rows[0] ?? { id: 1 }; + }), + upsert: vi.fn(async (...args) => { + calls.push({ method: "upsert", args }); + return rows[0] ?? { id: 1 }; + }), + delete: vi.fn(async (...args) => { + calls.push({ method: "delete", args }); + }), + transaction: vi.fn(async (fn) => fn(path)), + raw: vi.fn(async () => rows as never[]), + }; + return { path, calls }; +} + +function makeClient( + execute = makeExecutor(), + rows = [{ id: 1, email: "a@x" }], +) { + const dataPath = fakeDataPath(rows); + const client = makeEntityClient({ + table: schema.user, + entity: "user", + dataPath: dataPath.path, + execute, + makeUserDataPath: () => dataPath.path, + hookContext: () => ({ entity: "user" }), + }); + return { client, execute, dataPath }; +} + +describe("EntityClient", () => { + test("forwards filters/order/pagination/include into DataPath.select", async () => { + const { client, dataPath } = makeClient(); + + await client + .where({ role: "admin", id: { gte: 1 } }) + .order({ email: "desc" }) + .include({ posts: { select: ["id", "title"], limit: 5 } }) + .limit(10) + .offset(20) + .toArray(); + + expect(dataPath.path.select).toHaveBeenCalledTimes(1); + const args = dataPath.calls[0].args[1] as Record; + expect(args.where).toEqual({ role: "admin", id: { gte: 1 } }); + expect(args.order).toEqual({ email: "desc" }); + expect(args.limit).toBe(10); + expect(args.offset).toBe(20); + expect(args.include).toEqual({ + posts: { select: ["id", "title"], limit: 5 }, + }); + }); + + test("clamps read limits at the server-side maximum", async () => { + const { client, dataPath } = makeClient(); + + await client.limit(10_000).toArray(); + + const args = dataPath.calls[0].args[1] as Record; + expect(args.limit).toBe(500); + }); + + test("toArray() applies MAX_LIMIT when no limit is set", async () => { + const { client, dataPath } = makeClient(); + + await client.toArray(); + + const args = dataPath.calls[0].args[1] as Record; + expect(args.limit).toBe(500); + }); + + test("unbounded() removes the default cap on toArray()", async () => { + const { client, dataPath } = makeClient(); + + await client.unbounded().toArray(); + + const args = dataPath.calls[0].args[1] as Record; + expect(args.limit).toBeUndefined(); + }); + + test("private columns are excluded from default reads and from select()", async () => { + const sensitive = defineSchema(({ table }) => ({ + user: table("user", { + id: id(), + email: text().notNull(), + passwordHash: text().notNull().private(), + }), + })); + const dataPath = fakeDataPath([ + { id: 1, email: "a@x", passwordHash: "secret" }, + ]); + const client = makeEntityClient({ + table: sensitive.user, + entity: "user", + dataPath: dataPath.path, + execute: makeExecutor(), + makeUserDataPath: () => dataPath.path, + hookContext: () => ({ entity: "user" }), + }); + + await client.toArray(); + const defaultArgs = dataPath.calls[0].args[1] as Record; + expect(defaultArgs.columns).toEqual(["id", "email"]); + + dataPath.calls.length = 0; + await client + .select("id" as never, "email" as never, "passwordHash" as never) + .toArray(); + const selectedArgs = dataPath.calls[0].args[1] as Record; + expect(selectedArgs.columns).toEqual(["id", "email"]); + }); + + test("every terminator runs through the bound executor", async () => { + const cases: Array< + ( + client: ReturnType>>, + ) => Promise + > = [ + (client) => client.toArray(), + (client) => client.first(), + (client) => client.find(1), + (client) => client.count(), + (client) => client.create({ email: "a@x" }), + (client) => client.update(1, { role: "admin" }), + (client) => client.upsert({ email: "b@x" }, { onConflict: "email" }), + (client) => client.delete(1), + ]; + + for (const run of cases) { + const execute = makeExecutor(); + const { client } = makeClient(execute); + + await run(client); + + expect(execute).toHaveBeenCalledTimes(1); + } + }); + + test("create and update hooks can rewrite payloads before validation", async () => { + const beforeCreate = vi.fn(async (data: Record) => ({ + ...data, + role: "admin", + })); + const beforeUpdate = vi.fn( + async (_id: unknown, data: Record) => ({ + ...data, + role: "owner", + }), + ); + const dataPath = fakeDataPath([{ id: 1, email: "a@x", role: "admin" }]); + const client = makeEntityClient({ + table: schema.user, + entity: "user", + dataPath: dataPath.path, + execute: makeExecutor(), + makeUserDataPath: () => dataPath.path, + hooks: { beforeCreate, beforeUpdate }, + hookContext: () => ({ entity: "user" }), + }); + + await client.create({ email: "a@x" }); + await client.update(1, { email: "b@x" }); + + expect(beforeCreate).toHaveBeenCalled(); + expect(beforeUpdate).toHaveBeenCalled(); + expect(dataPath.path.insert).toHaveBeenCalledWith( + schema.user, + expect.objectContaining({ role: "admin" }), + expect.anything(), + ); + expect(dataPath.path.update).toHaveBeenCalledWith( + schema.user, + "id", + 1, + expect.objectContaining({ role: "owner" }), + expect.anything(), + ); + }); + + test("afterCreate, afterUpdate, before/afterDelete are awaited and passed the row", async () => { + const afterCreate = vi.fn(async () => undefined); + const afterUpdate = vi.fn(async () => undefined); + const beforeDelete = vi.fn(async () => undefined); + const afterDelete = vi.fn(async () => undefined); + const dataPath = fakeDataPath([{ id: 1, email: "a@x", role: "member" }]); + const client = makeEntityClient({ + table: schema.user, + entity: "user", + dataPath: dataPath.path, + execute: makeExecutor(), + makeUserDataPath: () => dataPath.path, + hooks: { afterCreate, afterUpdate, beforeDelete, afterDelete }, + hookContext: () => ({ entity: "user" }), + }); + + await client.create({ email: "a@x" }); + await client.update(1, { email: "b@x" }); + await client.delete(1); + + expect(afterCreate).toHaveBeenCalledWith( + expect.objectContaining({ id: 1, email: "a@x" }), + { entity: "user" }, + ); + expect(afterUpdate).toHaveBeenCalledWith( + expect.objectContaining({ id: 1 }), + { entity: "user" }, + ); + expect(beforeDelete).toHaveBeenCalledWith(1, { entity: "user" }); + expect(afterDelete).toHaveBeenCalledWith(1, { entity: "user" }); + }); + + test("upsert does NOT invoke beforeCreate/beforeUpdate (separate channel)", async () => { + const beforeCreate = vi.fn(async () => undefined); + const beforeUpdate = vi.fn(async () => undefined); + const beforeUpsert = vi.fn(async () => undefined); + const dataPath = fakeDataPath([{ id: 1, email: "a@x", role: "admin" }]); + const client = makeEntityClient({ + table: schema.user, + entity: "user", + dataPath: dataPath.path, + execute: makeExecutor(), + makeUserDataPath: () => dataPath.path, + hooks: { beforeCreate, beforeUpdate, beforeUpsert }, + hookContext: () => ({ entity: "user" }), + }); + + await client.upsert({ email: "a@x" }, { onConflict: "email" }); + + expect(beforeUpsert).toHaveBeenCalled(); + expect(beforeCreate).not.toHaveBeenCalled(); + expect(beforeUpdate).not.toHaveBeenCalled(); + }); + + test("upsert hooks can rewrite payloads before validation", async () => { + const beforeUpsert = vi.fn(async (data: Record) => ({ + ...data, + role: "admin", + })); + const afterUpsert = vi.fn(async () => undefined); + const dataPath = fakeDataPath([{ id: 1, email: "a@x", role: "admin" }]); + const client = makeEntityClient({ + table: schema.user, + entity: "user", + dataPath: dataPath.path, + execute: makeExecutor(), + makeUserDataPath: () => dataPath.path, + hooks: { beforeUpsert, afterUpsert }, + hookContext: () => ({ entity: "user" }), + }); + + await client.upsert({ email: "a@x" }, { onConflict: "email" }); + + expect(beforeUpsert).toHaveBeenCalled(); + expect(afterUpsert).toHaveBeenCalledWith( + expect.objectContaining({ role: "admin" }), + { entity: "user" }, + ); + expect(dataPath.path.upsert).toHaveBeenCalledWith( + schema.user, + expect.objectContaining({ role: "admin" }), + { onConflict: "email" }, + expect.anything(), + ); + }); + + test("asUser builds a fresh user-mode DataPath when the OBO header is present", async () => { + const serviceDataPath = fakeDataPath([]); + const userDataPath = fakeDataPath([{ id: 1, email: "u@x" }]); + const makeUserDataPath = vi.fn(() => userDataPath.path); + const client = makeEntityClient({ + table: schema.user, + entity: "user", + dataPath: serviceDataPath.path, + execute: makeExecutor(), + makeUserDataPath, + hookContext: () => ({ entity: "user" }), + }); + const req = { + header: vi.fn((name: string) => + name === "x-forwarded-email" ? "user@example.com" : undefined, + ), + } as unknown as import("express").Request; + + await client.asUser(req).toArray(); + + expect(makeUserDataPath).toHaveBeenCalledWith(req); + expect(userDataPath.path.select).toHaveBeenCalled(); + expect(serviceDataPath.path.select).not.toHaveBeenCalled(); + }); + + test("asUser falls through to self when no per-user DataPath is available", async () => { + const serviceDataPath = fakeDataPath([{ id: 1, email: "x@x" }]); + const client = makeEntityClient({ + table: schema.user, + entity: "user", + dataPath: serviceDataPath.path, + execute: makeExecutor(), + makeUserDataPath: () => null, + hookContext: () => ({ entity: "user" }), + }); + const req = { + header: vi.fn(() => undefined), + } as unknown as import("express").Request; + + await client.asUser(req).toArray(); + + expect(serviceDataPath.path.select).toHaveBeenCalled(); + }); +}); diff --git a/packages/appkit/src/plugins/database/tests/plugin.test.ts b/packages/appkit/src/plugins/database/tests/plugin.test.ts index 6f67359f..b9206509 100644 --- a/packages/appkit/src/plugins/database/tests/plugin.test.ts +++ b/packages/appkit/src/plugins/database/tests/plugin.test.ts @@ -1,14 +1,55 @@ import type { Pool } from "pg"; import { beforeEach, describe, expect, test, vi } from "vitest"; import { createLakebasePool } from "../../../connectors/lakebase"; -import { defineSchema, id } from "../../../database"; +import { defineSchema, id, text } from "../../../database"; import { loadSchemaByConvention } from "../convention"; import { database } from "../database"; vi.mock("../../../connectors/lakebase", () => ({ createLakebasePool: vi.fn(), + createLakebasePostgrestClient: vi.fn(), })); +vi.mock("../../../database", async () => { + const actual = + await vi.importActual( + "../../../database", + ); + return { + ...actual, + // The runtime is exercised by entity-proxy tests with a fake DataPath; + // here we only care that the plugin wires *something* per entity, so we + // stub the runtime to avoid initialising drizzle + node-postgres. + createDrizzleDataPath: vi.fn(() => ({ + select: vi.fn(async () => []), + findOne: vi.fn(async () => null), + count: vi.fn(async () => 0), + insert: vi.fn(async () => ({})), + update: vi.fn(async () => ({})), + upsert: vi.fn(async () => ({})), + delete: vi.fn(async () => undefined), + transaction: vi.fn(async (fn: (tx: unknown) => Promise) => + fn({}), + ), + raw: vi.fn(async () => []), + })), + }; +}); + +vi.mock("../../../context/service-context", async () => { + const actual = await vi.importActual< + typeof import("../../../context/service-context") + >("../../../context/service-context"); + return { + ...actual, + ServiceContext: { + ...actual.ServiceContext, + createUserContext: vi.fn(() => ({})), + getClientOptions: vi.fn(() => ({})), + }, + }; +}); + vi.mock("../../../cache", () => ({ CacheManager: { getInstanceSync: vi.fn(() => ({ @@ -68,7 +109,11 @@ describe("DatabasePlugin", () => { connectionTimeoutMillis: 3_000, maxUses: 1000, }); - expect(plugin.exports()).toEqual({ getPool: expect.any(Function) }); + expect(plugin.exports()).toMatchObject({ + getPool: expect.any(Function), + transaction: expect.any(Function), + sql: expect.any(Function), + }); expect((plugin.exports() as { getPool: () => Pool }).getPool()).toBe(pool); }); @@ -94,6 +139,214 @@ describe("DatabasePlugin", () => { ).toBe("/app/config/database/schema.ts"); }); + test("wires one entity client per schema table on the SP pool", async () => { + const schema = defineSchema(({ table }) => ({ + user: table("user", { + id: id(), + email: text().notNull(), + }), + })); + vi.mocked(loadSchemaByConvention).mockResolvedValue({ + schema, + schemaPath: "/app/config/database/schema.ts", + }); + + const plugin = createPlugin(); + await plugin.setup(); + + const exports = plugin.exports() as unknown as { + getPool: () => Pool; + user: unknown; + }; + expect(exports.getPool()).toBe(pool); + expect(exports.user).toBeDefined(); + // The wiring goes through the SP pool; no Data API URL is required. + expect(createLakebasePool).toHaveBeenCalled(); + }); + + test("entity asUser creates user-scoped pools and evicts by LRU limit", async () => { + const originalHost = process.env.DATABRICKS_HOST; + process.env.DATABRICKS_HOST = "https://example.cloud.databricks.com"; + const servicePool = { + end: vi.fn(async () => undefined), + on: vi.fn(), + } as unknown as Pool; + const userOnePool = { + end: vi.fn(async () => undefined), + on: vi.fn(), + } as unknown as Pool; + const userTwoPool = { + end: vi.fn(async () => undefined), + on: vi.fn(), + } as unknown as Pool; + vi.mocked(createLakebasePool) + .mockReturnValueOnce(servicePool) + .mockReturnValueOnce(userOnePool) + .mockReturnValueOnce(userTwoPool); + const schema = defineSchema(({ table }) => ({ + user: table("user", { + id: id(), + email: text().notNull(), + }), + })); + vi.mocked(loadSchemaByConvention).mockResolvedValue({ + schema, + schemaPath: "/app/config/database/schema.ts", + }); + + const plugin = createPlugin({ + oboPoolMax: 1, + }); + await plugin.setup(); + const exports = plugin.exports() as unknown as { + user: { + asUser: (req: import("express").Request) => unknown; + }; + }; + const reqFor = (email: string, token: string) => + ({ + header: vi.fn((name: string) => { + if (name === "x-forwarded-email") return email; + if (name === "x-forwarded-access-token") return token; + return undefined; + }), + }) as unknown as import("express").Request; + + exports.user.asUser(reqFor("one@example.com", "tok-1")); + exports.user.asUser(reqFor("two@example.com", "tok-2")); + + expect(createLakebasePool).toHaveBeenCalledTimes(3); + expect(createLakebasePool).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + user: "one@example.com", + workspaceClient: expect.any(Object), + }), + ); + expect(userOnePool.end).toHaveBeenCalled(); + expect(userTwoPool.end).not.toHaveBeenCalled(); + if (originalHost === undefined) { + delete process.env.DATABRICKS_HOST; + } else { + process.env.DATABRICKS_HOST = originalHost; + } + }); + + test("asUser routes getPool/transaction/sql through the user pool, not SP", async () => { + const originalHost = process.env.DATABRICKS_HOST; + process.env.DATABRICKS_HOST = "https://example.cloud.databricks.com"; + const servicePool = { + end: vi.fn(async () => undefined), + on: vi.fn(), + } as unknown as Pool; + const userPool = { + end: vi.fn(async () => undefined), + on: vi.fn(), + } as unknown as Pool; + vi.mocked(createLakebasePool) + .mockReturnValueOnce(servicePool) + .mockReturnValueOnce(userPool); + const schema = defineSchema(({ table }) => ({ + user: table("user", { id: id(), email: text().notNull() }), + })); + vi.mocked(loadSchemaByConvention).mockResolvedValue({ + schema, + schemaPath: "/app/config/database/schema.ts", + }); + + const plugin = createPlugin(); + await plugin.setup(); + + const req = { + header: vi.fn((name: string) => { + if (name === "x-forwarded-email") return "alice@example.com"; + if (name === "x-forwarded-access-token") return "tok-alice"; + if (name === "x-forwarded-user") return "alice@example.com"; + return undefined; + }), + } as unknown as import("express").Request; + + const userExports = ( + plugin.asUser(req).exports() as unknown as { getPool: () => Pool } + ).getPool(); + + expect(userExports).toBe(userPool); + expect(userExports).not.toBe(servicePool); + if (originalHost === undefined) { + delete process.env.DATABRICKS_HOST; + } else { + process.env.DATABRICKS_HOST = originalHost; + } + }); + + test("exports transaction(fn) and sql`` backed by the runtime data path", async () => { + const schema = defineSchema(({ table }) => ({ + user: table("user", { id: id(), email: text().notNull() }), + })); + vi.mocked(loadSchemaByConvention).mockResolvedValue({ + schema, + schemaPath: "/app/config/database/schema.ts", + }); + + const plugin = createPlugin(); + await plugin.setup(); + + const exports = plugin.exports() as unknown as { + transaction: (fn: (tx: unknown) => Promise) => Promise; + sql: ( + strings: TemplateStringsArray, + ...values: unknown[] + ) => Promise; + }; + expect(typeof exports.transaction).toBe("function"); + expect(typeof exports.sql).toBe("function"); + + // Round-trip through the stub DataPath wired in createDrizzleDataPath mock + // (top of this file). Both should resolve via the stubbed methods. + await expect(exports.transaction(async () => 42)).resolves.toBe(42); + await expect(exports.sql`select 1`).resolves.toEqual([]); + }); + + test("injectRoutes registers entity routes once schema is loaded", async () => { + const schema = defineSchema(({ table }) => ({ + user: table("user", { + id: id(), + email: text().notNull(), + }), + })); + vi.mocked(loadSchemaByConvention).mockResolvedValue({ + schema, + schemaPath: "/app/config/database/schema.ts", + }); + + const plugin = createPlugin(); + await plugin.setup(); + + const router = { + get: vi.fn(), + post: vi.fn(), + patch: vi.fn(), + delete: vi.fn(), + }; + plugin.injectRoutes(router as never); + + expect(router.get).toHaveBeenCalledWith("/user", expect.any(Function)); + expect(router.get).toHaveBeenCalledWith( + "/user/count", + expect.any(Function), + ); + expect(router.get).toHaveBeenCalledWith("/user/:id", expect.any(Function)); + expect(router.post).toHaveBeenCalledWith("/user", expect.any(Function)); + expect(router.patch).toHaveBeenCalledWith( + "/user/:id", + expect.any(Function), + ); + expect(router.delete).toHaveBeenCalledWith( + "/user/:id", + expect.any(Function), + ); + }); + test("closes the pool during shutdown", async () => { const plugin = createPlugin(); await plugin.setup(); @@ -166,4 +419,125 @@ describe("DatabasePlugin", () => { const plugin = createPlugin({ tolerateSetupFailure: true }); await expect(plugin.setup()).resolves.toBeUndefined(); }); + + test("per-user pools set app.user_id on every new connection", async () => { + const originalHost = process.env.DATABRICKS_HOST; + process.env.DATABRICKS_HOST = "https://example.cloud.databricks.com"; + const servicePool = { + end: vi.fn(async () => undefined), + on: vi.fn(), + } as unknown as Pool; + const userPool = { + end: vi.fn(async () => undefined), + on: vi.fn(), + } as unknown as Pool; + vi.mocked(createLakebasePool) + .mockReturnValueOnce(servicePool) + .mockReturnValueOnce(userPool); + const schema = defineSchema(({ table }) => ({ + user: table("user", { id: id(), email: text().notNull() }), + })); + vi.mocked(loadSchemaByConvention).mockResolvedValue({ + schema, + schemaPath: "/app/config/database/schema.ts", + }); + + const plugin = createPlugin(); + await plugin.setup(); + const exports = plugin.exports() as unknown as { + user: { asUser: (req: import("express").Request) => unknown }; + }; + const req = { + header: vi.fn((name: string) => { + if (name === "x-forwarded-email") return "alice@example.com"; + if (name === "x-forwarded-access-token") return "tok-alice"; + return undefined; + }), + } as unknown as import("express").Request; + exports.user.asUser(req); + + expect(userPool.on).toHaveBeenCalledWith("connect", expect.any(Function)); + const handler = vi + .mocked(userPool.on) + .mock.calls.find( + ([event]) => event === "connect", + )?.[1] as unknown as (client: { + query: ReturnType; + }) => void; + const client = { query: vi.fn(async () => ({})) }; + handler(client); + expect(client.query).toHaveBeenCalledWith( + "SELECT set_config('app.user_id', $1, false)", + ["alice@example.com"], + ); + + if (originalHost === undefined) { + delete process.env.DATABRICKS_HOST; + } else { + process.env.DATABRICKS_HOST = originalHost; + } + }); + + test("closeAll drains evicted per-user pools, not just live ones", async () => { + const originalHost = process.env.DATABRICKS_HOST; + process.env.DATABRICKS_HOST = "https://example.cloud.databricks.com"; + const servicePool = { + end: vi.fn(async () => undefined), + on: vi.fn(), + } as unknown as Pool; + let drainResolve: (() => void) | undefined; + const drainGate = new Promise((resolve) => { + drainResolve = resolve; + }); + const userOnePool = { + end: vi.fn(() => drainGate), + on: vi.fn(), + } as unknown as Pool; + const userTwoPool = { + end: vi.fn(async () => undefined), + on: vi.fn(), + } as unknown as Pool; + vi.mocked(createLakebasePool) + .mockReturnValueOnce(servicePool) + .mockReturnValueOnce(userOnePool) + .mockReturnValueOnce(userTwoPool); + const schema = defineSchema(({ table }) => ({ + user: table("user", { id: id(), email: text().notNull() }), + })); + vi.mocked(loadSchemaByConvention).mockResolvedValue({ + schema, + schemaPath: "/app/config/database/schema.ts", + }); + + const plugin = createPlugin({ + oboPoolMax: 1, + }); + await plugin.setup(); + const exports = plugin.exports() as unknown as { + user: { asUser: (req: import("express").Request) => unknown }; + }; + const reqFor = (email: string, token: string) => + ({ + header: vi.fn((name: string) => { + if (name === "x-forwarded-email") return email; + if (name === "x-forwarded-access-token") return token; + return undefined; + }), + }) as unknown as import("express").Request; + + exports.user.asUser(reqFor("one@example.com", "tok-1")); + exports.user.asUser(reqFor("two@example.com", "tok-2")); + + const drained = plugin.abortActiveOperations(); + drainResolve?.(); + await drained; + + expect(userOnePool.end).toHaveBeenCalled(); + expect(userTwoPool.end).toHaveBeenCalled(); + if (originalHost === undefined) { + delete process.env.DATABRICKS_HOST; + } else { + process.env.DATABRICKS_HOST = originalHost; + } + }); }); diff --git a/packages/appkit/src/plugins/database/tests/route-generator.test.ts b/packages/appkit/src/plugins/database/tests/route-generator.test.ts new file mode 100644 index 00000000..24c0242d --- /dev/null +++ b/packages/appkit/src/plugins/database/tests/route-generator.test.ts @@ -0,0 +1,172 @@ +import { + createMockRequest, + createMockResponse, + createMockRouter, +} from "@tools/test-helpers"; +import { describe, expect, test, vi } from "vitest"; +import { defineSchema, id, text } from "../../../database"; +import type { EntityClient } from "../entity-proxy"; +import { RouteGenerator } from "../route-generator"; + +const schema = defineSchema(({ table }) => ({ + user: table("user", { + id: id(), + email: text().notNull(), + role: text().default("member"), + }), +})); + +function makeEntity(rows: unknown[] = []) { + const entity = { + where: vi.fn(() => entity), + order: vi.fn(() => entity), + limit: vi.fn(() => entity), + offset: vi.fn(() => entity), + range: vi.fn(() => entity), + select: vi.fn(() => entity), + include: vi.fn(() => entity), + toArray: vi.fn(async () => rows), + first: vi.fn(async () => rows[0] ?? null), + find: vi.fn(async () => rows[0] ?? null), + count: vi.fn(async () => rows.length), + create: vi.fn(async (data: unknown) => data), + update: vi.fn(async (_id: unknown, data: unknown) => data), + upsert: vi.fn(async (data: unknown) => data), + delete: vi.fn(async () => undefined), + asUser: vi.fn(() => entity), + }; + + return entity as unknown as EntityClient & typeof entity; +} + +describe("RouteGenerator", () => { + test("registers six routes per entity with obo access by default", async () => { + const { router, handlers } = createMockRouter(); + const user = makeEntity([]); + const getSurface = vi.fn(() => ({ user })); + new RouteGenerator({ + schema, + config: {}, + getSurface, + route: (target, config) => + target[config.method](config.path, config.handler), + }).injectAll(router); + + expect(Object.keys(handlers).sort()).toEqual([ + "DELETE:/user/:id", + "GET:/user", + "GET:/user/:id", + "GET:/user/count", + "PATCH:/user/:id", + "POST:/user", + ]); + + await handlers["GET:/user"]( + createMockRequest({ query: {} }), + createMockResponse(), + ); + + expect(getSurface).toHaveBeenCalledWith(expect.anything(), "obo"); + expect(user.limit).toHaveBeenCalledWith(50); + }); + + test("applies filters, select projection, order, limit clamp, and include parsing", async () => { + const { router, getHandler } = createMockRouter(); + const user = makeEntity([{ id: 1, email: "a@x", role: "admin" }]); + new RouteGenerator({ + schema, + config: {}, + getSurface: vi.fn(() => ({ user })), + route: (target, config) => + target[config.method](config.path, config.handler), + }).injectAll(router); + + const handler = getHandler("GET", "/user"); + const res = createMockResponse(); + await handler( + createMockRequest({ + query: { + role: "eq.admin", + email: 'eq."Doe, Jane"', + select: "id,email,unknownCol", + include: "posts(id,title),author", + order: "email.desc", + limit: "10000", + offset: "10", + }, + }), + res, + ); + + expect(user.where).toHaveBeenCalledWith({ role: { eq: "admin" } }); + expect(user.where).toHaveBeenCalledWith({ email: { eq: "Doe, Jane" } }); + expect(user.select).toHaveBeenCalledWith("id", "email"); + expect(user.include).toHaveBeenCalledWith({ + posts: { select: ["id", "title"] }, + author: true, + }); + expect(user.order).toHaveBeenCalledWith({ email: "desc" }); + expect(user.limit).toHaveBeenCalledWith(500); + expect(user.offset).toHaveBeenCalledWith(10); + expect(res.json).toHaveBeenCalledWith([ + { id: 1, email: "a@x", role: "admin" }, + ]); + }); + + test("honors disabled verbs and service access overrides", async () => { + const { router, handlers } = createMockRouter(); + const user = makeEntity([]); + const getSurface = vi.fn(() => ({ user })); + new RouteGenerator({ + schema, + config: { + http: { + user: { + count: false, + delete: false, + create: "service", + }, + }, + }, + getSurface, + route: (target, config) => + target[config.method](config.path, config.handler), + }).injectAll(router); + + expect(handlers["GET:/user/count"]).toBeUndefined(); + expect(handlers["DELETE:/user/:id"]).toBeUndefined(); + + await handlers["POST:/user"]( + createMockRequest({ body: { email: "a@x" } }), + createMockResponse(), + ); + + expect(getSurface).toHaveBeenCalledWith(expect.anything(), "service"); + expect(user.create).toHaveBeenCalledWith( + expect.objectContaining({ email: "a@x" }), + ); + }); + + test("returns zod-formatted validation errors from the entity layer", async () => { + const { router, handlers } = createMockRouter(); + const user = makeEntity([]); + user.create.mockImplementation(async () => { + schema.user.$insertSchema.parse({}); + return {}; + }); + new RouteGenerator({ + schema, + config: {}, + getSurface: vi.fn(() => ({ user })), + route: (target, config) => + target[config.method](config.path, config.handler), + }).injectAll(router); + + const res = createMockResponse(); + await handlers["POST:/user"](createMockRequest({ body: {} }), res); + + expect(res.status).toHaveBeenCalledWith(400); + expect(res.json).toHaveBeenCalledWith({ errors: expect.any(Object) }); + expect(user.create).toHaveBeenCalledWith({}); + }); +}); diff --git a/packages/appkit/src/plugins/database/types.ts b/packages/appkit/src/plugins/database/types.ts index a9838a00..d86d977a 100644 --- a/packages/appkit/src/plugins/database/types.ts +++ b/packages/appkit/src/plugins/database/types.ts @@ -1,25 +1,12 @@ import type { BasePluginConfig } from "shared"; -/** - * Pool tuning exposed via `IDatabaseConfig.connection`. - * Intentionally excludes auth fields; Lakebase resolves credentials via OAuth + env. - */ +/** Pool tuning forwarded to `createLakebasePool` (no auth fields). */ export interface DatabasePoolTuning { - /** Maximum number of clients in the pool. */ max?: number; - /** Idle timeout (ms) before closing an idle client. */ idleTimeoutMillis?: number; - /** Connection acquire timeout (ms). */ connectionTimeoutMillis?: number; - /** - * Recycle a client after N uses to reduce stale-token issues. - */ maxUses?: number; - /** - * Statement timeout (ms) set per new connection; top-level setting wins. - */ statement_timeout?: number; - /** Random jitter (ms) added to statement timeout when supported. */ statement_timeout_jitter_ms?: number; } @@ -34,17 +21,17 @@ export type HttpAccess = "public" | "obo" | "service" | false; * @public */ export interface HttpEntityOverride { - /** Access mode for list. */ + /** The HTTP access control for the list operation. */ list?: HttpAccess; - /** Access mode for find. */ + /** The HTTP access control for the find operation. */ find?: HttpAccess; - /** Access mode for count. */ + /** The HTTP access control for the count operation. */ count?: HttpAccess; - /** Access mode for create. */ + /** The HTTP access control for the create operation. */ create?: HttpAccess; - /** Access mode for update. */ + /** The HTTP access control for the update operation. */ update?: HttpAccess; - /** Access mode for delete. */ + /** The HTTP access control for the delete operation. */ delete?: HttpAccess; } @@ -53,43 +40,75 @@ export interface HttpEntityOverride { * @public */ export interface HookContext { - /** Request object. */ + /** The request object. */ req?: import("express").Request; - /** Entity name. */ + /** The entity name. */ entity?: string; - /** User ID. */ + /** + * The forwarded user identity (`x-forwarded-email`). + * + * **Do not use for authorization decisions.** This is a transport-level + * label populated from the same forwarded headers OBO uses; the actual + * authentication signal is the access token on `req`. For authz, read the + * verified user context off `req` — `userId` here is a convenience tag for + * logging, audit fields, or distinguishing cache keys. + */ userId?: string; } +// biome-ignore lint/suspicious/noConfusingVoidType: hooks may return nothing to keep the original payload. +type HookMutationResult = Record | void; + /** * Entity hooks. * @public */ export interface EntityHooks { - /** Runs before create. */ + /** A hook to run before a create operation. */ beforeCreate?: ( data: Record, ctx: HookContext, - ) => Promise | void>; - /** Runs after create. */ + ) => Promise; + /** A hook to run after a create operation. */ afterCreate?: ( row: Record, ctx: HookContext, ) => Promise; - /** Runs before update. */ + /** A hook to run before an update operation. */ beforeUpdate?: ( id: unknown, patch: Record, ctx: HookContext, - ) => Promise | void>; - /** Runs after update. */ + ) => Promise; + /** A hook to run after an update operation. */ afterUpdate?: ( row: Record, ctx: HookContext, ) => Promise; - /** Runs before delete. */ + /** + * Run before an upsert operation. + * + * Note: `upsert` is a separate channel from `create` and `update` — it + * does **not** invoke `beforeCreate`/`beforeUpdate` even when the resolved + * branch is logically an insert or update. If you need shared mutation + * logic, factor it into a helper and call it from both hooks (or from + * `beforeCreate` + `beforeUpdate` + `beforeUpsert`). + */ + beforeUpsert?: ( + data: Record, + ctx: HookContext, + ) => Promise; + /** + * Run after an upsert operation. See `beforeUpsert` — this is **not** a + * fan-out of `afterCreate`/`afterUpdate`. + */ + afterUpsert?: ( + row: Record, + ctx: HookContext, + ) => Promise; + /** A hook to run before a delete operation. */ beforeDelete?: (id: unknown, ctx: HookContext) => Promise; - /** Runs after delete. */ + /** A hook to run after a delete operation. */ afterDelete?: (id: unknown, ctx: HookContext) => Promise; } @@ -98,7 +117,7 @@ export interface EntityHooks { * @public */ export interface CacheActionSettings { - /** Cache TTL in seconds. */ + /** The time to live for the cache in seconds. */ ttl?: number; } @@ -107,11 +126,11 @@ export interface CacheActionSettings { * @public */ export interface CacheSettings { - /** Cache settings for list. */ + /** The cache settings for the list operation. */ list?: CacheActionSettings; - /** Cache settings for find. */ + /** The cache settings for the find operation. */ find?: CacheActionSettings; - /** Cache settings for count. */ + /** The cache settings for the count operation. */ count?: CacheActionSettings; } @@ -120,29 +139,39 @@ export interface CacheSettings { * @public */ export interface IDatabaseConfig extends BasePluginConfig { - /** - * Pool tuning forwarded to `createLakebasePool` (no auth fields). - */ + /** The connection settings for the database. */ connection?: DatabasePoolTuning; - /** Per-entity HTTP access overrides. */ + /** The HTTP entity overrides for the database. */ http?: Record; - /** Per-entity lifecycle hooks. */ + /** The entity hooks for the database. */ hooks?: Record; - /** Per-operation cache settings. */ + /** Whether to check live schema drift during startup. */ + checkDrift?: boolean; + /** + * Mount `GET /api/database/_healthz` for load balancers and readiness probes. + * Defaults to enabled. Set to `false` to suppress the route entirely. + */ + healthCheck?: false; + /** The cache settings for the database. */ cache?: CacheSettings; /** - * Max distinct OBO pools kept alive. Defaults to 25. - * Worst-case fan-out is `(1 + oboPoolMax) × poolMax`. + * Maximum number of distinct per-user (OBO) pools the registry keeps alive + * at once. Each pool defaults to `OBO_POOL_DEFAULTS.max = 4` connections, so + * the worst-case fan-out is `(1 + oboPoolMax) × poolMax`. Defaults to 25 — + * tune up for hot OBO traffic, down for low-tier Lakebase plans. */ oboPoolMax?: number; /** - * Postgres `statement_timeout` (ms) for pooled connections. Defaults to 15s. - * Set `0` to disable server-side timeout (client timeout still applies). + * Postgres `statement_timeout` applied to every pooled connection (ms). + * Defaults to 15s. Set to `0` to disable the server-side cap; the AppKit + * timeout interceptor still applies on the client side. */ statementTimeoutMs?: number; /** - * If true, `setup()` schema/drift failures are logged and ignored. - * Defaults to false (fail closed). + * When true, schema-load and drift-check failures during `setup()` are + * logged but do not throw. Defaults to false (fail closed). Useful in + * environments where the database is provisioned out of band and the boot + * shouldn't crash before the schema is reachable. */ tolerateSetupFailure?: boolean; }