From 6940dd3c30169b130306773cfea9c0db9cecfe95 Mon Sep 17 00:00:00 2001 From: mikhaeelatefrizk <180051136+mikhaeelatefrizk@users.noreply.github.com> Date: Mon, 8 Jun 2026 00:24:48 +0300 Subject: [PATCH 1/4] feat(cron): add removeEvent and optional dataSource on addEvent --- plugins/cron/index.ts | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/plugins/cron/index.ts b/plugins/cron/index.ts index 313ebbb..3c6dbda 100644 --- a/plugins/cron/index.ts +++ b/plugins/cron/index.ts @@ -173,8 +173,13 @@ export class CronPlugin extends StarbasePlugin { cronTab: string, name: string, payload: Record = {}, - callbackHost: string + callbackHost: string, + dataSource?: DataSource ) { + // Allow callers (e.g. other plugins) to supply the data source directly so + // they don't have to depend on this plugin's middleware having run first. + if (dataSource) this.dataSource = dataSource + if (!this.dataSource) throw new Error('CronPlugin not properly initialized') @@ -187,6 +192,21 @@ export class CronPlugin extends StarbasePlugin { await this.scheduleNextAlarm() } + public async removeEvent(name: string, dataSource?: DataSource) { + if (dataSource) this.dataSource = dataSource + + if (!this.dataSource) + throw new Error('CronPlugin not properly initialized') + + await this.dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.DELETE_TASK, + params: [name], + }) + + // Reschedule alarms after removing the task + await this.scheduleNextAlarm() + } + public onEvent( callback: (payload: CronEventPayload) => void | Promise, ctx?: ExecutionContext From 7afaef376293d482dd09a71ff424b334b260bbef Mon Sep 17 00:00:00 2001 From: mikhaeelatefrizk <180051136+mikhaeelatefrizk@users.noreply.github.com> Date: Mon, 8 Jun 2026 00:25:08 +0300 Subject: [PATCH 2/4] fix(operation): close external SDK connection to prevent a leak --- src/operation.ts | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/operation.ts b/src/operation.ts index 4abc0dd..8cc201b 100644 --- a/src/operation.ts +++ b/src/operation.ts @@ -467,6 +467,18 @@ export async function executeSDKQuery(opts: { await connection.connect() - const { data } = await connection.raw(opts.sql, opts.params) - return data + try { + const { data } = await connection.raw(opts.sql, opts.params) + return data + } finally { + // Close the underlying driver connection to avoid leaking it. When an + // ExecutionContext is available we close in the background (mirroring the + // Hyperdrive branch in executeQuery above); otherwise we await it. + const cleanup = connection.disconnect() + if (opts.dataSource?.executionContext) { + opts.dataSource.executionContext.waitUntil(cleanup) + } else { + await cleanup + } + } } From f4374247b386ec2fed763139ecb5b05d833fc2ad Mon Sep 17 00:00:00 2001 From: mikhaeelatefrizk <180051136+mikhaeelatefrizk@users.noreply.github.com> Date: Mon, 8 Jun 2026 00:25:16 +0300 Subject: [PATCH 3/4] feat(replication): add external-source data replication plugin --- plugins/replication/README.md | 134 +++++ plugins/replication/index.test.ts | 821 ++++++++++++++++++++++++++++++ plugins/replication/index.ts | 806 +++++++++++++++++++++++++++++ plugins/replication/meta.json | 41 ++ 4 files changed, 1802 insertions(+) create mode 100644 plugins/replication/README.md create mode 100644 plugins/replication/index.test.ts create mode 100644 plugins/replication/index.ts create mode 100644 plugins/replication/meta.json diff --git a/plugins/replication/README.md b/plugins/replication/README.md new file mode 100644 index 0000000..4b888d4 --- /dev/null +++ b/plugins/replication/README.md @@ -0,0 +1,134 @@ +# Replication Plugin + +Pull new and changed rows from an **external source** (Postgres, MySQL — e.g. a +Supabase database) into StarbaseDB's **internal SQLite** on a schedule. Each table +is replicated with append-only polling: the plugin remembers the last value of a +user-chosen tracking column and only fetches rows beyond it on the next run, giving +you a queryable edge replica without hitting the source on every read. + +## How it works + +A Durable Object has a single alarm, which StarbaseDB hardcodes to the [Cron +plugin](../cron). Rather than competing for that alarm, the replication plugin +registers a **cron task per job** and runs its sync when that task fires — so +scheduling is delegated to cron and there is no alarm collision. + +On each tick, for every active job, the plugin: + +1. Queries the external source for rows newer than the stored cursor: + `SELECT FROM WHERE > ? ORDER BY ASC LIMIT ` + (the predicate is omitted on the very first run). +2. Ensures the destination table exists, inferring column types from the data. +3. Upserts the page with `INSERT OR REPLACE` into the internal database. +4. Advances the cursor to the largest tracking value seen and persists it. +5. Repeats until a page comes back shorter than `batch_size` (or a per-run page + cap is hit, in which case the remainder is picked up on the next tick). + +The external pull bypasses the internal RLS, allowlist and cache layers and uses +`?` placeholders (the Outerbase SDK rewrites these to `$1`/`?` per dialect), so the +query reaches the source unmodified. + +## Configuration + +Every endpoint requires the **admin** authorization token — jobs hold external +database credentials and write to the internal database. + +A job is described by: + +| Field | Required | Description | +| --------------- | -------- | --------------------------------------------------------------------------------------------------------------- | +| `name` | yes | Unique job id (`[A-Za-z0-9_-]`). | +| `source` | yes | External connection: `{ dialect, host, port, user, password, database }`. `dialect` is `postgresql` or `mysql`. | +| `table_name` | yes | Table to replicate from the source. | +| `tracking_col` | yes | Column used to detect new rows (e.g. `updated_at` or `id`). | +| `tracking_type` | no | `timestamp` (default) or `id` — controls cursor comparison. | +| `cron_tab` | yes | Standard cron expression for the pull interval (e.g. `*/5 * * * *`). | +| `target_table` | no | Internal table name (defaults to `table_name`). | +| `columns` | no | Subset of columns to replicate (defaults to all). | +| `primary_key` | no | Column(s) used as the destination primary key for idempotent upserts. | +| `batch_size` | no | Rows per page, 1–10000 (default 500). | + +> When `columns` is set it must include `tracking_col` and every `primary_key` +> column — otherwise the cursor could never advance and the destination table +> would reference a column that is never pulled. This is validated on create. + +## API + +```bash +# Create / update a job (and schedule it) +curl -X POST http://localhost:8787/replication/jobs \ + -H "Authorization: Bearer $ADMIN_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{ + "name": "users_sync", + "table_name": "users", + "tracking_col": "updated_at", + "tracking_type": "timestamp", + "cron_tab": "*/5 * * * *", + "primary_key": ["id"], + "source": { + "dialect": "postgresql", + "host": "db.xxx.supabase.co", + "port": 5432, + "user": "postgres", + "password": "•••", + "database": "postgres" + } + }' + +# List jobs and their status (passwords are redacted) +curl http://localhost:8787/replication/jobs -H "Authorization: Bearer $ADMIN_TOKEN" + +# Run a job immediately +curl -X POST http://localhost:8787/replication/jobs/users_sync/run -H "Authorization: Bearer $ADMIN_TOKEN" + +# Reset a job's cursor (re-replicate from the beginning) +curl -X POST http://localhost:8787/replication/jobs/users_sync/reset -H "Authorization: Bearer $ADMIN_TOKEN" + +# Pause / resume a job +curl -X PATCH http://localhost:8787/replication/jobs/users_sync \ + -H "Authorization: Bearer $ADMIN_TOKEN" -H "Content-Type: application/json" \ + -d '{"is_active": false}' + +# Delete a job (and its cron task) +curl -X DELETE http://localhost:8787/replication/jobs/users_sync -H "Authorization: Bearer $ADMIN_TOKEN" +``` + +## Setup + +The plugin is registered in `src/index.ts` after the cron plugin, sharing the cron +instance for scheduling: + +```ts +import { CronPlugin } from '../plugins/cron' +import { ReplicationPlugin } from '../plugins/replication' + +const cronPlugin = new CronPlugin() +const replicationPlugin = new ReplicationPlugin({ cron: cronPlugin }) + +// Sync runs when a replication cron task fires. dataSource is captured from the +// request scope because the cron callback runs on a separate request. +cronPlugin.onEvent( + (event) => replicationPlugin.handleCronEvent(event, dataSource), + ctx +) + +const plugins = [ + // ... + cronPlugin, + replicationPlugin, +] satisfies StarbasePlugin[] +``` + +## Notes & limitations + +- **Tracking column** should be monotonically increasing and ideally unique. The + cursor predicate uses `>` (strictly greater) to avoid re-writing the boundary + row; if many rows share a tracking value across a page boundary one could be + skipped, so prefer a unique `id` or a high-resolution timestamp. +- **Cron granularity** is one minute, inherited from standard cron expressions. +- **Append-only / upsert.** Source deletes are not propagated (this is a pull-based + replica); supply a `primary_key` so re-synced rows replace rather than duplicate. +- **Observability.** Each job row records `last_run_at`, `last_error` and + `rows_synced`; a failing job is isolated and never blocks the others. +- Operational state lives in `tmp_replication_jobs`. diff --git a/plugins/replication/index.test.ts b/plugins/replication/index.test.ts new file mode 100644 index 0000000..25b17f2 --- /dev/null +++ b/plugins/replication/index.test.ts @@ -0,0 +1,821 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { + ReplicationPlugin, + assertIdent, + quoteIdent, + clampBatch, + buildPullQuery, + toRowObjects, + advanceCursor, + inferSqliteType, + serializeValue, + parseJSONArray, + redactConfig, +} from './index' +import { executeQuery } from '../../src/operation' + +vi.mock('../../src/operation', () => ({ + executeQuery: vi.fn(), +})) + +const mockExecuteQuery = vi.mocked(executeQuery) + +// ── Fixtures ─────────────────────────────────────────────────────────────── + +function makeDataSource() { + return { + source: 'internal', + rpc: { + executeQuery: vi.fn().mockResolvedValue([]), + }, + executionContext: { waitUntil: vi.fn() }, + } as any +} + +const mockCron = { + addEvent: vi.fn().mockResolvedValue(undefined), + removeEvent: vi.fn().mockResolvedValue(undefined), +} + +function jobRow(overrides: Record = {}) { + return { + name: 'users_sync', + source_config: JSON.stringify({ + dialect: 'postgresql', + host: 'h', + port: 5432, + user: 'u', + password: 'p', + database: 'd', + }), + table_name: 'users', + tracking_col: 'updated_at', + tracking_type: 'timestamp', + last_value: null, + cron_tab: '*/5 * * * *', + target_table: null, + columns: null, + primary_key: null, + batch_size: 500, + is_active: 1, + last_run_at: null, + last_error: null, + rows_synced: 0, + created_at: null, + ...overrides, + } +} + +const VALID_BODY = { + name: 'users_sync', + table_name: 'users', + tracking_col: 'updated_at', + tracking_type: 'timestamp' as const, + cron_tab: '*/5 * * * *', + source: { + dialect: 'postgresql' as const, + host: 'h', + port: 5432, + user: 'u', + password: 'p', + database: 'd', + }, +} + +function makeContext(opts: { + body?: any + param?: Record + url?: string +}) { + return { + req: { + json: async () => opts.body, + param: (key: string) => opts.param?.[key], + url: opts.url ?? 'http://localhost:8787/replication/jobs', + }, + } as any +} + +function makePlugin(maxPagesPerRun = 50) { + const plugin = new ReplicationPlugin({ + cron: mockCron as any, + maxPagesPerRun, + }) + const ds = makeDataSource() + plugin['dataSource'] = ds + plugin['config'] = { role: 'admin' } as any + return { plugin, ds } +} + +// Make GET_JOB return our row while every other internal query returns []. +function withJob(ds: any, row: Record) { + ds.rpc.executeQuery.mockImplementation(async ({ sql }: { sql: string }) => + sql.includes('SELECT * FROM tmp_replication_jobs') ? [row] : [] + ) +} + +beforeEach(() => { + vi.clearAllMocks() + mockExecuteQuery.mockReset() +}) + +// ── Pure helpers ───────────────────────────────────────────────────────────── + +describe('assertIdent', () => { + it('accepts safe identifiers including reserved words', () => { + expect(() => assertIdent('users_2', 'table')).not.toThrow() + expect(() => assertIdent('order', 'table')).not.toThrow() + }) + + it('rejects injection, spaces, empty and non-strings', () => { + expect(() => assertIdent('users; DROP TABLE x', 'table')).toThrow() + expect(() => assertIdent('a b', 'table')).toThrow() + expect(() => assertIdent('', 'table')).toThrow() + expect(() => assertIdent(undefined, 'table')).toThrow() + expect(() => assertIdent('a"b', 'table')).toThrow() + }) +}) + +describe('quoteIdent', () => { + it('quotes per dialect', () => { + expect(quoteIdent('col', 'postgresql')).toBe('"col"') + expect(quoteIdent('col', 'mysql')).toBe('`col`') + expect(quoteIdent('col', 'sqlite')).toBe('"col"') + }) +}) + +describe('clampBatch', () => { + it('defaults invalid input to 500', () => { + expect(clampBatch(undefined)).toBe(500) + expect(clampBatch(0)).toBe(500) + expect(clampBatch(-3)).toBe(500) + expect(clampBatch('abc')).toBe(500) + }) + + it('clamps and floors', () => { + expect(clampBatch(50)).toBe(50) + expect(clampBatch(999999)).toBe(10000) + expect(clampBatch(12.9)).toBe(12) + }) +}) + +describe('buildPullQuery', () => { + it('omits WHERE on the first run', () => { + const sql = buildPullQuery( + 'users', + 'updated_at', + null, + false, + 500, + 'postgresql' + ) + expect(sql).not.toContain('WHERE') + expect(sql).toContain('SELECT * FROM "users"') + expect(sql).toContain('ORDER BY "updated_at" ASC LIMIT 500') + }) + + it('adds a ? cursor predicate when a cursor exists', () => { + const sql = buildPullQuery( + 'users', + 'updated_at', + null, + true, + 100, + 'postgresql' + ) + expect(sql).toContain('WHERE "updated_at" > ?') + expect(sql).not.toContain('$1') + }) + + it('selects an explicit column list and quotes per dialect', () => { + const pg = buildPullQuery( + 'users', + 'id', + ['id', 'name'], + false, + 10, + 'postgresql' + ) + expect(pg).toContain('SELECT "id", "name" FROM "users"') + + const my = buildPullQuery('users', 'id', ['id'], false, 10, 'mysql') + expect(my).toContain('SELECT `id` FROM `users`') + expect(my).toContain('ORDER BY `id` ASC') + }) +}) + +describe('toRowObjects', () => { + it('maps {columns, rows} into row objects', () => { + expect( + toRowObjects({ + columns: ['id', 'name'], + rows: [ + [1, 'a'], + [2, 'b'], + ], + }) + ).toEqual([ + { id: 1, name: 'a' }, + { id: 2, name: 'b' }, + ]) + }) + + it('passes through an array of objects and handles empties', () => { + expect(toRowObjects([{ id: 1 }])).toEqual([{ id: 1 }]) + expect(toRowObjects(undefined)).toEqual([]) + expect(toRowObjects({ columns: [], rows: [] })).toEqual([]) + }) +}) + +describe('advanceCursor', () => { + it('compares ids numerically', () => { + const rows = [{ id: 9 }, { id: 10 }, { id: 2 }] + expect(advanceCursor(null, rows, 'id', 'id')).toBe('10') + }) + + it('compares timestamps temporally and keeps the latest', () => { + const rows = [ + { ts: '2024-01-01T00:00:00Z' }, + { ts: '2024-03-01T00:00:00Z' }, + { ts: '2024-02-01T00:00:00Z' }, + ] + expect(advanceCursor(null, rows, 'ts', 'timestamp')).toBe( + '2024-03-01T00:00:00Z' + ) + }) + + it('never regresses below the previous cursor', () => { + const rows = [{ id: 3 }] + expect(advanceCursor('100', rows, 'id', 'id')).toBe('100') + }) + + it('skips null tracking values', () => { + const rows = [{ id: null }, { id: 5 }] + expect(advanceCursor(null, rows, 'id', 'id')).toBe('5') + }) + + it('serializes Date values to ISO', () => { + const rows = [{ ts: new Date('2024-05-05T00:00:00Z') }] + expect(advanceCursor(null, rows, 'ts', 'timestamp')).toBe( + '2024-05-05T00:00:00.000Z' + ) + }) +}) + +describe('inferSqliteType', () => { + it('maps JS values to SQLite affinities', () => { + expect(inferSqliteType(1)).toBe('INTEGER') + expect(inferSqliteType(1.5)).toBe('REAL') + expect(inferSqliteType(true)).toBe('INTEGER') + expect(inferSqliteType('x')).toBe('TEXT') + expect(inferSqliteType({ a: 1 })).toBe('TEXT') + expect(inferSqliteType(null)).toBe('TEXT') + }) +}) + +describe('serializeValue', () => { + it('coerces values for SQLite binding', () => { + expect(serializeValue(true)).toBe(1) + expect(serializeValue(false)).toBe(0) + expect(serializeValue(null)).toBe(null) + expect(serializeValue(undefined)).toBe(null) + expect(serializeValue({ a: 1 })).toBe('{"a":1}') + expect(serializeValue([1, 2])).toBe('[1,2]') + expect(serializeValue(new Date('2024-01-01T00:00:00Z'))).toBe( + '2024-01-01T00:00:00.000Z' + ) + expect(serializeValue('hello')).toBe('hello') + expect(serializeValue(7)).toBe(7) + }) + + it('neutralizes non-finite numbers and binds bigint safely', () => { + expect(serializeValue(NaN)).toBe(null) + expect(serializeValue(Infinity)).toBe(null) + expect(serializeValue(-Infinity)).toBe(null) + expect(serializeValue(123n)).toBe(123) + expect(serializeValue(9007199254740993n)).toBe('9007199254740993') + }) +}) + +describe('parseJSONArray', () => { + it('parses arrays and rejects everything else', () => { + expect(parseJSONArray('["a","b"]')).toEqual(['a', 'b']) + expect(parseJSONArray(null)).toBe(null) + expect(parseJSONArray('not json')).toBe(null) + expect(parseJSONArray('{"a":1}')).toBe(null) + }) +}) + +describe('redactConfig', () => { + it('masks the password', () => { + const out = redactConfig( + JSON.stringify({ host: 'h', password: 'secret' }) + ) + expect(JSON.parse(out).password).toBe('***') + expect(JSON.parse(out).host).toBe('h') + }) + + it('returns the input unchanged when it cannot be parsed', () => { + expect(redactConfig('not json')).toBe('not json') + }) +}) + +// ── Routes ─────────────────────────────────────────────────────────────────── + +describe('ReplicationPlugin - routes (admin gate)', () => { + it('rejects non-admin callers on every mutating route', async () => { + const { plugin } = makePlugin() + plugin['config'] = { role: 'client' } as any + + for (const handler of [ + 'handleCreateJob', + 'handleListJobs', + 'handleDeleteJob', + 'handleRunJob', + 'handleResetJob', + 'handlePatchJob', + ]) { + const res = await (plugin as any)[handler]( + makeContext({ body: VALID_BODY, param: { name: 'x' } }) + ) + expect(res.status).toBe(400) + expect(await res.text()).toBe('Unauthorized request') + } + }) +}) + +describe('ReplicationPlugin - handleCreateJob', () => { + it('validates, upserts and schedules a cron task', async () => { + const { plugin, ds } = makePlugin() + const res = await plugin['handleCreateJob']( + makeContext({ + body: VALID_BODY, + url: 'http://example.com:8787/replication/jobs', + }) + ) + + expect(res.status).toBe(200) + const upsert = ds.rpc.executeQuery.mock.calls.find((c: any) => + c[0].sql.includes('INSERT OR REPLACE INTO tmp_replication_jobs') + ) + expect(upsert).toBeTruthy() + expect(mockCron.addEvent).toHaveBeenCalledWith( + '*/5 * * * *', + 'replication:users_sync', + {}, + 'http://example.com:8787', + ds + ) + }) + + it('preserves cursor, counters and created_at on re-create', async () => { + const { plugin, ds } = makePlugin() + ds.rpc.executeQuery.mockImplementation(async ({ sql }: any) => + sql.includes('SELECT last_value, rows_synced, created_at') + ? [{ last_value: '99', rows_synced: 42, created_at: '2020' }] + : [] + ) + + await plugin['handleCreateJob'](makeContext({ body: VALID_BODY })) + const upsert = ds.rpc.executeQuery.mock.calls.find((c: any) => + c[0].sql.includes('INSERT OR REPLACE INTO tmp_replication_jobs') + ) + expect(upsert[0].params[5]).toBe('99') // last_value + expect(upsert[0].params[14]).toBe(42) // rows_synced + expect(upsert[0].params[15]).toBe('2020') // created_at + }) + + it('rejects bad identifiers, cron and source', async () => { + const { plugin } = makePlugin() + const bad = [ + { ...VALID_BODY, table_name: 'users; DROP TABLE x' }, + { ...VALID_BODY, tracking_col: 'a b' }, + { ...VALID_BODY, cron_tab: 'definitely-not-cron' }, + { + ...VALID_BODY, + source: { ...VALID_BODY.source, dialect: 'sqlite' }, + }, + { ...VALID_BODY, name: 'bad name!' }, + ] + for (const body of bad) { + const res = await plugin['handleCreateJob'](makeContext({ body })) + expect(res.status).toBe(400) + } + }) + + it('rejects an explicit columns list missing tracking/primary-key columns', async () => { + const { plugin } = makePlugin() + const missingTracking = { ...VALID_BODY, columns: ['name', 'email'] } + const missingPk = { + ...VALID_BODY, + columns: ['updated_at', 'name'], + primary_key: ['id'], + } + expect( + ( + await plugin['handleCreateJob']( + makeContext({ body: missingTracking }) + ) + ).status + ).toBe(400) + expect( + (await plugin['handleCreateJob'](makeContext({ body: missingPk }))) + .status + ).toBe(400) + }) + + it('accepts a columns list that includes tracking and primary key', async () => { + const { plugin } = makePlugin() + const res = await plugin['handleCreateJob']( + makeContext({ + body: { + ...VALID_BODY, + columns: ['id', 'updated_at'], + primary_key: ['id'], + }, + }) + ) + expect(res.status).toBe(200) + }) +}) + +describe('ReplicationPlugin - list/delete/reset/patch', () => { + it('redacts the source password when listing', async () => { + const { plugin, ds } = makePlugin() + ds.rpc.executeQuery.mockResolvedValue([ + jobRow({ + source_config: JSON.stringify({ + dialect: 'postgresql', + host: 'h', + password: 'secret', + }), + }), + ]) + + const res = await plugin['handleListJobs'](makeContext({})) + const json: any = await res.json() + expect(JSON.parse(json.result[0].source_config).password).toBe('***') + }) + + it('deletes the row and removes the cron task', async () => { + const { plugin, ds } = makePlugin() + await plugin['handleDeleteJob']( + makeContext({ param: { name: 'users_sync' } }) + ) + expect(ds.rpc.executeQuery).toHaveBeenCalledWith( + expect.objectContaining({ + sql: expect.stringContaining( + 'DELETE FROM tmp_replication_jobs' + ), + }) + ) + expect(mockCron.removeEvent).toHaveBeenCalledWith( + 'replication:users_sync', + ds + ) + }) + + it('resets the cursor', async () => { + const { plugin, ds } = makePlugin() + await plugin['handleResetJob']( + makeContext({ param: { name: 'users_sync' } }) + ) + expect(ds.rpc.executeQuery).toHaveBeenCalledWith( + expect.objectContaining({ + sql: expect.stringContaining('last_value = NULL'), + }) + ) + }) + + it('toggles is_active and rejects a non-boolean', async () => { + const { plugin, ds } = makePlugin() + await plugin['handlePatchJob']( + makeContext({ + body: { is_active: false }, + param: { name: 'users_sync' }, + }) + ) + const call = ds.rpc.executeQuery.mock.calls.find((c: any) => + c[0].sql.includes('SET is_active') + ) + expect(call[0].params).toEqual([0, 'users_sync']) + + const res = await plugin['handlePatchJob']( + makeContext({ body: {}, param: { name: 'users_sync' } }) + ) + expect(res.status).toBe(400) + }) +}) + +// ── Sync engine ────────────────────────────────────────────────────────────── + +describe('ReplicationPlugin - runSync', () => { + it('first run issues a WHERE-less hardened raw pull', async () => { + const { plugin, ds } = makePlugin() + withJob(ds, jobRow()) + mockExecuteQuery.mockResolvedValueOnce({ + columns: ['id', 'updated_at'], + rows: [[1, '2024-01-01']], + } as any) + + await plugin.runSync('users_sync', ds) + + const pull = mockExecuteQuery.mock.calls[0][0] + expect(pull.sql).not.toContain('WHERE') + expect(pull.params).toEqual([]) + expect(pull.isRaw).toBe(true) + expect(pull.config.role).toBe('admin') + expect(pull.config.features?.rls).toBe(false) + expect(pull.config.features?.allowlist).toBe(false) + expect(pull.dataSource.source).toBe('external') + }) + + it('uses the cursor predicate with a ? parameter on subsequent runs', async () => { + const { plugin, ds } = makePlugin() + withJob(ds, jobRow({ last_value: '2024-01-01' })) + mockExecuteQuery.mockResolvedValueOnce({ columns: [], rows: [] } as any) + + await plugin.runSync('users_sync', ds) + + const pull = mockExecuteQuery.mock.calls[0][0] + expect(pull.sql).toContain('WHERE "updated_at" > ?') + expect(pull.params).toEqual(['2024-01-01']) + }) + + it('upserts rows and advances the cursor to the max value', async () => { + const { plugin, ds } = makePlugin() + withJob(ds, jobRow()) + mockExecuteQuery.mockResolvedValueOnce({ + columns: ['id', 'updated_at'], + rows: [ + [1, '2024-01-01'], + [2, '2024-01-03'], + ], + } as any) + + await plugin.runSync('users_sync', ds) + + const insert = ds.rpc.executeQuery.mock.calls.find((c: any) => + c[0].sql.includes('INSERT OR REPLACE INTO "users"') + ) + expect(insert).toBeTruthy() + const cursor = ds.rpc.executeQuery.mock.calls.find((c: any) => + c[0].sql.includes('last_value = ?') + ) + expect(cursor[0].params[0]).toBe('2024-01-03') + }) + + it('creates the destination table with inferred types and a primary key', async () => { + const { plugin, ds } = makePlugin() + withJob(ds, jobRow({ primary_key: JSON.stringify(['id']) })) + mockExecuteQuery.mockResolvedValueOnce({ + columns: ['id', 'name', 'active', 'score'], + rows: [[1, 'a', true, 1.5]], + } as any) + + await plugin.runSync('users_sync', ds) + + const create = ds.rpc.executeQuery.mock.calls.find((c: any) => + c[0].sql.startsWith('CREATE TABLE IF NOT EXISTS "users"') + ) + expect(create[0].sql).toContain('"id" INTEGER') + expect(create[0].sql).toContain('"name" TEXT') + expect(create[0].sql).toContain('"active" INTEGER') + expect(create[0].sql).toContain('"score" REAL') + expect(create[0].sql).toContain('PRIMARY KEY ("id")') + }) + + it('replicates into target_table when provided', async () => { + const { plugin, ds } = makePlugin() + withJob(ds, jobRow({ target_table: 'mirror_users' })) + mockExecuteQuery.mockResolvedValueOnce({ + columns: ['id'], + rows: [[1]], + } as any) + + await plugin.runSync('users_sync', ds) + const insert = ds.rpc.executeQuery.mock.calls.find((c: any) => + c[0].sql.includes('INSERT OR REPLACE INTO "mirror_users"') + ) + expect(insert).toBeTruthy() + }) + + it('paginates until a short page and stops', async () => { + const { plugin, ds } = makePlugin() + withJob(ds, jobRow({ batch_size: 2 })) + mockExecuteQuery + .mockResolvedValueOnce({ + columns: ['id', 'updated_at'], + rows: [ + [1, 'a'], + [2, 'b'], + ], + } as any) + .mockResolvedValueOnce({ + columns: ['id', 'updated_at'], + rows: [[3, 'c']], + } as any) + + const result = await plugin.runSync('users_sync', ds) + expect(mockExecuteQuery).toHaveBeenCalledTimes(2) + expect(result.pages).toBe(2) + expect(result.rowsSynced).toBe(3) + }) + + it('is bounded by maxPagesPerRun when pages stay full', async () => { + const { plugin, ds } = makePlugin(2) + withJob(ds, jobRow({ batch_size: 2 })) + mockExecuteQuery.mockResolvedValue({ + columns: ['id', 'updated_at'], + rows: [ + [1, 'a'], + [2, 'b'], + ], + } as any) + + await plugin.runSync('users_sync', ds) + expect(mockExecuteQuery).toHaveBeenCalledTimes(2) + }) + + it('persists the cursor after each page', async () => { + const { plugin, ds } = makePlugin() + withJob(ds, jobRow({ batch_size: 1 })) + mockExecuteQuery + .mockResolvedValueOnce({ + columns: ['id', 'updated_at'], + rows: [[1, '2024-01-01']], + } as any) + .mockResolvedValueOnce({ + columns: ['id', 'updated_at'], + rows: [[2, '2024-01-02']], + } as any) + .mockResolvedValueOnce({ + columns: ['id', 'updated_at'], + rows: [], + } as any) + + await plugin.runSync('users_sync', ds) + const cursorUpdates = ds.rpc.executeQuery.mock.calls.filter((c: any) => + c[0].sql.includes('last_value = ?') + ) + expect(cursorUpdates.length).toBe(2) + }) + + it('does nothing for a paused job', async () => { + const { plugin, ds } = makePlugin() + withJob(ds, jobRow({ is_active: 0 })) + const result = await plugin.runSync('users_sync', ds) + expect(mockExecuteQuery).not.toHaveBeenCalled() + expect(result).toEqual({ rowsSynced: 0, pages: 0 }) + }) + + it('returns a no-op for an unknown job', async () => { + const { plugin, ds } = makePlugin() + ds.rpc.executeQuery.mockResolvedValue([]) + const result = await plugin.runSync('missing', ds) + expect(result).toEqual({ rowsSynced: 0, pages: 0 }) + }) + + it('records the error and re-throws on pull failure', async () => { + const { plugin, ds } = makePlugin() + withJob(ds, jobRow()) + mockExecuteQuery.mockRejectedValueOnce(new Error('source down')) + + await expect(plugin.runSync('users_sync', ds)).rejects.toThrow( + 'source down' + ) + const meta = ds.rpc.executeQuery.mock.calls.find((c: any) => + c[0].sql.includes('last_run_at = ?') + ) + expect(meta[0].params[1]).toContain('source down') + }) + + it('stops after one page when the tracking column cannot advance', async () => { + const { plugin, ds } = makePlugin() + withJob(ds, jobRow({ batch_size: 2 })) + // A full page whose tracking values are all NULL: the cursor cannot + // move forward, so we must not re-fetch the same page. + mockExecuteQuery.mockResolvedValue({ + columns: ['id', 'updated_at'], + rows: [ + [1, null], + [2, null], + ], + } as any) + + await plugin.runSync('users_sync', ds) + expect(mockExecuteQuery).toHaveBeenCalledTimes(1) + }) + + it('infers column types from the first non-null value across the page', async () => { + const { plugin, ds } = makePlugin() + withJob(ds, jobRow()) + mockExecuteQuery.mockResolvedValueOnce({ + columns: ['id', 'score', 'updated_at'], + rows: [ + [null, null, '2024-01-01'], + [2, 1.5, '2024-01-02'], + ], + } as any) + + await plugin.runSync('users_sync', ds) + const create = ds.rpc.executeQuery.mock.calls.find((c: any) => + c[0].sql.startsWith('CREATE TABLE IF NOT EXISTS "users"') + ) + expect(create[0].sql).toContain('"id" INTEGER') + expect(create[0].sql).toContain('"score" REAL') + }) + + it('redacts the source password from sync error messages', async () => { + const { plugin, ds } = makePlugin() + withJob( + ds, + jobRow({ + source_config: JSON.stringify({ + dialect: 'postgresql', + host: 'h', + port: 5432, + user: 'u', + password: 's3cr3t', + database: 'd', + }), + }) + ) + mockExecuteQuery.mockRejectedValueOnce( + new Error('auth failed: password=s3cr3t') + ) + + await expect(plugin.runSync('users_sync', ds)).rejects.toThrow('***') + const meta = ds.rpc.executeQuery.mock.calls.find((c: any) => + c[0].sql.includes('last_run_at = ?') + ) + expect(meta[0].params[1]).not.toContain('s3cr3t') + expect(meta[0].params[1]).toContain('***') + }) +}) + +describe('ReplicationPlugin - handleCronEvent', () => { + it('ignores events that are not replication tasks', async () => { + const { plugin, ds } = makePlugin() + await plugin.handleCronEvent({ name: 'some-other-task' }, ds) + expect(ds.rpc.executeQuery).not.toHaveBeenCalled() + expect(mockExecuteQuery).not.toHaveBeenCalled() + }) + + it('routes a replication task to runSync by its name prefix', async () => { + const { plugin, ds } = makePlugin() + withJob(ds, jobRow()) + mockExecuteQuery.mockResolvedValueOnce({ columns: [], rows: [] } as any) + + await plugin.handleCronEvent({ name: 'replication:users_sync' }, ds) + expect(ds.rpc.executeQuery).toHaveBeenCalledWith( + expect.objectContaining({ + sql: expect.stringContaining( + 'SELECT * FROM tmp_replication_jobs' + ), + }) + ) + }) + + it('swallows sync errors so other jobs are unaffected', async () => { + const { plugin, ds } = makePlugin() + withJob(ds, jobRow()) + mockExecuteQuery.mockRejectedValueOnce(new Error('boom')) + + await expect( + plugin.handleCronEvent({ name: 'replication:users_sync' }, ds) + ).resolves.toBeUndefined() + }) +}) + +describe('ReplicationPlugin - handleRunJob', () => { + it('runs a sync inline and returns counts', async () => { + const { plugin, ds } = makePlugin() + withJob(ds, jobRow()) + mockExecuteQuery.mockResolvedValueOnce({ + columns: ['id', 'updated_at'], + rows: [[1, '2024-01-01']], + } as any) + + const res = await plugin['handleRunJob']( + makeContext({ param: { name: 'users_sync' } }) + ) + const json: any = await res.json() + expect(res.status).toBe(200) + expect(json.result.success).toBe(true) + expect(json.result.rowsSynced).toBe(1) + }) + + it('returns 500 with the message when a sync fails', async () => { + const { plugin, ds } = makePlugin() + withJob(ds, jobRow()) + mockExecuteQuery.mockRejectedValueOnce(new Error('kaboom')) + + const res = await plugin['handleRunJob']( + makeContext({ param: { name: 'users_sync' } }) + ) + expect(res.status).toBe(500) + const json: any = await res.json() + expect(json.error).toContain('kaboom') + }) +}) diff --git a/plugins/replication/index.ts b/plugins/replication/index.ts new file mode 100644 index 0000000..ebe7b32 --- /dev/null +++ b/plugins/replication/index.ts @@ -0,0 +1,806 @@ +import { StarbaseApp, StarbaseDBConfiguration } from '../../src/handler' +import { StarbasePlugin } from '../../src/plugin' +import { DataSource, ExternalDatabaseSource } from '../../src/types' +import { createResponse } from '../../src/utils' +import { executeQuery } from '../../src/operation' +import { CronPlugin } from '../cron' +import { parseCronExpression } from '../cron/utils' + +/** + * Replication plugin — pulls new/changed rows from an external source (Postgres, + * MySQL) into StarbaseDB's internal SQLite on a schedule, tracking a per-table + * cursor so each run only fetches what is new (append-only polling). + * + * Scheduling is delegated to the Cron plugin: a Durable Object has a single alarm + * which `src/do.ts` hardcodes to the cron callback, so this plugin registers a cron + * task per job and runs its sync when that task fires (no alarm collision). + */ + +const SQL = { + CREATE_TABLE: ` + CREATE TABLE IF NOT EXISTS tmp_replication_jobs ( + name TEXT NOT NULL UNIQUE PRIMARY KEY, + source_config TEXT NOT NULL, + table_name TEXT NOT NULL, + tracking_col TEXT NOT NULL, + tracking_type TEXT NOT NULL DEFAULT 'timestamp', + last_value TEXT, + cron_tab TEXT NOT NULL, + target_table TEXT, + columns TEXT, + primary_key TEXT, + batch_size INTEGER NOT NULL DEFAULT 500, + is_active INTEGER NOT NULL DEFAULT 1, + last_run_at TEXT, + last_error TEXT, + rows_synced INTEGER NOT NULL DEFAULT 0, + created_at TEXT DEFAULT (datetime('now')), + updated_at TEXT DEFAULT (datetime('now')) + ) + `, + UPSERT_JOB: ` + INSERT OR REPLACE INTO tmp_replication_jobs + (name, source_config, table_name, tracking_col, tracking_type, + last_value, cron_tab, target_table, columns, primary_key, + batch_size, is_active, last_run_at, last_error, rows_synced, + created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')), datetime('now')) + `, + GET_JOB: `SELECT * FROM tmp_replication_jobs WHERE name = ?`, + GET_JOBS: `SELECT * FROM tmp_replication_jobs`, + DELETE_JOB: `DELETE FROM tmp_replication_jobs WHERE name = ?`, + UPDATE_CURSOR: ` + UPDATE tmp_replication_jobs + SET last_value = ?, rows_synced = ?, updated_at = datetime('now') + WHERE name = ? + `, + UPDATE_RUN_META: ` + UPDATE tmp_replication_jobs + SET last_run_at = ?, last_error = ?, updated_at = datetime('now') + WHERE name = ? + `, + RESET_CURSOR: ` + UPDATE tmp_replication_jobs + SET last_value = NULL, rows_synced = 0, updated_at = datetime('now') + WHERE name = ? + `, + SET_ACTIVE: ` + UPDATE tmp_replication_jobs + SET is_active = ?, updated_at = datetime('now') + WHERE name = ? + `, +} + +// Identifiers (table / column names) cannot be bound as SQL parameters, so they +// are interpolated. We restrict them to a safe character set and always quote +// them in emitted SQL to defend against injection and to allow reserved words. +const IDENT = /^[A-Za-z0-9_]+$/ +const JOB_NAME = /^[A-Za-z0-9_-]+$/ + +export type TrackingType = 'timestamp' | 'id' + +export interface ReplicationJobInput { + name: string + source: ExternalDatabaseSource + table_name: string + tracking_col: string + tracking_type?: TrackingType + cron_tab: string + target_table?: string + columns?: string[] + primary_key?: string[] + batch_size?: number +} + +interface ReplicationJobRow { + name: string + source_config: string + table_name: string + tracking_col: string + tracking_type: TrackingType + last_value: string | null + cron_tab: string + target_table: string | null + columns: string | null + primary_key: string | null + batch_size: number + is_active: number + last_run_at: string | null + last_error: string | null + rows_synced: number + created_at: string | null +} + +// The subset of the Cron plugin this plugin relies on. Declared as an interface +// so the dependency can be trivially mocked in tests. +export interface CronScheduler { + addEvent( + cronTab: string, + name: string, + payload: Record, + callbackHost: string, + dataSource?: DataSource + ): Promise + removeEvent(name: string, dataSource?: DataSource): Promise +} + +export class ReplicationPlugin extends StarbasePlugin { + public pathPrefix = '/replication' + private dataSource?: DataSource + private config?: StarbaseDBConfiguration + private cron?: CronScheduler + // Upper bound on pages drained per alarm tick so a single sync can never run + // unbounded against a Worker's CPU/time budget. The remainder is picked up on + // the next tick because the cursor is persisted after every page. + private maxPagesPerRun: number + + constructor(opts?: { + cron?: CronPlugin | CronScheduler + maxPagesPerRun?: number + }) { + super('starbasedb:replication', { requiresAuth: true }) + this.cron = opts?.cron + this.maxPagesPerRun = opts?.maxPagesPerRun ?? 50 + } + + override async register(app: StarbaseApp) { + app.use(async (c, next) => { + this.dataSource = c?.get('dataSource') + this.config = c?.get('config') + await this.init() + await next() + }) + + app.post(`${this.pathPrefix}/jobs`, (c) => this.handleCreateJob(c)) + app.get(`${this.pathPrefix}/jobs`, (c) => this.handleListJobs(c)) + app.delete(`${this.pathPrefix}/jobs/:name`, (c) => + this.handleDeleteJob(c) + ) + app.post(`${this.pathPrefix}/jobs/:name/run`, (c) => + this.handleRunJob(c) + ) + app.post(`${this.pathPrefix}/jobs/:name/reset`, (c) => + this.handleResetJob(c) + ) + app.patch(`${this.pathPrefix}/jobs/:name`, (c) => + this.handlePatchJob(c) + ) + } + + private async init() { + if (!this.dataSource) return + await this.dataSource.rpc.executeQuery({ + sql: SQL.CREATE_TABLE, + params: [], + }) + } + + // Replication jobs hold external database credentials and write to the + // internal database, so every endpoint is restricted to admin callers. + private requireAdmin(): Response | undefined { + if (this.config?.role !== 'admin') { + return new Response('Unauthorized request', { status: 400 }) + } + return undefined + } + + // ────────────────────────────────────────────────────────────────────── + // Routes + // ────────────────────────────────────────────────────────────────────── + + private async handleCreateJob(c: any): Promise { + const denied = this.requireAdmin() + if (denied) return denied + + try { + const body = (await c.req.json()) as ReplicationJobInput + const job = this.validateJobInput(body) + + // Preserve cursor + counters + created_at across re-creation so + // re-submitting a job's config does not silently re-replicate. + const existing = (await this.dataSource!.rpc.executeQuery({ + sql: 'SELECT last_value, rows_synced, created_at FROM tmp_replication_jobs WHERE name = ?', + params: [job.name], + })) as unknown as Pick< + ReplicationJobRow, + 'last_value' | 'rows_synced' | 'created_at' + >[] + const prior = existing?.[0] + + await this.dataSource!.rpc.executeQuery({ + sql: SQL.UPSERT_JOB, + params: [ + job.name, + JSON.stringify(job.source), + job.table_name, + job.tracking_col, + job.tracking_type, + prior?.last_value ?? null, + job.cron_tab, + job.target_table ?? null, + job.columns ? JSON.stringify(job.columns) : null, + job.primary_key ? JSON.stringify(job.primary_key) : null, + job.batch_size, + 1, + null, + null, + prior?.rows_synced ?? 0, + prior?.created_at ?? null, + ], + }) + + // Register the schedule with the Cron plugin (reuses the DO alarm). + const origin = new URL(c.req.url).origin + await this.cron?.addEvent( + job.cron_tab, + this.taskName(job.name), + {}, + origin, + this.dataSource + ) + + return createResponse( + { success: true, job: job.name }, + undefined, + 200 + ) + } catch (error: any) { + return createResponse( + undefined, + error?.message ?? 'Failed to create replication job.', + 400 + ) + } + } + + private async handleListJobs(c: any): Promise { + const denied = this.requireAdmin() + if (denied) return denied + + const jobs = (await this.dataSource!.rpc.executeQuery({ + sql: SQL.GET_JOBS, + params: [], + })) as unknown as ReplicationJobRow[] + + // Never expose the source password over the API. + const sanitized = (jobs ?? []).map((job) => ({ + ...job, + source_config: redactConfig(job.source_config), + })) + + return createResponse(sanitized, undefined, 200) + } + + private async handleDeleteJob(c: any): Promise { + const denied = this.requireAdmin() + if (denied) return denied + + const name = c.req.param('name') + await this.dataSource!.rpc.executeQuery({ + sql: SQL.DELETE_JOB, + params: [name], + }) + await this.cron?.removeEvent(this.taskName(name), this.dataSource) + + return createResponse({ success: true }, undefined, 200) + } + + private async handleRunJob(c: any): Promise { + const denied = this.requireAdmin() + if (denied) return denied + + const name = c.req.param('name') + try { + const result = await this.runSync(name, this.dataSource!) + return createResponse({ success: true, ...result }, undefined, 200) + } catch (error: any) { + return createResponse( + undefined, + error?.message ?? 'Replication sync failed.', + 500 + ) + } + } + + private async handleResetJob(c: any): Promise { + const denied = this.requireAdmin() + if (denied) return denied + + const name = c.req.param('name') + await this.dataSource!.rpc.executeQuery({ + sql: SQL.RESET_CURSOR, + params: [name], + }) + return createResponse({ success: true }, undefined, 200) + } + + private async handlePatchJob(c: any): Promise { + const denied = this.requireAdmin() + if (denied) return denied + + const name = c.req.param('name') + const body = (await c.req.json()) as { is_active?: boolean } + if (typeof body.is_active !== 'boolean') { + return createResponse( + undefined, + 'Body must include a boolean "is_active".', + 400 + ) + } + await this.dataSource!.rpc.executeQuery({ + sql: SQL.SET_ACTIVE, + params: [body.is_active ? 1 : 0, name], + }) + return createResponse({ success: true }, undefined, 200) + } + + // ────────────────────────────────────────────────────────────────────── + // Cron event → sync + // ────────────────────────────────────────────────────────────────────── + + /** + * Invoked for every cron task that fires. The cron payload arrives as a JSON + * string at runtime, so we route purely by task name (`replication:`). + * The `dataSource` is passed in from the request scope because the cron + * callback runs on a separate request where this plugin's middleware has not + * captured one. + */ + public async handleCronEvent( + event: { name?: string }, + dataSource: DataSource + ): Promise { + const name = event?.name + if (!name || !name.startsWith('replication:')) return + const jobName = name.slice('replication:'.length) + try { + await this.runSync(jobName, dataSource) + } catch (error) { + console.error(`Replication sync failed for "${jobName}":`, error) + } + } + + /** + * Pull all rows newer than the stored cursor for a single job and upsert them + * into the internal database, advancing the cursor as it goes. Errors are + * captured on the job row so one failing job never breaks the others. + */ + public async runSync( + name: string, + dataSource: DataSource + ): Promise<{ rowsSynced: number; pages: number }> { + const jobs = (await dataSource.rpc.executeQuery({ + sql: SQL.GET_JOB, + params: [name], + })) as unknown as ReplicationJobRow[] + const job = jobs?.[0] + if (!job) return { rowsSynced: 0, pages: 0 } + if (job.is_active !== 1) return { rowsSynced: 0, pages: 0 } // paused + + const external = JSON.parse(job.source_config) as ExternalDatabaseSource + const dialect = external.dialect + const externalDS: DataSource = { + rpc: dataSource.rpc, + source: 'external', + external, + executionContext: dataSource.executionContext, + } + + // Bypass RLS / allowlist / cache for the external pull. On the cron + // callback path the request role is `client`, which would otherwise apply + // internal RLS policies to the external query and mangle the SQL. + const pullConfig = { + role: 'admin', + features: { rls: false, allowlist: false }, + } as StarbaseDBConfiguration + + const targetTable = job.target_table ?? job.table_name + const selectCols = parseJSONArray(job.columns) + const primaryKey = parseJSONArray(job.primary_key) + const batch = clampBatch(job.batch_size) + + let cursor = job.last_value + let totalSynced = job.rows_synced ?? 0 + let pages = 0 + let runError: string | null = null + + try { + for (let page = 0; page < this.maxPagesPerRun; page++) { + const sql = buildPullQuery( + job.table_name, + job.tracking_col, + selectCols, + cursor != null, + batch, + dialect + ) + const params = cursor != null ? [cursor] : [] + + const result = await executeQuery({ + sql, + params, + isRaw: true, + dataSource: externalDS, + config: pullConfig, + }) + const rows = toRowObjects(result) + if (rows.length === 0) break + + await this.applyRows(dataSource, targetTable, primaryKey, rows) + + const nextCursor = advanceCursor( + cursor, + rows, + job.tracking_col, + job.tracking_type + ) + const advanced = nextCursor !== cursor + cursor = nextCursor + totalSynced += rows.length + pages++ + + // Persist progress after each page so a mid-run interruption + // resumes from the last completed page rather than restarting. + await dataSource.rpc.executeQuery({ + sql: SQL.UPDATE_CURSOR, + params: [cursor, totalSynced, name], + }) + + // Stop if the cursor cannot move forward (e.g. the tracking + // column is entirely NULL on this page) to avoid re-fetching the + // same page until maxPagesPerRun. + if (!advanced) break + if (rows.length < batch) break // last page + } + } catch (error: any) { + // Never let the source password surface in a stored or returned + // error (driver errors can echo the connection details). + runError = redactSecrets(error?.message ?? String(error), external) + throw new Error(runError) + } finally { + await dataSource.rpc.executeQuery({ + sql: SQL.UPDATE_RUN_META, + params: [new Date().toISOString(), runError, name], + }) + } + + return { rowsSynced: totalSynced - (job.rows_synced ?? 0), pages } + } + + /** + * Ensure the destination table exists (inferring column types from the first + * row) and upsert the page of rows in chunked multi-row statements. + */ + private async applyRows( + dataSource: DataSource, + targetTable: string, + primaryKey: string[] | null, + rows: Record[] + ): Promise { + const cols = Object.keys(rows[0]) + if (cols.length === 0) return + + await this.ensureTable(dataSource, targetTable, rows, primaryKey) + + const colList = cols.map((col) => `"${col}"`).join(', ') + // SQLite caps bound variables per statement (default 999). Chunk rows so + // cols * rowsPerChunk stays comfortably under that limit. + const rowsPerChunk = Math.max(1, Math.floor(900 / cols.length)) + + for (let i = 0; i < rows.length; i += rowsPerChunk) { + const chunk = rows.slice(i, i + rowsPerChunk) + const tuple = `(${cols.map(() => '?').join(', ')})` + const placeholders = chunk.map(() => tuple).join(', ') + const params: unknown[] = [] + for (const row of chunk) { + for (const col of cols) params.push(serializeValue(row[col])) + } + + await dataSource.rpc.executeQuery({ + sql: `INSERT OR REPLACE INTO "${targetTable}" (${colList}) VALUES ${placeholders}`, + params, + }) + } + } + + private async ensureTable( + dataSource: DataSource, + targetTable: string, + rows: Record[], + primaryKey: string[] | null + ): Promise { + const defs = Object.keys(rows[0]).map((col) => { + // Infer from the first non-null value across the page so a NULL in + // the first row does not force an otherwise-typed column to TEXT. + const sample = rows.find( + (row) => row[col] !== null && row[col] !== undefined + )?.[col] + return `"${col}" ${inferSqliteType(sample)}` + }) + const pkClause = + primaryKey && primaryKey.length + ? `, PRIMARY KEY (${primaryKey.map((c) => `"${c}"`).join(', ')})` + : '' + + await dataSource.rpc.executeQuery({ + sql: `CREATE TABLE IF NOT EXISTS "${targetTable}" (${defs.join(', ')}${pkClause})`, + params: [], + }) + } + + private taskName(jobName: string): string { + return `replication:${jobName}` + } + + // ────────────────────────────────────────────────────────────────────── + // Validation + // ────────────────────────────────────────────────────────────────────── + + private validateJobInput(body: ReplicationJobInput): Required< + Pick< + ReplicationJobInput, + | 'name' + | 'table_name' + | 'tracking_col' + | 'tracking_type' + | 'cron_tab' + | 'batch_size' + > + > & { + source: ExternalDatabaseSource + target_table?: string + columns?: string[] + primary_key?: string[] + } { + if (!body || typeof body !== 'object') { + throw new Error('Request body is required.') + } + + const name = body.name + if (!name || !JOB_NAME.test(name)) { + throw new Error( + 'A "name" of letters, numbers, underscores or hyphens is required.' + ) + } + + assertIdent(body.table_name, 'table_name') + assertIdent(body.tracking_col, 'tracking_col') + + const tracking_type = body.tracking_type ?? 'timestamp' + if (tracking_type !== 'timestamp' && tracking_type !== 'id') { + throw new Error('"tracking_type" must be "timestamp" or "id".') + } + + if (!body.cron_tab || typeof body.cron_tab !== 'string') { + throw new Error('A "cron_tab" schedule is required.') + } + try { + parseCronExpression(body.cron_tab) + } catch { + throw new Error(`Invalid cron expression: "${body.cron_tab}".`) + } + + validateSource(body.source) + + if (body.target_table !== undefined) { + assertIdent(body.target_table, 'target_table') + } + if (body.columns !== undefined) { + if (!Array.isArray(body.columns) || body.columns.length === 0) { + throw new Error('"columns" must be a non-empty array.') + } + body.columns.forEach((col) => assertIdent(col, 'columns')) + } + if (body.primary_key !== undefined) { + if ( + !Array.isArray(body.primary_key) || + body.primary_key.length === 0 + ) { + throw new Error('"primary_key" must be a non-empty array.') + } + body.primary_key.forEach((col) => assertIdent(col, 'primary_key')) + } + + // When an explicit column list is given it must cover the tracking + // column (otherwise the cursor can never advance) and every primary-key + // column (otherwise CREATE TABLE references a column that isn't pulled). + if (body.columns) { + if (!body.columns.includes(body.tracking_col)) { + throw new Error( + 'When "columns" is set it must include the "tracking_col".' + ) + } + if ( + body.primary_key && + !body.primary_key.every((col) => body.columns!.includes(col)) + ) { + throw new Error( + 'When "columns" is set it must include every "primary_key" column.' + ) + } + } + + return { + name, + source: body.source, + table_name: body.table_name, + tracking_col: body.tracking_col, + tracking_type, + cron_tab: body.cron_tab, + target_table: body.target_table, + columns: body.columns, + primary_key: body.primary_key, + batch_size: clampBatch(body.batch_size), + } + } +} + +// ────────────────────────────────────────────────────────────────────────── +// Pure helpers (exported for unit testing) +// ────────────────────────────────────────────────────────────────────────── + +export function assertIdent(value: unknown, label: string): void { + if (typeof value !== 'string' || !IDENT.test(value)) { + throw new Error( + `Invalid ${label}: "${String(value)}". Only letters, numbers and underscores are allowed.` + ) + } +} + +export function quoteIdent(name: string, dialect: string): string { + return dialect === 'mysql' ? `\`${name}\`` : `"${name}"` +} + +export function clampBatch(value: unknown): number { + const n = Number(value) + if (!Number.isFinite(n) || n <= 0) return 500 + return Math.min(10000, Math.max(1, Math.floor(n))) +} + +export function buildPullQuery( + table: string, + trackingCol: string, + columns: string[] | null, + hasCursor: boolean, + batch: number, + dialect: string +): string { + const cols = + columns && columns.length + ? columns.map((col) => quoteIdent(col, dialect)).join(', ') + : '*' + const where = hasCursor + ? `WHERE ${quoteIdent(trackingCol, dialect)} > ? ` + : '' + return ( + `SELECT ${cols} FROM ${quoteIdent(table, dialect)} ` + + `${where}ORDER BY ${quoteIdent(trackingCol, dialect)} ASC LIMIT ${batch}` + ) +} + +export function toRowObjects(result: any): Record[] { + if (!result) return [] + // Mocked / non-raw paths may already return an array of row objects. + if (Array.isArray(result)) return result as Record[] + const columns: string[] = result.columns ?? [] + const rows: unknown[][] = result.rows ?? [] + return rows.map((row) => + columns.reduce((obj: Record, col, index) => { + obj[col] = row[index] + return obj + }, {}) + ) +} + +/** + * Compute the new cursor as the maximum tracking value seen. The comparison is + * type-aware: numeric for `id` (so '10' > '9'), temporal for `timestamp` + * (falling back to lexical order for ISO-8601 strings). NULLs are skipped. + */ +export function advanceCursor( + previous: string | null, + rows: Record[], + trackingCol: string, + trackingType: TrackingType +): string | null { + let max = previous + for (const row of rows) { + const raw = row[trackingCol] + if (raw === null || raw === undefined) continue + const value = raw instanceof Date ? raw.toISOString() : String(raw) + if (max === null) { + max = value + continue + } + if (greaterThan(value, max, trackingType)) max = value + } + return max +} + +function greaterThan(a: string, b: string, type: TrackingType): boolean { + if (type === 'id') { + const na = Number(a) + const nb = Number(b) + if (!Number.isNaN(na) && !Number.isNaN(nb)) return na > nb + return a > b + } + const ta = Date.parse(a) + const tb = Date.parse(b) + if (!Number.isNaN(ta) && !Number.isNaN(tb)) return ta > tb + return a > b +} + +export function inferSqliteType(value: unknown): string { + if (typeof value === 'number') { + return Number.isInteger(value) ? 'INTEGER' : 'REAL' + } + if (typeof value === 'boolean') return 'INTEGER' + if (typeof value === 'bigint') return 'INTEGER' + return 'TEXT' +} + +export function serializeValue(value: unknown): unknown { + if (value === null || value === undefined) return null + if (typeof value === 'boolean') return value ? 1 : 0 + if (typeof value === 'number') { + // SQLite has no NaN / Infinity storage class. + return Number.isFinite(value) ? value : null + } + if (typeof value === 'bigint') { + // Cloudflare's SqlStorage binds number/string/null — not bigint — so + // narrow to a number when it fits, otherwise keep full precision as text. + return value >= BigInt(Number.MIN_SAFE_INTEGER) && + value <= BigInt(Number.MAX_SAFE_INTEGER) + ? Number(value) + : String(value) + } + if (value instanceof Date) return value.toISOString() + if (typeof value === 'object') return JSON.stringify(value) + return value +} + +export function parseJSONArray(value: string | null): string[] | null { + if (!value) return null + try { + const parsed = JSON.parse(value) + return Array.isArray(parsed) ? parsed : null + } catch { + return null + } +} + +export function redactSecrets( + message: string, + source: ExternalDatabaseSource +): string { + const secret = (source as { password?: string })?.password + if (!secret) return message + return message.split(secret).join('***') +} + +export function redactConfig(sourceConfig: string): string { + try { + const parsed = JSON.parse(sourceConfig) + if (parsed && typeof parsed === 'object' && 'password' in parsed) { + parsed.password = '***' + } + return JSON.stringify(parsed) + } catch { + return sourceConfig + } +} + +function validateSource(source: any): void { + if (!source || typeof source !== 'object') { + throw new Error('A "source" connection object is required.') + } + if (source.dialect !== 'postgresql' && source.dialect !== 'mysql') { + throw new Error( + 'source.dialect must be "postgresql" or "mysql" for replication.' + ) + } + for (const field of ['host', 'port', 'user', 'password', 'database']) { + if (source[field] === undefined || source[field] === null) { + throw new Error(`source.${field} is required.`) + } + } +} diff --git a/plugins/replication/meta.json b/plugins/replication/meta.json new file mode 100644 index 0000000..96fc983 --- /dev/null +++ b/plugins/replication/meta.json @@ -0,0 +1,41 @@ +{ + "version": "1.0.0", + "resources": { + "tables": { + "tmp_replication_jobs": [ + "name", + "source_config", + "table_name", + "tracking_col", + "tracking_type", + "last_value", + "cron_tab", + "target_table", + "columns", + "primary_key", + "batch_size", + "is_active", + "last_run_at", + "last_error", + "rows_synced", + "created_at", + "updated_at" + ] + }, + "secrets": {}, + "variables": {} + }, + "dependencies": { + "tables": { + "tmp_cron_tasks": [ + "name", + "cron_tab", + "payload", + "callback_host", + "is_active" + ] + }, + "secrets": {}, + "variables": {} + } +} From 69d34625f3ce9323b19560075b8489976297ed8d Mon Sep 17 00:00:00 2001 From: mikhaeelatefrizk <180051136+mikhaeelatefrizk@users.noreply.github.com> Date: Mon, 8 Jun 2026 00:25:24 +0300 Subject: [PATCH 4/4] feat(replication): register the replication plugin --- src/index.ts | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/index.ts b/src/index.ts index 4d08932..bf28d9e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,6 +11,7 @@ import { ChangeDataCapturePlugin } from '../plugins/cdc' import { QueryLogPlugin } from '../plugins/query-log' import { StatsPlugin } from '../plugins/stats' import { CronPlugin } from '../plugins/cron' +import { ReplicationPlugin } from '../plugins/replication' import { InterfacePlugin } from '../plugins/interface' export { StarbaseDBDurableObject } from './do' @@ -195,6 +196,9 @@ export default { const webSocketPlugin = new WebSocketPlugin() const cronPlugin = new CronPlugin() + const replicationPlugin = new ReplicationPlugin({ + cron: cronPlugin, + }) const cdcPlugin = new ChangeDataCapturePlugin({ stub, broadcastAllEvents: false, @@ -209,6 +213,14 @@ export default { // Include cron event code here }, ctx) + // Run replication syncs when their cron tasks fire. The data source is + // captured here from the request scope because the cron callback runs + // on a separate request where the replication middleware has not run. + cronPlugin.onEvent( + (event) => replicationPlugin.handleCronEvent(event, dataSource), + ctx + ) + const interfacePlugin = new InterfacePlugin() const plugins = [ @@ -224,6 +236,7 @@ export default { new QueryLogPlugin({ ctx }), cdcPlugin, cronPlugin, + replicationPlugin, new StatsPlugin(), interfacePlugin, ] satisfies StarbasePlugin[]