Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { SupabaseConnector } from '@/library/powersync/SupabaseConnector';
import { TodosDeserializationSchema, TodosSchema } from '@/library/powersync/TodosSchema';
import { CircularProgress } from '@mui/material';
import { PowerSyncContext } from '@powersync/react';
import { LogLevel, PowerSyncDatabase, createBaseLogger } from '@powersync/web';
import { createBaseLogger, LogLevel, PowerSyncDatabase, WASQLiteOpenFactory, WASQLiteVFS } from '@powersync/web';
import { createCollection } from '@tanstack/db';
import { powerSyncCollectionOptions } from '@tanstack/powersync-db-collection';
import React, { Suspense } from 'react';
Expand All @@ -15,9 +15,10 @@ export const useSupabase = () => React.useContext(SupabaseContext);

export const db = new PowerSyncDatabase({
schema: AppSchema,
database: {
dbFilename: 'example.db'
}
database: new WASQLiteOpenFactory({
dbFilename: 'test2.sqlite',
vfs: WASQLiteVFS.OPFSCoopSyncVFS
})
});

export const listsCollection = createCollection(
Expand Down
20 changes: 16 additions & 4 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import { CoreSyncStatus, coreStatusToJs } from './sync/stream/core-instruction.j
import { SyncStream } from './sync/sync-streams.js';
import { TriggerManager } from './triggers/TriggerManager.js';
import { TriggerManagerImpl } from './triggers/TriggerManagerImpl.js';
import { MemoryTriggerHoldManager } from './watched/MemoryTriggerHoldManager.js';
import { DEFAULT_WATCH_THROTTLE_MS, WatchCompatibleQuery } from './watched/WatchedQuery.js';
import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js';
import { WatchedQueryComparator } from './watched/processors/comparators.js';
Expand Down Expand Up @@ -222,6 +223,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Allows creating SQLite triggers which can be used to track various operations on SQLite tables.
*/
readonly triggers: TriggerManager;
protected triggersImpl: TriggerManagerImpl;

logger: ILogger;

Expand Down Expand Up @@ -296,10 +298,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

this._isReadyPromise = this.initialize();

this.triggers = new TriggerManagerImpl({
db: this,
schema: this.schema
});
this.triggers = this.triggersImpl = this.generateTriggerManager();
}

/**
Expand Down Expand Up @@ -334,6 +333,17 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*/
protected abstract openDBAdapter(options: PowerSyncDatabaseOptionsWithSettings): DBAdapter;

/**
* Generates a default trigger manager which uses a shared memory hold manager.
*/
protected generateTriggerManager() {
return new TriggerManagerImpl({
db: this,
schema: this.schema,
holdManager: new MemoryTriggerHoldManager()
});
}

protected abstract generateSyncStreamImplementation(
connector: PowerSyncBackendConnector,
options: CreateSyncImplementationOptions & RequiredAdditionalConnectionOptions
Expand Down Expand Up @@ -420,6 +430,8 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
await this.updateSchema(this.options.schema);
await this.resolveOfflineSyncStatus();
await this.database.execute('PRAGMA RECURSIVE_TRIGGERS=TRUE');
// Don't await this, we just need to trigger it
this.triggersImpl.cleanupStaleItems();
this.ready = true;
this.iterateListeners((cb) => cb.initialized?.());
}
Expand Down
37 changes: 31 additions & 6 deletions packages/common/src/client/triggers/TriggerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ export interface BaseTriggerDiffRecord<TOperationId extends string | number = nu
* This record contains the new value and optionally the previous value.
* Values are stored as JSON strings.
*/
export interface TriggerDiffUpdateRecord<TOperationId extends string | number = number>
extends BaseTriggerDiffRecord<TOperationId> {
export interface TriggerDiffUpdateRecord<
TOperationId extends string | number = number
> extends BaseTriggerDiffRecord<TOperationId> {
operation: DiffTriggerOperation.UPDATE;
/**
* The updated state of the row in JSON string format.
Expand All @@ -65,8 +66,9 @@ export interface TriggerDiffUpdateRecord<TOperationId extends string | number =
* Represents a diff record for a SQLite INSERT operation.
* This record contains the new value represented as a JSON string.
*/
export interface TriggerDiffInsertRecord<TOperationId extends string | number = number>
extends BaseTriggerDiffRecord<TOperationId> {
export interface TriggerDiffInsertRecord<
TOperationId extends string | number = number
> extends BaseTriggerDiffRecord<TOperationId> {
operation: DiffTriggerOperation.INSERT;
/**
* The value of the row, at the time of INSERT, in JSON string format.
Expand All @@ -79,8 +81,9 @@ export interface TriggerDiffInsertRecord<TOperationId extends string | number =
* Represents a diff record for a SQLite DELETE operation.
* This record contains the new value represented as a JSON string.
*/
export interface TriggerDiffDeleteRecord<TOperationId extends string | number = number>
extends BaseTriggerDiffRecord<TOperationId> {
export interface TriggerDiffDeleteRecord<
TOperationId extends string | number = number
> extends BaseTriggerDiffRecord<TOperationId> {
operation: DiffTriggerOperation.DELETE;
/**
* The value of the row, before the DELETE operation, in JSON string format.
Expand Down Expand Up @@ -201,6 +204,12 @@ interface BaseCreateDiffTriggerOptions {
* Hooks which allow execution during the trigger creation process.
*/
hooks?: TriggerCreationHooks;

/**
* Using persistence will result in creating a persisted SQLite trigger
* and destination table.
*/
usePersistence?: boolean;
}

/**
Expand Down Expand Up @@ -449,3 +458,19 @@ export interface TriggerManager {
*/
trackTableDiff(options: TrackDiffOptions): Promise<TriggerRemoveCallback>;
}

/**
* @experimental
*/

export interface TriggerHoldManager {
/**
* Obtains or marks a hold on a certain identifier.
* @returns a callback to release the hold.
*/
obtainHold: (identifier: string) => Promise<() => Promise<void>>;
/**
* Checks if a hold is present for an identifier.
*/
checkHold: (identifier: string) => Promise<boolean>;
}
152 changes: 147 additions & 5 deletions packages/common/src/client/triggers/TriggerManagerImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
CreateDiffTriggerOptions,
DiffTriggerOperation,
TrackDiffOptions,
TriggerHoldManager,
TriggerManager,
TriggerRemoveCallback,
WithDiffOptions
Expand All @@ -14,8 +15,31 @@ import {
export type TriggerManagerImplOptions = {
db: AbstractPowerSyncDatabase;
schema: Schema;
holdManager: TriggerHoldManager;
usePersistenceByDefault?: () => Promise<boolean>;
};

/**
* A record of persisted table/trigger information.
* This is used for fail-safe cleanup.
*/
type TrackedTableRecord = {
/**
* The id of the trigger. This is used in the SQLite trigger name
*/
id: string;
/**
* The destination table name for the trigger
*/
table: string;
};

const TRIGGER_TABLE_TRACKING_KEY = 'powersync_tables_to_cleanup';

/**
* @internal
* @experimental
*/
export class TriggerManagerImpl implements TriggerManager {
protected schema: Schema;

Expand Down Expand Up @@ -48,14 +72,74 @@ export class TriggerManagerImpl implements TriggerManager {
}
}

async cleanupStaleItems() {
await this.db.writeLock(async (ctx) => {
const storedRecords = await ctx.getOptional<{ value: string }>(
/* sql */ `
SELECT
value
FROM
ps_kv
WHERE
key = ?
`,
[TRIGGER_TABLE_TRACKING_KEY]
);
if (!storedRecords) {
// There is nothing to cleanup
return;
}
const trackedItems = JSON.parse(storedRecords.value) as TrackedTableRecord[];
if (trackedItems.length == 0) {
// There is nothing to cleanup
return;
}

for (const trackedItem of trackedItems) {
// check if there is anything holding on to this item
const hasHold = await this.options.holdManager.checkHold(trackedItem.id);
if (hasHold) {
// This does not require cleanup
continue;
}

// We need to delete the table and triggers
const triggerNames = Object.values(DiffTriggerOperation).map(
(value) => `ps_temp_trigger_${value.toLowerCase()}_${trackedItem.id}`
);
for (const triggerName of triggerNames) {
// The trigger might not actually exist, we don't track each trigger name and we test all permutations
await ctx.execute(`DROP TRIGGER IF EXISTS ${triggerName}`);
}
await ctx.execute(`DROP TABLE IF EXISTS ${trackedItem.table}`);
}
});
}

async createDiffTrigger(options: CreateDiffTriggerOptions) {
await this.db.waitForReady();
const { source, destination, columns, when, hooks } = options;
const {
source,
destination,
columns,
when,
hooks,
// Fall back to the provided default if not given on this level
usePersistence = await this.options.usePersistenceByDefault?.()
} = options;
const operations = Object.keys(when) as DiffTriggerOperation[];
if (operations.length == 0) {
throw new Error('At least one WHEN operation must be specified for the trigger.');
}

/**
* The clause to use when executing
* CREATE ${tableTriggerTypeClause} TABLE
* OR
* CREATE ${tableTriggerTypeClause} TRIGGER
*/
const tableTriggerTypeClause = !usePersistence ? 'TEMP' : '';

const whenClauses = Object.fromEntries(
Object.entries(when).map(([operation, filter]) => [operation, `WHEN ${filter}`])
);
Expand All @@ -76,6 +160,8 @@ export class TriggerManagerImpl implements TriggerManager {

const id = await this.getUUID();

const releasePersistenceHold = usePersistence ? await this.options.holdManager.obtainHold(id) : null;

/**
* We default to replicating all columns if no columns array is provided.
*/
Expand Down Expand Up @@ -110,14 +196,47 @@ export class TriggerManagerImpl implements TriggerManager {
return this.db.writeLock(async (tx) => {
await this.removeTriggers(tx, triggerIds);
await tx.execute(/* sql */ `DROP TABLE IF EXISTS ${destination};`);
if (usePersistence) {
// Remove these triggers and tables from the list of items to safeguard cleanup for.
await tx.execute(
/* sql */ `
UPDATE ps_kv
SET
value = (
SELECT
json_group_array (json_each.value)
FROM
json_each (value)
WHERE
json_extract (json_each.value, '$.id') != ?
)
WHERE
key = ?;
`,
[id, TRIGGER_TABLE_TRACKING_KEY]
);

// Remove the key when the array becomes empty
await tx.execute(
/* sql */ `
DELETE FROM ps_kv
WHERE
key = ?
AND value IS NULL;
`,
[TRIGGER_TABLE_TRACKING_KEY]
);
}

await releasePersistenceHold?.();
});
};

const setup = async (tx: LockContext) => {
// Allow user code to execute in this lock context before the trigger is created.
await hooks?.beforeCreate?.(tx);
await tx.execute(/* sql */ `
CREATE TEMP TABLE ${destination} (
CREATE ${tableTriggerTypeClause} TABLE ${destination} (
operation_id INTEGER PRIMARY KEY AUTOINCREMENT,
id TEXT,
operation TEXT,
Expand All @@ -127,12 +246,35 @@ export class TriggerManagerImpl implements TriggerManager {
);
`);

if (usePersistence) {
/**
* Register the table for cleanup management
* Store objects of the form { id: string, table: string } in the JSON array.
*/
await tx.execute(
/* sql */ `
INSERT INTO
ps_kv (key, value)
VALUES
(?, json_array (json_object ('id', ?, 'table', ?))) ON CONFLICT (key) DO
UPDATE
SET
value = json_insert (
value,
'$[' || json_array_length (value) || ']',
json_object ('id', ?, 'table', ?)
);
`,
[TRIGGER_TABLE_TRACKING_KEY, id, destination, id, destination]
);
}

if (operations.includes(DiffTriggerOperation.INSERT)) {
const insertTriggerId = `ps_temp_trigger_insert_${id}`;
triggerIds.push(insertTriggerId);

await tx.execute(/* sql */ `
CREATE TEMP TRIGGER ${insertTriggerId} AFTER INSERT ON ${internalSource} ${whenClauses[
CREATE ${tableTriggerTypeClause} TRIGGER ${insertTriggerId} AFTER INSERT ON ${internalSource} ${whenClauses[
DiffTriggerOperation.INSERT
]} BEGIN
INSERT INTO
Expand All @@ -154,7 +296,7 @@ export class TriggerManagerImpl implements TriggerManager {
triggerIds.push(updateTriggerId);

await tx.execute(/* sql */ `
CREATE TEMP TRIGGER ${updateTriggerId} AFTER
CREATE ${tableTriggerTypeClause} TRIGGER ${updateTriggerId} AFTER
UPDATE ON ${internalSource} ${whenClauses[DiffTriggerOperation.UPDATE]} BEGIN
INSERT INTO
${destination} (id, operation, timestamp, value, previous_value)
Expand All @@ -177,7 +319,7 @@ export class TriggerManagerImpl implements TriggerManager {

// Create delete trigger for basic JSON
await tx.execute(/* sql */ `
CREATE TEMP TRIGGER ${deleteTriggerId} AFTER DELETE ON ${internalSource} ${whenClauses[
CREATE ${tableTriggerTypeClause} TRIGGER ${deleteTriggerId} AFTER DELETE ON ${internalSource} ${whenClauses[
DiffTriggerOperation.DELETE
]} BEGIN
INSERT INTO
Expand Down
22 changes: 22 additions & 0 deletions packages/common/src/client/watched/MemoryTriggerHoldManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { TriggerHoldManager } from '../triggers/TriggerManager.js';

export class MemoryTriggerHoldManager implements TriggerHoldManager {
// Uses a global store to share the state between potentially multiple instances
private static HOLD_STORE = new Map<string, () => Promise<void>>();

async obtainHold(identifier: string): Promise<() => Promise<void>> {
if (MemoryTriggerHoldManager.HOLD_STORE.has(identifier)) {
throw new Error(`A hold is already present for ${identifier}`);
}
const release = async () => {
MemoryTriggerHoldManager.HOLD_STORE.delete(identifier);
};
MemoryTriggerHoldManager.HOLD_STORE.set(identifier, release);

return release;
}

async checkHold(identifier: string): Promise<boolean> {
return MemoryTriggerHoldManager.HOLD_STORE.has(identifier);
}
}
Loading
Loading