252 changes: 185 additions & 67 deletions modules/module-postgres-storage/src/storage/PostgresCompactor.ts
@@ -3,12 +3,83 @@ import { logger, ReplicationAssertionError } from '@powersync/lib-services-frame
import { InternalOpId, storage, utils } from '@powersync/service-core';
import * as pgwire from '@powersync/service-jpgwire';
import * as t from 'ts-codec';
import { BIGINT_MAX } from '../types/codecs.js';
import { models } from '../types/types.js';
import { sql } from '../utils/db.js';
import { pick } from '../utils/ts-codec.js';
import { encodedCacheKey } from './batch/OperationBatch.js';

export type BucketFilter =
| { mode: 'all' }
| { mode: 'exact'; bucketName: string }
| { mode: 'prefix'; bucketPrefix: string };

export interface PaginationCursor {
bucket: string;
opId: bigint;
}

/**
* Validates that a bucket filter string is either:
* 1. A valid bucket definition name (no brackets) - e.g., "user", "global"
* 2. A valid full bucket name (name + JSON array) - e.g., "global[]", "user[\"u1\"]"
*/
export function isValidBucketNameOrPrefix(bucket: string): boolean {
if (bucket.length === 0) {
return false;
}

const bracketIndex = bucket.indexOf('[');

if (bracketIndex === -1) {
return true;
}

if (!bucket.endsWith(']')) {
return false;
}

const paramPart = bucket.slice(bracketIndex);

try {
const parsed = JSON.parse(paramPart);
Author
I'm using JSON.parse to avoid writing a custom parsing function; let me know what you think.

return Array.isArray(parsed);
} catch {
return false;
}
}
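// Illustrative results, not part of this diff: how the JSON.parse-based check
// above treats a few inputs (a sketch for review purposes).
//
//   isValidBucketNameOrPrefix('global')         // true  - definition name, no brackets
//   isValidBucketNameOrPrefix('global[]')       // true  - "[]" parses as a JSON array
//   isValidBucketNameOrPrefix('user["u1"]')     // true  - parameters form a JSON array
//   isValidBucketNameOrPrefix('user["u1"')      // false - does not end with "]"
//   isValidBucketNameOrPrefix('user[not json]') // false - JSON.parse throws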

export function createBucketFilter(bucket: string | undefined): BucketFilter {
if (bucket == null) {
return { mode: 'all' };
}

if (!isValidBucketNameOrPrefix(bucket)) {
throw new Error(
`Invalid bucket filter: "${bucket}". ` +
`Expected either a bucket definition name (e.g., "user") or ` +
`a full bucket name (e.g., "global[]" or "user[\\"u1\\"]").`
);
}

if (bucket.includes('[')) {
return { mode: 'exact', bucketName: bucket };
} else {
return { mode: 'prefix', bucketPrefix: `${bucket}[` };
}
}
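// Rough usage sketch, not part of this diff: how createBucketFilter maps the
// caller-supplied argument onto the three filter modes consumed by fetchBatch below.
//
//   createBucketFilter(undefined);     // { mode: 'all' }
//   createBucketFilter('user');        // { mode: 'prefix', bucketPrefix: 'user[' }
//   createBucketFilter('user["u1"]');  // { mode: 'exact', bucketName: 'user["u1"]' }
//   createBucketFilter('user["u1"');   // throws: invalid bucket filter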

const COMPACT_ROW_CODEC = pick(models.BucketData, [
Author
Moving this to global scope since it never changes.

'op',
'source_table',
'table_name',
'source_key',
'row_id',
'op_id',
'bucket_name'
]);

type CompactRow = t.Decoded<typeof COMPACT_ROW_CODEC>;

interface CurrentBucketState {
/** Bucket name */
bucket: string;
@@ -67,6 +138,113 @@ export class PostgresCompactor {
this.buckets = options?.compactBuckets;
}

private async fetchBatch(filter: BucketFilter, cursor: PaginationCursor | null): Promise<CompactRow[]> {
const groupId = { type: 'int4' as const, value: this.group_id };
const limit = { type: 'int4' as const, value: this.moveBatchQueryLimit };

if (filter.mode === 'all') {
if (cursor === null) {
return this.db.sql`
SELECT op, op_id, source_table, table_name, row_id, source_key, bucket_name
FROM bucket_data
WHERE group_id = ${groupId}
ORDER BY bucket_name DESC, op_id DESC
LIMIT ${limit}
`
.decoded(COMPACT_ROW_CODEC)
.rows();
} else {
return this.db.sql`
SELECT op, op_id, source_table, table_name, row_id, source_key, bucket_name
FROM bucket_data
WHERE group_id = ${groupId}
AND (bucket_name, op_id) < (
${{ type: 'varchar', value: cursor.bucket }},
${{ type: 'int8', value: cursor.opId }}
)
ORDER BY bucket_name DESC, op_id DESC
LIMIT ${limit}
`
.decoded(COMPACT_ROW_CODEC)
.rows();
}
} else if (filter.mode === 'exact') {
const bucketName = { type: 'varchar' as const, value: filter.bucketName };
if (cursor === null) {
return this.db.sql`
SELECT op, op_id, source_table, table_name, row_id, source_key, bucket_name
FROM bucket_data
WHERE group_id = ${groupId}
AND bucket_name = ${bucketName}
ORDER BY op_id DESC
LIMIT ${limit}
`
.decoded(COMPACT_ROW_CODEC)
.rows();
} else {
return this.db.sql`
SELECT op, op_id, source_table, table_name, row_id, source_key, bucket_name
FROM bucket_data
WHERE group_id = ${groupId}
AND bucket_name = ${bucketName}
AND op_id < ${{ type: 'int8', value: cursor.opId }}
ORDER BY op_id DESC
LIMIT ${limit}
`
.decoded(COMPACT_ROW_CODEC)
.rows();
}
} else {
// prefix mode
const likePattern = { type: 'varchar' as const, value: `${filter.bucketPrefix}%` };
if (cursor === null) {
return this.db.sql`
SELECT op, op_id, source_table, table_name, row_id, source_key, bucket_name
FROM bucket_data
WHERE group_id = ${groupId}
AND bucket_name LIKE ${likePattern}
ORDER BY bucket_name DESC, op_id DESC
LIMIT ${limit}
`
.decoded(COMPACT_ROW_CODEC)
.rows();
} else {
return this.db.sql`
SELECT op, op_id, source_table, table_name, row_id, source_key, bucket_name
FROM bucket_data
WHERE group_id = ${groupId}
AND bucket_name LIKE ${likePattern}
AND (bucket_name, op_id) < (
${{ type: 'varchar', value: cursor.bucket }},
${{ type: 'int8', value: cursor.opId }}
)
ORDER BY bucket_name DESC, op_id DESC
LIMIT ${limit}
`
.decoded(COMPACT_ROW_CODEC)
.rows();
}
}
}

private async *iterateBucketData(filter: BucketFilter): AsyncGenerator<CompactRow[]> {
let cursor: PaginationCursor | null = null;

while (true) {
const batch = await this.fetchBatch(filter, cursor);

if (batch.length === 0) {
break;
}

// Update cursor for next batch
const lastItem = batch[batch.length - 1];
cursor = { bucket: lastItem.bucket_name, opId: lastItem.op_id };

yield batch;
}
}
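// Illustrative walkthrough, not part of this diff, of the keyset pagination above
// (bucket names are made up): rows are ordered by (bucket_name DESC, op_id DESC),
// so each batch ends at the smallest tuple seen, and the next query resumes strictly
// below it via the row comparison in fetchBatch (exact mode keys on op_id alone).
//
//   batch 1 ends at ('users["u1"]', 450n)  -> cursor = { bucket: 'users["u1"]', opId: 450n }
//   batch 2 selects rows where (bucket_name, op_id) < ('users["u1"]', 450)
//   ... until a query returns no rows and the generator stops.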

/**
* Compact buckets by converting operations into MOVE and/or CLEAR operations.
*
@@ -87,71 +265,11 @@ export class PostgresCompactor {

async compactInternal(bucket: string | undefined) {
const idLimitBytes = this.idLimitBytes;
const filter = createBucketFilter(bucket);

let currentState: CurrentBucketState | null = null;

let bucketLower: string | null = null;
let bucketUpper: string | null = null;
const MAX_CHAR = String.fromCodePoint(0xffff);

if (bucket == null) {
bucketLower = '';
bucketUpper = MAX_CHAR;
} else if (bucket?.includes('[')) {
// Exact bucket name
bucketLower = bucket;
bucketUpper = bucket;
} else if (bucket) {
// Bucket definition name
bucketLower = `${bucket}[`;
bucketUpper = `${bucket}[${MAX_CHAR}`;
}

let upperOpIdLimit = BIGINT_MAX;

while (true) {
const batch = await this.db.sql`
SELECT
op,
op_id,
source_table,
table_name,
row_id,
source_key,
bucket_name
FROM
bucket_data
WHERE
group_id = ${{ type: 'int4', value: this.group_id }}
AND bucket_name >= ${{ type: 'varchar', value: bucketLower }}
AND (
(
bucket_name = ${{ type: 'varchar', value: bucketUpper }}
AND op_id < ${{ type: 'int8', value: upperOpIdLimit }}
)
OR bucket_name < ${{ type: 'varchar', value: bucketUpper }} COLLATE "C" -- Use binary comparison
)
ORDER BY
bucket_name DESC,
op_id DESC
LIMIT
${{ type: 'int4', value: this.moveBatchQueryLimit }}
`
.decoded(
pick(models.BucketData, ['op', 'source_table', 'table_name', 'source_key', 'row_id', 'op_id', 'bucket_name'])
)
.rows();

if (batch.length == 0) {
// We've reached the end
break;
}

// Set upperBound for the next batch
const lastBatchItem = batch[batch.length - 1];
upperOpIdLimit = lastBatchItem.op_id;
bucketUpper = lastBatchItem.bucket_name;

for await (const batch of this.iterateBucketData(filter)) {
for (const doc of batch) {
if (currentState == null || doc.bucket_name != currentState.bucket) {
if (currentState != null && currentState.lastNotPut != null && currentState.opsSincePut >= 1) {
@@ -161,11 +279,11 @@ export class PostgresCompactor {
`Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations`
);

const bucket = currentState.bucket;
const bucketName = currentState.bucket;
const clearOp = currentState.lastNotPut;
// Free memory before clearing bucket
currentState = null;
await this.clearBucket(bucket, clearOp);
await this.clearBucket(bucketName, clearOp);
}
currentState = {
bucket: doc.bucket_name,
@@ -242,11 +360,11 @@ export class PostgresCompactor {
logger.info(
`Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations`
);
const bucket = currentState.bucket;
const bucketName = currentState.bucket;
const clearOp = currentState.lastNotPut;
// Free memory before clearing bucket
currentState = null;
await this.clearBucket(bucket, clearOp);
await this.clearBucket(bucketName, clearOp);
}
}
