Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/thick-items-train.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/web': minor
---

Add `WASQLiteVFS.OPFSWriteAheadVFS`, which also supports concurrent reads.
63 changes: 49 additions & 14 deletions packages/web/src/db/adapters/AsyncWebAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,25 @@ type PendingListener = { listener: Partial<DBAdapterListener>; closeAfterRegiste
* A connection pool implementation delegating to another pool opened asynchronnously.
*/
class AsyncConnectionPool implements ConnectionPool {
protected readonly inner: Promise<DatabaseClient>;
protected readonly inner: Promise<PoolConnection>;

protected resolvedWriter?: DatabaseClient;
private activeOnWriter = 0;
private activeOnReader = 0;

protected resolvedClient?: DatabaseClient;
private readonly pendingListeners = new Set<PendingListener>();

constructor(
inner: Promise<DatabaseClient>,
inner: Promise<PoolConnection>,
readonly name: string
) {
this.inner = inner.then((client) => {
for (const pending of this.pendingListeners) {
pending.closeAfterRegisteredOnResolvedPool = client.registerListener(pending.listener);
pending.closeAfterRegisteredOnResolvedPool = client.writer.registerListener(pending.listener);
}
this.pendingListeners.clear();

this.resolvedClient = client;
this.resolvedWriter = client.writer;
return client;
});
}
Expand All @@ -40,26 +43,53 @@ class AsyncConnectionPool implements ConnectionPool {

async close() {
const inner = await this.inner;
return await inner.close();

await inner.writer.close();
await inner.additionalReader?.close();
}

async readLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
const inner = await this.inner;
return await inner.readLock(fn, options);

// This is a crude load balancing scheme between the writer and an additional read connection (if available).
// Ideally, we should support abortable requests (which would allow us to request a lock from both and just use
// whatever completes first). For now, this at least gives us some concurrency. We can improve this in the future.
if (inner.additionalReader && this.activeOnReader <= this.activeOnWriter) {
try {
this.activeOnReader++;
return await inner.additionalReader.readLock(fn, options);
} finally {
this.activeOnReader--;
}
}

try {
this.activeOnWriter++;
return await inner.writer.readLock(fn, options);
} finally {
this.activeOnWriter--;
}
}

async writeLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
const inner = await this.inner;
return await inner.writeLock(fn, options);
try {
this.activeOnWriter++;
return await inner.writer.writeLock(fn, options);
} finally {
this.activeOnWriter--;
}
}

async refreshSchema(): Promise<void> {
await (await this.inner).refreshSchema();
const inner = await this.inner;
await inner.writer.refreshSchema();
await inner.additionalReader?.refreshSchema();
}

registerListener(listener: Partial<DBAdapterListener>): () => void {
if (this.resolvedClient) {
return this.resolvedClient.registerListener(listener);
if (this.resolvedWriter) {
return this.resolvedWriter.registerListener(listener);
} else {
const pending: PendingListener = { listener };
this.pendingListeners.add(pending);
Expand All @@ -75,15 +105,20 @@ class AsyncConnectionPool implements ConnectionPool {
}
}

export interface PoolConnection {
writer: DatabaseClient;
additionalReader?: DatabaseClient;
}

export class AsyncDbAdapter extends DBAdapterDefaultMixin(AsyncConnectionPool) implements WebDBAdapter {
async shareConnection(): Promise<SharedConnectionWorker> {
const inner = await this.inner;
return inner.shareConnection();
return inner.writer.shareConnection();
}

getConfiguration(): WebDBAdapterConfiguration {
if (this.resolvedClient) {
return this.resolvedClient.getConfiguration();
if (this.resolvedWriter) {
return this.resolvedWriter.getConfiguration();
}

throw new Error('AsyncDbAdapter.getConfiguration() can only be called after initializing it.');
Expand Down
6 changes: 4 additions & 2 deletions packages/web/src/db/adapters/wa-sqlite/DatabaseServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,10 @@ export class DatabaseServer {
},
requestAccess: async (write, timeoutMs) => {
requireOpen();
// TODO: Support timeouts, they don't seem to be supported by the async-mutex package.
const lease = await this.#inner.acquireConnection();

const lease = await this.#inner.acquireConnection(
timeoutMs != null ? AbortSignal.timeout(timeoutMs) : undefined
);
if (!isOpen) {
// Race between requestAccess and close(), the connection was closed while we tried to acquire a lease.
await lease.returnLease();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ export class RawSqliteConnection {

async init() {
const api = (this._sqliteAPI = await this.openSQLiteAPI());
this.db = await api.open_v2(this.options.dbFilename);
this.db = await api.open_v2(
this.options.dbFilename,
this.options.isReadOnly ? 1 /* SQLITE_OPEN_READONLY */ : 6 /* SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE */
);
await this.executeRaw(`PRAGMA temp_store = ${this.options.temporaryStorage};`);
if (this.options.encryptionKey) {
const escapedKey = this.options.encryptionKey.replace("'", "''");
Expand Down
120 changes: 76 additions & 44 deletions packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,21 @@ import {
import { SSRDBAdapter } from '../SSRDBAdapter.js';
import { vfsRequiresDedicatedWorkers, WASQLiteVFS } from './vfs.js';
import { MultiDatabaseServer } from '../../../worker/db/MultiDatabaseServer.js';
import { ClientOptions, DatabaseClient, OpenWorkerConnection } from './DatabaseClient.js';
import { DatabaseClient, OpenWorkerConnection } from './DatabaseClient.js';
import { generateTabCloseSignal } from '../../../shared/tab_close_signal.js';
import { AsyncDbAdapter } from '../AsyncWebAdapter.js';
import { AsyncDbAdapter, PoolConnection } from '../AsyncWebAdapter.js';

export interface WASQLiteOpenFactoryOptions extends WebSQLOpenFactoryOptions {
vfs?: WASQLiteVFS;
}

export interface ResolvedWASQLiteOpenFactoryOptions extends ResolvedWebSQLOpenOptions {
vfs: WASQLiteVFS;

/**
* Whether this is a read-only connection opened for the `OPFSWriteAheadVFS` file system.
*/
isReadOnly: boolean;
}

export interface WorkerDBOpenerOptions extends ResolvedWASQLiteOpenFactoryOptions {
Expand Down Expand Up @@ -82,7 +87,7 @@ export class WASQLiteOpenFactory implements SQLOpenFactory {
return this.openAdapter();
}

async openConnection(): Promise<DatabaseClient> {
async openConnection(): Promise<PoolConnection> {
const { enableMultiTabs, useWebWorker } = this.resolvedFlags;
const {
vfs = WASQLiteVFS.IDBBatchAtomicVFS,
Expand All @@ -95,70 +100,97 @@ export class WASQLiteOpenFactory implements SQLOpenFactory {
this.logger.warn('Multiple tabs are not enabled in this browser');
}

const resolvedOptions: ResolvedWASQLiteOpenFactoryOptions = {
const resolveOptions = (isReadOnly: boolean): ResolvedWASQLiteOpenFactoryOptions => ({
dbFilename: this.options.dbFilename,
dbLocation: this.options.dbLocation,
debugMode: this.options.debugMode,
vfs,
temporaryStorage,
cacheSizeKb,
flags: this.resolvedFlags,
encryptionKey: encryptionKey
};
encryptionKey: encryptionKey,
isReadOnly
});

let clientOptions: ClientOptions;
let client: DatabaseClient;
let additionalReader: DatabaseClient | undefined;
let requiresPersistentTriggers = vfsRequiresDedicatedWorkers(vfs);

if (useWebWorker) {
const optionsDbWorker = this.options.worker;

const workerPort =
typeof optionsDbWorker == 'function'
? resolveWorkerDatabasePortFactory(() =>
optionsDbWorker({
...this.options,
temporaryStorage,
cacheSizeKb,
flags: this.resolvedFlags,
encryptionKey
})
)
: openWorkerDatabasePort(this.options.dbFilename, enableMultiTabs, optionsDbWorker, this.waOptions.vfs);

const source = Comlink.wrap<OpenWorkerConnection>(workerPort);
const closeSignal = new AbortController();
const connection = await source.connect({
...resolvedOptions,
logLevel: this.logger.getLevel(),
lockName: await generateTabCloseSignal(closeSignal.signal)
});
clientOptions = {
connection,
source,
// This tab owns the worker, so we're guaranteed to outlive it.
remoteCanCloseUnexpectedly: false,
onClose: () => {
closeSignal.abort();
if (workerPort instanceof Worker) {
workerPort.terminate();
} else {
workerPort.close();
const openDatabaseWorker = async (
resolvedOptions: ResolvedWASQLiteOpenFactoryOptions
): Promise<DatabaseClient> => {
const workerPort =
typeof optionsDbWorker == 'function'
? resolveWorkerDatabasePortFactory(() =>
optionsDbWorker({
...this.options,
temporaryStorage,
cacheSizeKb,
flags: this.resolvedFlags,
encryptionKey
})
)
: openWorkerDatabasePort(this.options.dbFilename, enableMultiTabs, optionsDbWorker, this.waOptions.vfs);

const source = Comlink.wrap<OpenWorkerConnection>(workerPort);
const closeSignal = new AbortController();
const connection = await source.connect({
...resolvedOptions,
logLevel: this.logger.getLevel(),
lockName: await generateTabCloseSignal(closeSignal.signal)
});
const clientOptions = {
connection,
source,
// This tab owns the worker, so we're guaranteed to outlive it.
remoteCanCloseUnexpectedly: false,
onClose: () => {
closeSignal.abort();
if (workerPort instanceof Worker) {
workerPort.terminate();
} else {
workerPort.close();
}
}
}
};

return new DatabaseClient(clientOptions, {
...resolvedOptions,
requiresPersistentTriggers
});
};

client = await openDatabaseWorker(resolveOptions(false));

if (vfs == WASQLiteVFS.OPFSWriteAheadVFS) {
// This VFS supports concurrent reads, so we can open additional workers to host read-only connections for
// concurrent reads / writes. To avoid excessive resource usage, we currently add one additional reader per
// tab. In the future, we might revisit this to use a growable pool of readers.
additionalReader = await openDatabaseWorker(resolveOptions(true));
}
} else {
// Don't use a web worker. Instead, open the MultiDatabaseServer a worker would use locally.
const localServer = new MultiDatabaseServer(this.logger);
requiresPersistentTriggers = true;

const resolvedOptions = resolveOptions(false);
const connection = await localServer.openConnectionLocally(resolvedOptions);
clientOptions = { connection, source: null, remoteCanCloseUnexpectedly: false };
client = new DatabaseClient(
{ connection, source: null, remoteCanCloseUnexpectedly: false },
{
...resolvedOptions,
requiresPersistentTriggers
}
);
}

return new DatabaseClient(clientOptions, {
...resolvedOptions,
requiresPersistentTriggers
});
return {
writer: client,
additionalReader
};
}
}

Expand Down
Loading
Loading