diff --git a/docs/reference/powersync-db-collection/classes/PowerSyncTransactor.md b/docs/reference/powersync-db-collection/classes/PowerSyncTransactor.md index e33ad5872..93604de4e 100644 --- a/docs/reference/powersync-db-collection/classes/PowerSyncTransactor.md +++ b/docs/reference/powersync-db-collection/classes/PowerSyncTransactor.md @@ -11,8 +11,17 @@ Applies mutations to the PowerSync database. This method is called automatically insert, update, and delete operations. You typically don't need to call this directly unless you have special transaction requirements. +By default, transactions resolve in [`TransactorMode.OFFLINE`](../enumerations/TransactorMode.md#offline) +after the local SQLite write has been observed by TanStack DB. For workflows that +need server acknowledgement, the experimental +[`TransactorMode.ONLINE`](../enumerations/TransactorMode.md#online) mode waits +for PowerSync to upload the mutation to the backend and sync the accepted change +back down before resolving. + ## Example +Local-first transaction handling. + ```typescript // Create a collection const collection = createCollection( @@ -39,6 +48,33 @@ await addTx.commit() await addTx.isPersisted.promise ``` +## Example + +Experimental: wait for backend acknowledgement before resolving the transaction. + +```typescript +const onlineTransactor = new PowerSyncTransactor({ + database: db, + mode: TransactorMode.ONLINE, + timeoutMs: 30_000, +}) + +const confirmedTx = createTransaction({ + autoCommit: false, + mutationFn: async ({ transaction }) => { + await onlineTransactor.applyTransaction(transaction) + }, +}) + +confirmedTx.mutate(() => { + collection.insert({ id: randomUUID(), name: `confirmed-write` }) +}) + +await confirmedTx.commit() +await confirmedTx.isPersisted.promise +// At this point the mutation has been uploaded and synced back down. +``` + ## Param The transaction containing mutations to apply @@ -95,6 +131,10 @@ Defined in: [PowerSyncTransactor.ts:66](https://github.com/TanStack/db/blob/main Persists a Transaction to the PowerSync SQLite database. +The returned promise resolves according to the configured +[`TransactorMode`](../enumerations/TransactorMode.md): local observation in +offline mode, or backend upload plus sync-down observation in online mode. + #### Parameters ##### transaction diff --git a/docs/reference/powersync-db-collection/enumerations/TransactorMode.md b/docs/reference/powersync-db-collection/enumerations/TransactorMode.md new file mode 100644 index 000000000..20e30c1b7 --- /dev/null +++ b/docs/reference/powersync-db-collection/enumerations/TransactorMode.md @@ -0,0 +1,48 @@ +--- +id: TransactorMode +title: TransactorMode +--- + +# Enumeration: TransactorMode + +Controls when a [`PowerSyncTransactor`](../classes/PowerSyncTransactor.md) +considers a TanStack DB mutation transaction complete. + +## Enumeration Members + +### OFFLINE + +```ts +OFFLINE = "offline" +``` + +Resolve mutation transactions once the local PowerSync SQLite write has been +observed by TanStack DB. + +This is the default mode. It gives fast local-first behavior: mutations are +persisted locally, PowerSync can upload them later, and TanStack DB state is +already consistent with the local database when the transaction resolves. + +*** + +### ONLINE + +```ts +ONLINE = "online" +``` + +> Experimental: This mode depends on PowerSync checkpoint internals and may +> change as the PowerSync SDK exposes more direct backend-acknowledgement hooks. + +Resolve mutation transactions only after PowerSync has uploaded the local write +to the backend and the resulting change has been synced back down and observed +by TanStack DB. + +Use this mode when callers need backend confirmation before treating a mutation +as complete, such as when showing committed server state, navigating away from a +critical workflow, or coordinating with systems that only react after the backend +has accepted the write. + +Because this waits for a full upload and sync-down cycle, transactions can take +longer to resolve and may remain pending while the client is offline or the +PowerSync connection is unable to complete a checkpoint. diff --git a/docs/reference/powersync-db-collection/index.md b/docs/reference/powersync-db-collection/index.md index f6467856d..3c76f72d9 100644 --- a/docs/reference/powersync-db-collection/index.md +++ b/docs/reference/powersync-db-collection/index.md @@ -9,6 +9,10 @@ title: "@tanstack/powersync-db-collection" - [PowerSyncTransactor](classes/PowerSyncTransactor.md) +## Enumerations + +- [TransactorMode](enumerations/TransactorMode.md) + ## Type Aliases - [BasePowerSyncCollectionConfig](type-aliases/BasePowerSyncCollectionConfig.md) @@ -22,6 +26,8 @@ title: "@tanstack/powersync-db-collection" - [PowerSyncCollectionMeta](type-aliases/PowerSyncCollectionMeta.md) - [PowerSyncCollectionUtils](type-aliases/PowerSyncCollectionUtils.md) - [SerializerConfig](type-aliases/SerializerConfig.md) +- [OfflineTransactorOptions](type-aliases/OfflineTransactorOptions.md) +- [OnlineTransactorOptions](type-aliases/OnlineTransactorOptions.md) - [TransactorOptions](type-aliases/TransactorOptions.md) ## Variables diff --git a/docs/reference/powersync-db-collection/type-aliases/OfflineTransactorOptions.md b/docs/reference/powersync-db-collection/type-aliases/OfflineTransactorOptions.md new file mode 100644 index 000000000..ffd9983c9 --- /dev/null +++ b/docs/reference/powersync-db-collection/type-aliases/OfflineTransactorOptions.md @@ -0,0 +1,34 @@ +--- +id: OfflineTransactorOptions +title: OfflineTransactorOptions +--- + +# Type Alias: OfflineTransactorOptions + +```ts +type OfflineTransactorOptions = BaseTransactorOptions & object; +``` + +Options for local-first transaction handling. + +## Properties + +### database + +```ts +database: AbstractPowerSyncDatabase; +``` + +The PowerSync database that mutations will be written to. + +*** + +### mode? + +```ts +optional mode: TransactorMode.OFFLINE; +``` + +Resolve after the local write has been observed by TanStack DB. + +This is the default when `mode` is omitted. diff --git a/docs/reference/powersync-db-collection/type-aliases/OnlineTransactorOptions.md b/docs/reference/powersync-db-collection/type-aliases/OnlineTransactorOptions.md new file mode 100644 index 000000000..1bbe2b439 --- /dev/null +++ b/docs/reference/powersync-db-collection/type-aliases/OnlineTransactorOptions.md @@ -0,0 +1,70 @@ +--- +id: OnlineTransactorOptions +title: OnlineTransactorOptions +--- + +# Type Alias: OnlineTransactorOptions + +```ts +type OnlineTransactorOptions = BaseTransactorOptions & object; +``` + +Options for backend-confirmed transaction handling. + +> Experimental: Online transaction completion depends on PowerSync checkpoint +> internals and may change as the PowerSync SDK exposes more direct +> backend-acknowledgement hooks. + +## Properties + +### database + +```ts +database: AbstractPowerSyncDatabase; +``` + +The PowerSync database that mutations will be written to. + +*** + +### mode + +```ts +mode: TransactorMode.ONLINE; +``` + +Resolve after the local write has been uploaded to the backend, synced back down, +and observed by TanStack DB. + +> Experimental: This mode depends on PowerSync checkpoint internals and may +> change as the PowerSync SDK exposes more direct backend-acknowledgement hooks. + +*** + +### timeoutMs? + +```ts +optional timeoutMs: number; +``` + +Maximum total time to wait for the backend checkpoint and synced-down diff +records. + +If the timeout is reached before the write has completed the upload and sync-down +cycle, the mutation transaction rejects. + +> Experimental: This option only applies to the experimental online transaction +> mode. + +*** + +### abortSignal? + +```ts +optional abortSignal: AbortSignal; +``` + +Optional signal for cancelling the online wait. + +> Experimental: This option only applies to the experimental online transaction +> mode. diff --git a/docs/reference/powersync-db-collection/type-aliases/TransactorOptions.md b/docs/reference/powersync-db-collection/type-aliases/TransactorOptions.md index 7a9051b65..33c3366f8 100644 --- a/docs/reference/powersync-db-collection/type-aliases/TransactorOptions.md +++ b/docs/reference/powersync-db-collection/type-aliases/TransactorOptions.md @@ -6,17 +6,30 @@ title: TransactorOptions # Type Alias: TransactorOptions ```ts -type TransactorOptions = object; +type TransactorOptions = OfflineTransactorOptions | OnlineTransactorOptions; ``` -Defined in: [PowerSyncTransactor.ts:15](https://github.com/TanStack/db/blob/main/packages/powersync-db-collection/src/PowerSyncTransactor.ts#L15) +Configuration for a [`PowerSyncTransactor`](../classes/PowerSyncTransactor.md). -## Properties +`mode` is the discriminator: -### database +- omit `mode` or use [`TransactorMode.OFFLINE`](../enumerations/TransactorMode.md#offline) for fast local-first writes +- use [`TransactorMode.ONLINE`](../enumerations/TransactorMode.md#online) to unlock backend-confirmed wait options -```ts -database: AbstractPowerSyncDatabase; +## Example + +```typescript +new PowerSyncTransactor({ + database: db, +}) ``` -Defined in: [PowerSyncTransactor.ts:16](https://github.com/TanStack/db/blob/main/packages/powersync-db-collection/src/PowerSyncTransactor.ts#L16) +## Example + +```typescript +new PowerSyncTransactor({ + database: db, + mode: TransactorMode.ONLINE, + timeoutMs: 30_000, +}) +``` diff --git a/packages/db/src/index.ts b/packages/db/src/index.ts index ccf7cbb6e..3b9d1f313 100644 --- a/packages/db/src/index.ts +++ b/packages/db/src/index.ts @@ -29,4 +29,5 @@ export * from './query/expression-helpers.js' // Re-export some stuff explicitly to ensure the type & value is exported export type { Collection } from './collection/index.js' export { IR } +export { or } from './query/builder/functions.js' export { operators, type OperatorName } from './query/builder/functions.js' diff --git a/packages/powersync-db-collection/package.json b/packages/powersync-db-collection/package.json index c917ea5d1..12411cc50 100644 --- a/packages/powersync-db-collection/package.json +++ b/packages/powersync-db-collection/package.json @@ -62,8 +62,8 @@ "@powersync/common": "^1.41.0" }, "devDependencies": { - "@powersync/common": "0.0.0-dev-20260309101613", - "@powersync/node": "0.0.0-dev-20260309101613", + "@powersync/common": "^1.53.2", + "@powersync/node": "^0.18.7", "@types/debug": "^4.1.12", "@vitest/coverage-istanbul": "^3.2.4" } diff --git a/packages/powersync-db-collection/src/CheckpointObserver.ts b/packages/powersync-db-collection/src/CheckpointObserver.ts new file mode 100644 index 000000000..556278360 --- /dev/null +++ b/packages/powersync-db-collection/src/CheckpointObserver.ts @@ -0,0 +1,450 @@ +import type { AbstractPowerSyncDatabase } from '@powersync/common'; + +const CHECKPOINT_WATCH_TABLE = 'ps_local_checkpoint_watch'; + +type CheckpointValue = bigint | null; +type SerializedCheckpointValue = string | null; + +type CheckpointRecord = { + id: number; + recorded_at: string; + event: string; + old_target_op: CheckpointValue; + new_target_op: CheckpointValue; + old_last_op: CheckpointValue; + new_last_op: CheckpointValue; + old_last_applied_op: CheckpointValue; + new_last_applied_op: CheckpointValue; +}; + +type SerializedCheckpointRecord = Omit< + CheckpointRecord, + | 'old_target_op' + | 'new_target_op' + | 'old_last_op' + | 'new_last_op' + | 'old_last_applied_op' + | 'new_last_applied_op' +> & { + old_target_op: SerializedCheckpointValue; + new_target_op: SerializedCheckpointValue; + old_last_op: SerializedCheckpointValue; + new_last_op: SerializedCheckpointValue; + old_last_applied_op: SerializedCheckpointValue; + new_last_applied_op: SerializedCheckpointValue; +}; + +type PendingCheckpointTracker = { + target_op: CheckpointValue; + resolver: () => void; + rejector: (error: Error) => void; +}; + +const MAX_OP_ID = BigInt('9223372036854775807'); + +const isConcreteCheckpoint = (value: CheckpointValue): value is bigint => { + return value != null && value != MAX_OP_ID; +}; + +const checkpointApplied = (target: CheckpointValue, lastApplied: CheckpointValue): boolean => { + return target != null && lastApplied != null && target <= lastApplied; +}; + +const parseCheckpointValue = (value: SerializedCheckpointValue): CheckpointValue => { + return value == null ? null : BigInt(value); +}; + +const deserializeCheckpointRecord = ( + record: SerializedCheckpointRecord +): CheckpointRecord => ({ + ...record, + old_target_op: parseCheckpointValue(record.old_target_op), + new_target_op: parseCheckpointValue(record.new_target_op), + old_last_op: parseCheckpointValue(record.old_last_op), + new_last_op: parseCheckpointValue(record.new_last_op), + old_last_applied_op: parseCheckpointValue(record.old_last_applied_op), + new_last_applied_op: parseCheckpointValue(record.new_last_applied_op), +}); + +/** + * Observes and monitors the PowerSync write checkpoint state. + * Note: this uses internals which might be subject to change. + * Note: This is a sneaky and lazy method for obtaining write checkpoint updates + * without actually implementing the necessary logic/hooks in the PowerSync SDK. + * We might be able to replace this with a single watched query for ps_buckets - the logic here + * just helped a lot with debugging. + */ +export class CheckpointObserver { + readonly db: AbstractPowerSyncDatabase; + + protected disposeWatch: (() => void) | null; + + protected disposeDatabaseListener: (() => void) | null; + + protected pendingCheckpoints: Array; + + protected disposed: boolean; + + constructor(params: { db: AbstractPowerSyncDatabase }) { + this.db = params.db; + this.disposeWatch = null; + this.disposeDatabaseListener = this.db.registerListener({ + closing: () => this.dispose(), + closed: () => this.dispose() + }); + this.pendingCheckpoints = []; + this.disposed = false; + } + + /** + * Wait for a write checkpoint to be synced and applied locally. + * Uses the next set target write checkpoint if no `target` is provided. + */ + async waitForCheckpoint( + options: { + target?: bigint; + timeoutMs?: number; + abortSignal?: AbortSignal; + } = {} + ): Promise { + const { target = null, timeoutMs, abortSignal } = options; + // TODO, handle if target is provided and it's lower than the last applied_op + + let tracker!: PendingCheckpointTracker; + + const promise = new Promise((resolve, reject) => { + let complete = false; + + let timeout = timeoutMs + ? setTimeout(() => { + tracker.rejector(new Error(`Timeout reached`)); + }, timeoutMs) + : null; + + const onAbort = () => { + tracker.rejector(new Error('Aborted')); + }; + + if (abortSignal) { + abortSignal.addEventListener('abort', onAbort); + } + + const cleanup = () => { + const index = this.pendingCheckpoints.indexOf(tracker); + if (index != -1) { + this.pendingCheckpoints.splice(index, 1); + } + if (timeout) { + clearTimeout(timeout); + timeout = null; + } + if (abortSignal) { + abortSignal.removeEventListener('abort', onAbort); + } + }; + + tracker = { + target_op: target, + resolver: () => { + if (complete) { + return; + } + complete = true; + cleanup(); + resolve(); + }, + rejector: (error: Error) => { + if (complete) { + return; + } + complete = true; + cleanup(); + reject(error); + } + }; + }); + + this.pendingCheckpoints.push(tracker); + + if (this.disposed) { + tracker.rejector(new Error('Checkpoint observer disposed')); + } else if (abortSignal?.aborted) { + tracker.rejector(new Error('Aborted')); + } else { + try { + await this.processCurrentState(); + } catch (error) { + tracker.rejector(error instanceof Error ? error : new Error('Failed to read checkpoint state')); + } + } + + return promise; + } + + /** + * Initializes watches over the internal PowerSync checkpoint state. + * We don't have super great observability for changes made here, so, + * we work around that by using SQLite triggers to capture value changes to + * the `$local` bucket in `ps_buckets`. + * + * TODO: This trigger-based implementation might be overkill. It treats the SDK + * as a black box and records every relevant internal state change so we don't + * miss anything. We could probably simplify this if the SDK exposed a direct + * checkpoint hook or helper. + */ + async init() { + if (this.isDisposedOrClosed()) { + return; + } + + this.disposed = false; + + await this.db.writeLock(async (ctx) => { + if (this.disposed) { + return; + } + + /** + * A table to buffer updates made to the local write checkpoint + * state. The old values are not required for the completion logic, but + * keeping them makes the buffered checkpoint transitions easier to inspect. + */ + await ctx.execute(/* sql */ ` + CREATE TEMP TABLE ${CHECKPOINT_WATCH_TABLE} ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + recorded_at TEXT NOT NULL DEFAULT (strftime ('%Y-%m-%dT%H:%M:%fZ', 'now')), + event TEXT NOT NULL, + old_target_op INTEGER, + new_target_op INTEGER, + old_last_op INTEGER, + new_last_op INTEGER, + old_last_applied_op INTEGER, + new_last_applied_op INTEGER + ); + `); + + await ctx.execute(/* sql */ ` + CREATE TEMP TRIGGER ps_watch_local_bucket_insert AFTER INSERT ON main.ps_buckets WHEN NEW.name = '$local' BEGIN + INSERT INTO + ${CHECKPOINT_WATCH_TABLE} ( + event, + old_target_op, + new_target_op, + old_last_op, + new_last_op, + old_last_applied_op, + new_last_applied_op + ) + VALUES + ( + 'insert', + NULL, + NEW.target_op, + NULL, + NEW.last_op, + NULL, + NEW.last_applied_op + ); + + END; + `); + + await ctx.execute(/* sql */ ` + CREATE TEMP TRIGGER ps_watch_local_bucket_update AFTER + UPDATE ON main.ps_buckets WHEN NEW.name = '$local' + AND ( + OLD.target_op IS NOT NEW.target_op + OR OLD.last_op IS NOT NEW.last_op + OR OLD.last_applied_op IS NOT NEW.last_applied_op + ) BEGIN + INSERT INTO + ps_local_checkpoint_watch ( + event, + old_target_op, + new_target_op, + old_last_op, + new_last_op, + old_last_applied_op, + new_last_applied_op + ) + VALUES + ( + 'update', + OLD.target_op, + NEW.target_op, + OLD.last_op, + NEW.last_op, + OLD.last_applied_op, + NEW.last_applied_op + ); + + END; + `); + + await ctx.execute(/* sql */ ` + CREATE TEMP TRIGGER ps_watch_local_bucket_delete AFTER DELETE ON main.ps_buckets WHEN OLD.name = '$local' BEGIN + INSERT INTO + ${CHECKPOINT_WATCH_TABLE} ( + event, + old_target_op, + new_target_op, + old_last_op, + new_last_op, + old_last_applied_op, + new_last_applied_op + ) + VALUES + ( + 'delete', + OLD.target_op, + NULL, + OLD.last_op, + NULL, + OLD.last_applied_op, + NULL + ); + + END; + `); + }); + + if (this.isDisposedOrClosed()) { + return; + } + + this.disposeWatch = this.db.onChangeWithCallback( + { + onChange: async () => { + if (this.disposed) { + return; + } + + // Needs to be a write lock in order to use temp tables (scoped to connection) + await this.db.writeLock(async (ctx) => { + if (this.disposed) { + return; + } + + // get the current tracked items + // node:sqlite does not reliably preserve large SQLite integers, so + // read checkpoint values as text and convert them to BigInt in JS. + const records = await ctx.getAll(/* sql */ ` + SELECT + id, + recorded_at, + event, + CAST(old_target_op AS TEXT) AS old_target_op, + CAST(new_target_op AS TEXT) AS new_target_op, + CAST(old_last_op AS TEXT) AS old_last_op, + CAST(new_last_op AS TEXT) AS new_last_op, + CAST(old_last_applied_op AS TEXT) AS old_last_applied_op, + CAST(new_last_applied_op AS TEXT) AS new_last_applied_op + FROM + ${CHECKPOINT_WATCH_TABLE} + ORDER BY + id ASC + `); + + this.processRecords(records.map(deserializeCheckpointRecord)); + + // delete the records, we've seen it all + await ctx.execute(`DELETE FROM ${CHECKPOINT_WATCH_TABLE}`); + }); + } + }, + { tables: [CHECKPOINT_WATCH_TABLE] } + ); + } + + async dispose() { + if (this.disposed) { + return; + } + + this.disposed = true; + this.disposeDatabaseListener?.(); + this.disposeDatabaseListener = null; + this.disposeWatch?.(); + this.disposeWatch = null; + + if (!this.db.closed) { + await this.db.writeLock(async (ctx) => { + await ctx.execute(/* sql */ `DROP TRIGGER IF EXISTS ps_watch_local_bucket_insert`); + await ctx.execute(/* sql */ `DROP TRIGGER IF EXISTS ps_watch_local_bucket_update`); + await ctx.execute(/* sql */ `DROP TRIGGER IF EXISTS ps_watch_local_bucket_delete`); + await ctx.execute(/* sql */ `DROP TABLE IF EXISTS ${CHECKPOINT_WATCH_TABLE}`); + }); + } + + // It's too late for any pending ops + for (const op of [...this.pendingCheckpoints]) { + op.rejector(new Error('Disposing watcher')); + } + this.pendingCheckpoints = []; + } + + protected async processCurrentState() { + await this.db.writeLock(async (ctx) => { + const row = await ctx.getOptional<{ + target_op: SerializedCheckpointValue; + last_applied_op: SerializedCheckpointValue; + }>(/* sql */ ` + SELECT + -- node:sqlite does not reliably preserve large SQLite integers, so + -- read checkpoint values as text and convert them to BigInt in JS. + CAST(target_op AS TEXT) AS target_op, + CAST(last_applied_op AS TEXT) AS last_applied_op + FROM + main.ps_buckets + WHERE + name = '$local' + `); + + if (!row) { + return; + } + + const targetOp = parseCheckpointValue(row.target_op); + const lastAppliedOp = parseCheckpointValue(row.last_applied_op); + + if (isConcreteCheckpoint(targetOp) && !checkpointApplied(targetOp, lastAppliedOp)) { + for (const pendingOp of this.pendingCheckpoints) { + if (pendingOp.target_op == null) { + pendingOp.target_op = targetOp; + } + } + } + + for (const pendingOp of [...this.pendingCheckpoints]) { + if (checkpointApplied(pendingOp.target_op, lastAppliedOp)) { + pendingOp.resolver(); + } + } + }); + } + + protected isDisposedOrClosed(): boolean { + return this.disposed || this.db.closed; + } + + protected processRecords(records: Array) { + for (const op of records) { + // If we have a concrete target op (write checkpoint) and pending requests which don't have a target yet + if (op.old_target_op != op.new_target_op && isConcreteCheckpoint(op.new_target_op)) { + // The target op just changed from a MAX to a concrete op, this value should be targeted + for (const pendingOp of this.pendingCheckpoints) { + if (pendingOp.target_op == null) { + // These wanted to wait for the next write checkpoint, we know this now. + pendingOp.target_op = op.new_target_op; + } + } + } + + for (const pendingOp of [...this.pendingCheckpoints]) { + if (checkpointApplied(pendingOp.target_op, op.new_last_applied_op)) { + pendingOp.resolver(); + } + } + } + } +} diff --git a/packages/powersync-db-collection/src/DiffObserver.ts b/packages/powersync-db-collection/src/DiffObserver.ts new file mode 100644 index 000000000..86e1123c3 --- /dev/null +++ b/packages/powersync-db-collection/src/DiffObserver.ts @@ -0,0 +1,80 @@ +import { BaseObserver } from '@powersync/common'; +import type { AbstractPowerSyncDatabase, LockContext } from '@powersync/common'; + +// TODO cleanup exports +export interface DiffObserver { + waitForEmpty: (context: LockContext, options?: {timeoutMs?: number, abortSignal?: AbortSignal}) => Promise +} + +type DiffObserverImplListener = { + empty: () => void; + closed: () => void; +} + +export class DiffObserverImpl extends BaseObserver implements DiffObserver { + readonly db: AbstractPowerSyncDatabase; + readonly diffTableName: string; + + constructor(options: {db: AbstractPowerSyncDatabase, diffTableName: string}) { + super(); + this.db = options.db; + this.diffTableName = options.diffTableName; + } + + dispose(): void { + this.iterateListeners(l => l.closed?.()); + } + + markEmpty() { + this.iterateListeners(l => l.empty?.()); + } + + async waitForEmpty(context: LockContext, options: {timeoutMs?: number, abortSignal?: AbortSignal} = {}): Promise { + // get a write lock to check the table state + const {count} = await context.get<{count: number}>(`SELECT COUNT(*) as count FROM ${this.diffTableName}`) + if (count == 0) { + // It's already empty + return; + } + + await new Promise((resolve, reject) => { + const {abortSignal, timeoutMs} = options; + let completed = false; + + const timeout = timeoutMs ? setTimeout(() => { + reject(new Error('Timeout while waiting for diff table to clear')); + }, timeoutMs) : null; + + const onAbort = () => { + completed = true; + reject(new Error('Aborted while waiting for diff table to clear')); + } + + abortSignal?.addEventListener('abort', onAbort); + + const cleanup = () => { + completed = true; + if (timeout) { + clearTimeout(timeout); + } + abortSignal?.removeEventListener('abort', onAbort); + dispose(); + } + // register a listener for when it's empty + const dispose = this.registerListener({ + closed: () => { + reject(new Error('Diff Observer closed')); + cleanup(); + }, + empty: () => { + if (completed) { + return; + } + cleanup(); + resolve() + } + }); + + }) + } +} diff --git a/packages/powersync-db-collection/src/PowerSyncTransactor.ts b/packages/powersync-db-collection/src/PowerSyncTransactor.ts index a45a4c751..9faf371ac 100644 --- a/packages/powersync-db-collection/src/PowerSyncTransactor.ts +++ b/packages/powersync-db-collection/src/PowerSyncTransactor.ts @@ -12,16 +12,119 @@ import type { const debug = DebugModule.debug(`ts/db:powersync`) -export type TransactorOptions = { +/** + * Controls when a {@link PowerSyncTransactor} considers a mutation transaction + * complete. + */ +export enum TransactorMode { + /** + * Resolve mutation transactions once the local PowerSync SQLite write has + * been observed by TanStack DB. + * + * This is the default mode. It gives fast local-first behavior: mutations are + * persisted locally, PowerSync can upload them later, and TanStack DB state is + * already consistent with the local database when the transaction resolves. + */ + OFFLINE = 'offline', + /** + * Resolve mutation transactions only after PowerSync has uploaded the local + * write to the backend and the resulting change has been synced back down and + * observed by TanStack DB. + * + * Use this mode when callers need backend confirmation before treating a + * mutation as complete, such as when showing committed server state, + * navigating away from a critical workflow, or coordinating with systems that + * only react after the backend has accepted the write. + * + * Because this waits for a full upload and sync-down cycle, transactions can + * take longer to resolve and may remain pending while the client is offline or + * the PowerSync connection is unable to complete a checkpoint. + * + * @experimental This mode depends on PowerSync checkpoint internals and may + * change as the PowerSync SDK exposes more direct backend-acknowledgement + * hooks. + */ + ONLINE = 'online' +} + +type BaseTransactorOptions = { + /** + * The PowerSync database that mutations will be written to. + */ database: AbstractPowerSyncDatabase } +/** + * Options for local-first transaction handling. + */ +export type OfflineTransactorOptions = BaseTransactorOptions & { + /** + * Resolve after the local write has been observed by TanStack DB. + * This is the default when `mode` is omitted. + */ + mode?: TransactorMode.OFFLINE +} + +/** + * Options for backend-confirmed transaction handling. + * + * @experimental Online transaction completion depends on PowerSync checkpoint + * internals and may change as the PowerSync SDK exposes more direct + * backend-acknowledgement hooks. + */ +export type OnlineTransactorOptions = BaseTransactorOptions & { + /** + * Resolve after the local write has been uploaded to the backend, synced back + * down, and observed by TanStack DB. + * + * @experimental This mode depends on PowerSync checkpoint internals and may + * change as the PowerSync SDK exposes more direct backend-acknowledgement + * hooks. + */ + mode: TransactorMode.ONLINE + /** + * Maximum total time to wait for the backend checkpoint and synced-down diff + * records. + * + * If the timeout is reached before the write has completed the upload and + * sync-down cycle, the mutation transaction rejects. + * + * @experimental This option only applies to the experimental online + * transaction mode. + */ + timeoutMs?: number + /** + * Optional signal for cancelling the online wait. + * + * @experimental This option only applies to the experimental online + * transaction mode. + */ + abortSignal?: AbortSignal +} + +/** + * Configuration for {@link PowerSyncTransactor}. + * + * `mode` is the discriminator: + * - omit `mode` or use `TransactorMode.OFFLINE` for fast local-first writes + * - use `TransactorMode.ONLINE` to unlock backend-confirmed wait options + */ +export type TransactorOptions = OfflineTransactorOptions | OnlineTransactorOptions + /** * Applies mutations to the PowerSync database. This method is called automatically by the collection's * insert, update, and delete operations. You typically don't need to call this directly unless you * have special transaction requirements. * + * By default, transactions resolve in {@link TransactorMode.OFFLINE} mode after + * the local SQLite write has been observed by TanStack DB. For workflows that + * need server acknowledgement, the experimental + * {@link TransactorMode.ONLINE} mode waits for PowerSync to upload the mutation + * to the backend and sync the accepted change back down before resolving. + * * @example + * Local-first transaction handling. + * * ```typescript * // Create a collection * const collection = createCollection( @@ -48,16 +151,56 @@ export type TransactorOptions = { * await addTx.isPersisted.promise * ``` * + * @example + * Experimental: wait for backend acknowledgement before resolving the + * transaction. + * + * ```typescript + * const onlineTransactor = new PowerSyncTransactor({ + * database: db, + * mode: TransactorMode.ONLINE, + * timeoutMs: 30_000, + * }) + * + * const confirmedTx = createTransaction({ + * autoCommit: false, + * mutationFn: async ({ transaction }) => { + * await onlineTransactor.applyTransaction(transaction) + * }, + * }) + * + * confirmedTx.mutate(() => { + * collection.insert({ id: randomUUID(), name: `confirmed-write` }) + * }) + * + * await confirmedTx.commit() + * await confirmedTx.isPersisted.promise + * // At this point the mutation has been uploaded and synced back down. + * ``` + * * @param transaction - The transaction containing mutations to apply - * @returns A promise that resolves when the mutations have been persisted to PowerSync + * @returns A promise that resolves according to the configured {@link TransactorMode}. */ export class PowerSyncTransactor { database: AbstractPowerSyncDatabase pendingOperationStore: PendingOperationStore + readonly mode: TransactorMode + protected readonly onlineOptions: Pick< + OnlineTransactorOptions, + 'abortSignal' | 'timeoutMs' + > | null constructor(options: TransactorOptions) { this.database = options.database this.pendingOperationStore = PendingOperationStore.GLOBAL + this.mode = options.mode ?? TransactorMode.OFFLINE + this.onlineOptions = + options.mode === TransactorMode.ONLINE + ? { + abortSignal: options.abortSignal, + timeoutMs: options.timeoutMs, + } + : null } /** @@ -129,19 +272,32 @@ export class PowerSyncTransactor { } } - /** - * Return a promise from the writeTransaction, without awaiting it. - * This promise will resolve once the entire transaction has been - * observed via the diff triggers. - * We return without awaiting in order to free the write lock. - */ - return { - whenComplete: Promise.all( - pendingOperations - .filter((op) => !!op) - .map((op) => this.pendingOperationStore.waitFor(op)), - ), + if (this.mode == TransactorMode.OFFLINE) { + /** + * Return a promise from the writeTransaction, without awaiting it. + * This promise will resolve once the entire transaction has been + * observed via the diff triggers. + * We return without awaiting in order to free the write lock. + */ + return { + whenComplete: Promise.all( + pendingOperations + .filter((op) => !!op) + .map((op) => this.pendingOperationStore.waitFor(op)), + ), + } + } else { + const meta = this.getMutationCollectionMeta(mutations[0]!); + /** + * Resolve after the backend has accepted the write checkpoint and + * TanStack DB has processed the resulting synced-down diff records. + */ + return { + whenComplete: this.waitForOnlineCompletion(meta), + } } + + }, ) @@ -305,6 +461,51 @@ export class PowerSyncTransactor { ).utils.getMeta() } + /** + * Waits for PowerSync to upload the local write, receive the accepted change + * back from the backend, and drain the resulting TanStack DB diff records. + */ + protected async waitForOnlineCompletion( + meta: PowerSyncCollectionMeta, + ): Promise { + const { abortSignal, timeoutMs } = this.onlineOptions ?? {} + + if (timeoutMs == null) { + const options = { abortSignal } + await meta.internal.checkpointObserver.waitForCheckpoint(options) + await this.database.writeLock((ctx) => + meta.internal.diffObserver.waitForEmpty(ctx, options), + ) + return + } + + const deadlineController = new AbortController() + const timeout = setTimeout(() => { + deadlineController.abort() + }, timeoutMs) + + const onAbort = () => { + deadlineController.abort() + } + + abortSignal?.addEventListener('abort', onAbort) + + try { + if (abortSignal?.aborted) { + deadlineController.abort() + } + + const options = { abortSignal: deadlineController.signal } + await meta.internal.checkpointObserver.waitForCheckpoint(options) + await this.database.writeLock((ctx) => + meta.internal.diffObserver.waitForEmpty(ctx, options), + ) + } finally { + clearTimeout(timeout) + abortSignal?.removeEventListener('abort', onAbort) + } + } + /** * Processes collection mutation metadata for persistence to the database. * We only support storing string metadata. diff --git a/packages/powersync-db-collection/src/definitions.ts b/packages/powersync-db-collection/src/definitions.ts index 9c63864cd..0c0db266a 100644 --- a/packages/powersync-db-collection/src/definitions.ts +++ b/packages/powersync-db-collection/src/definitions.ts @@ -1,3 +1,4 @@ +import type { CheckpointObserver } from './CheckpointObserver' import type { AbstractPowerSyncDatabase, Table } from '@powersync/common' import type { StandardSchemaV1 } from '@standard-schema/spec' import type { @@ -7,6 +8,7 @@ import type { InferSchemaOutput, LoadSubsetOptions, } from '@tanstack/db' +import type { DiffObserver } from './DiffObserver' import type { AnyTableColumnType, ExtractedTable, @@ -289,6 +291,11 @@ export type PowerSyncCollectionMeta = { * Whether the PowerSync table tracks metadata. */ metadataIsTracked: boolean + + internal: { + checkpointObserver: CheckpointObserver, + diffObserver: DiffObserver + } } /** diff --git a/packages/powersync-db-collection/src/powersync.ts b/packages/powersync-db-collection/src/powersync.ts index c11c1d269..3be2ddb16 100644 --- a/packages/powersync-db-collection/src/powersync.ts +++ b/packages/powersync-db-collection/src/powersync.ts @@ -1,18 +1,15 @@ import { DiffTriggerOperation, sanitizeSQL } from '@powersync/common' import { or } from '@tanstack/db' -import { compileSQLite } from './sqlite-compiler' -import { PendingOperationStore } from './PendingOperationStore' -import { PowerSyncTransactor } from './PowerSyncTransactor' +import { CheckpointObserver } from './CheckpointObserver' import { DEFAULT_BATCH_SIZE } from './definitions' +import { DiffObserverImpl } from './DiffObserver' import { asPowerSyncRecord, mapOperation } from './helpers' +import { PendingOperationStore } from './PendingOperationStore' +import { PowerSyncTransactor } from './PowerSyncTransactor' import { convertTableToSchema } from './schema' import { serializeForSQLite } from './serialization' -import type { - CleanupFn, - LoadSubsetOptions, - OperationType, - SyncConfig, -} from '@tanstack/db' +import { compileSQLite } from './sqlite-compiler' +import type { PendingOperation } from './PendingOperationStore' import type { AnyTableColumnType, ExtractedTable, @@ -31,9 +28,21 @@ import type { PowerSyncCollectionConfig, PowerSyncCollectionUtils, } from './definitions' -import type { PendingOperation } from './PendingOperationStore' +import type { + CleanupFn, + LoadSubsetOptions, + OperationType, + SyncConfig, +} from '@tanstack/db' import type { StandardSchemaV1 } from '@standard-schema/spec' -import type { LockContext, Table, TriggerDiffRecord } from '@powersync/common' +import type { AbstractPowerSyncDatabase, LockContext, Table, TriggerDiffRecord } from '@powersync/common' + + +// TODO, finalization registry +const CHECKPOINT_OBSERVER_MAP = new WeakMap< + AbstractPowerSyncDatabase, + CheckpointObserver +>() /** * Creates PowerSync collection options for use with a standard Collection. @@ -293,10 +302,26 @@ export function powerSyncCollectionOptions< .toString(16) .padStart(8, `0`)}` + const diffObserver = new DiffObserverImpl({ + db: database, + diffTableName: trackedTableName + }) + const transactor = new PowerSyncTransactor({ database, }) + if (!CHECKPOINT_OBSERVER_MAP.has(database)) { + const observer = new CheckpointObserver({ + db: database, + }) + observer.init() // TODO, better lifecycles + CHECKPOINT_OBSERVER_MAP.set(database, observer) + } + + const checkpointObserver = CHECKPOINT_OBSERVER_MAP.get(database)! + + /** * "sync" * Notice that this describes the Sync between the local SQLite table @@ -438,6 +463,8 @@ export function powerSyncCollectionOptions< { onChange: async () => { await flushDiffRecords() + // Emit that we've processed all changes up till now. + diffObserver.markEmpty(); }, }, { @@ -693,6 +720,10 @@ export function powerSyncCollectionOptions< }, utils: { getMeta: () => ({ + internal: { + checkpointObserver, + diffObserver + }, tableName: viewName, trackedTableName, metadataIsTracked, diff --git a/packages/powersync-db-collection/tsconfig.json b/packages/powersync-db-collection/tsconfig.json index 7e586bab3..3d1d6b37d 100644 --- a/packages/powersync-db-collection/tsconfig.json +++ b/packages/powersync-db-collection/tsconfig.json @@ -16,5 +16,10 @@ } }, "include": ["src", "tests", "vite.config.ts"], - "exclude": ["node_modules", "dist"] + "exclude": ["node_modules", "dist"], + "references": [ + { + "path": "../db" + } + ] } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f54cb8a82..1ad91dc2e 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -934,11 +934,11 @@ importers: version: 4.0.1 devDependencies: '@powersync/common': - specifier: ^1.44.0 - version: 1.44.0 + specifier: ^1.53.2 + version: 1.53.2 '@powersync/node': - specifier: ^0.15.1 - version: 0.15.1(@powersync/common@1.44.0) + specifier: ^0.18.7 + version: 0.18.7(@powersync/common@1.53.2) '@types/debug': specifier: ^4.1.12 version: 4.1.12 @@ -3743,13 +3743,13 @@ packages: '@polka/url@1.0.0-next.29': resolution: {integrity: sha512-wwQAWhWSuHaag8c4q/KN/vCoeOJYshAIvMQwD4GpSb3OiZklFfvAgmj0VCBBImRpuF/aFgIRzllXlVX93Jevww==} - '@powersync/common@1.44.0': - resolution: {integrity: sha512-vdjSKJzXtjnenOT4EFRcTpQQ8MFV8+0KA+sf7GBVEo7zy6cg7SpQSv5AjdyX/CivOolMosjsV6Xk3WP9e7djBw==} + '@powersync/common@1.53.2': + resolution: {integrity: sha512-y+GtaLL4imFIWOatErIAEozNZPcIkAcA24c2Mso124ZFStOEeP8LJTSMDxPhyK14mzSnBk2dSjrOAjNz7m2B8g==} - '@powersync/node@0.15.1': - resolution: {integrity: sha512-t/XSvWVt7vA2ZsCRl10ma/33ERwRa3qzpmmkZI/2lb3WMvPx8Wirj41VE8vHh7EQ7WEuhhkXSso4Hvoa4/KDdw==} + '@powersync/node@0.18.7': + resolution: {integrity: sha512-JA/ymX7rZmfYCWHPGUf65UNjLUtzlWiLgGsAQ5HaDJ8CQxN6fp/Beua7D100rn7cHvZcSBLAcZWvNbPnpjMdmQ==} peerDependencies: - '@powersync/common': ^1.44.0 + '@powersync/common': ^1.53.2 better-sqlite3: 12.x peerDependenciesMeta: better-sqlite3: @@ -5411,9 +5411,6 @@ packages: async-limiter@1.0.1: resolution: {integrity: sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==} - async-mutex@0.5.0: - resolution: {integrity: sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==} - available-typed-arrays@1.0.7: resolution: {integrity: sha512-wvUjBtSGN7+7SjNpq/9M2Tg350UZD3q62IFZLbRAR1bSMlCo1ZaeW+BJ+D090e4hIIZLBcTDWe4Mh4jvUDajzQ==} engines: {node: '>= 0.4'} @@ -7597,6 +7594,9 @@ packages: js-base64@3.7.8: resolution: {integrity: sha512-hNngCeKxIUQiEUN3GPJOkz4wF/YvdUdbNL9hsBcMQTkKzboD7T/q3OYOuuPZLUE6dBxSGpwhk5mwuDud7JVAow==} + js-logger@1.6.1: + resolution: {integrity: sha512-yTgMCPXVjhmg28CuUH8CKjU+cIKL/G+zTu4Fn4lQxs8mRFH/03QTNvEFngcxfg/gRDiQAOoyCKmMTOm9ayOzXA==} + js-tokens@4.0.0: resolution: {integrity: sha512-RdJUflcE3cUzKiMqQgsCu06FPu9UdIJO0beYbPhHN4k6apgJtifcoCtT9bcxOpYBtpD2kCM6Sbzg4CausW/PKQ==} @@ -13594,16 +13594,14 @@ snapshots: '@polka/url@1.0.0-next.29': {} - '@powersync/common@1.44.0': + '@powersync/common@1.53.2': dependencies: - async-mutex: 0.5.0 event-iterator: 2.0.0 + js-logger: 1.6.1 - '@powersync/node@0.15.1(@powersync/common@1.44.0)': + '@powersync/node@0.18.7(@powersync/common@1.53.2)': dependencies: - '@powersync/common': 1.44.0 - async-mutex: 0.5.0 - bson: 6.10.4 + '@powersync/common': 1.53.2 comlink: 4.4.2 undici: 7.16.0 @@ -15608,10 +15606,6 @@ snapshots: async-limiter@1.0.1: {} - async-mutex@0.5.0: - dependencies: - tslib: 2.8.1 - available-typed-arrays@1.0.7: dependencies: possible-typed-array-names: 1.1.0 @@ -18110,6 +18104,8 @@ snapshots: js-base64@3.7.8: {} + js-logger@1.6.1: {} + js-tokens@4.0.0: {} js-tokens@9.0.1: {}