From 033b1bc1191043f0b236d715ee9ae9e51a9b002d Mon Sep 17 00:00:00 2001 From: Jonathan Schweder Date: Tue, 23 Dec 2025 13:53:50 +0000 Subject: [PATCH] Optimize Postgres compactor queries --- .../src/storage/PostgresCompactor.ts | 252 +++++++--- .../test/src/compactor-bucket-filter.test.ts | 465 ++++++++++++++++++ .../compactor-create-bucket-filter.test.ts | 161 ++++++ .../test/src/compactor-pagination.test.ts | 386 +++++++++++++++ 4 files changed, 1197 insertions(+), 67 deletions(-) create mode 100644 modules/module-postgres-storage/test/src/compactor-bucket-filter.test.ts create mode 100644 modules/module-postgres-storage/test/src/compactor-create-bucket-filter.test.ts create mode 100644 modules/module-postgres-storage/test/src/compactor-pagination.test.ts diff --git a/modules/module-postgres-storage/src/storage/PostgresCompactor.ts b/modules/module-postgres-storage/src/storage/PostgresCompactor.ts index b19d866eb..1b4ceabcb 100644 --- a/modules/module-postgres-storage/src/storage/PostgresCompactor.ts +++ b/modules/module-postgres-storage/src/storage/PostgresCompactor.ts @@ -3,12 +3,83 @@ import { logger, ReplicationAssertionError } from '@powersync/lib-services-frame import { InternalOpId, storage, utils } from '@powersync/service-core'; import * as pgwire from '@powersync/service-jpgwire'; import * as t from 'ts-codec'; -import { BIGINT_MAX } from '../types/codecs.js'; import { models } from '../types/types.js'; import { sql } from '../utils/db.js'; import { pick } from '../utils/ts-codec.js'; import { encodedCacheKey } from './batch/OperationBatch.js'; +export type BucketFilter = + | { mode: 'all' } + | { mode: 'exact'; bucketName: string } + | { mode: 'prefix'; bucketPrefix: string }; + +export interface PaginationCursor { + bucket: string; + opId: bigint; +} + +/** + * Validates that a bucket filter string is either: + * 1. A valid bucket definition name (no brackets) - e.g., "user", "global" + * 2. A valid full bucket name (name + JSON array) - e.g., "global[]", "user[\"u1\"]" + */ +export function isValidBucketNameOrPrefix(bucket: string): boolean { + if (bucket.length === 0) { + return false; + } + + const bracketIndex = bucket.indexOf('['); + + if (bracketIndex === -1) { + return true; + } + + if (!bucket.endsWith(']')) { + return false; + } + + const paramPart = bucket.slice(bracketIndex); + + try { + const parsed = JSON.parse(paramPart); + return Array.isArray(parsed); + } catch { + return false; + } +} + +export function createBucketFilter(bucket: string | undefined): BucketFilter { + if (bucket == null) { + return { mode: 'all' }; + } + + if (!isValidBucketNameOrPrefix(bucket)) { + throw new Error( + `Invalid bucket filter: "${bucket}". 
` +
+        `Expected either a bucket definition name (e.g., "user") or ` +
+        `a full bucket name (e.g., "global[]" or "user[\\"u1\\"]").`
+    );
+  }
+
+  if (bucket.includes('[')) {
+    return { mode: 'exact', bucketName: bucket };
+  } else {
+    return { mode: 'prefix', bucketPrefix: `${bucket}[` };
+  }
+}
+
+const COMPACT_ROW_CODEC = pick(models.BucketData, [
+  'op',
+  'source_table',
+  'table_name',
+  'source_key',
+  'row_id',
+  'op_id',
+  'bucket_name'
+]);
+
+type CompactRow = t.Decoded<typeof COMPACT_ROW_CODEC>;
+
 interface CurrentBucketState {
   /** Bucket name */
   bucket: string;
@@ -67,6 +138,113 @@ export class PostgresCompactor {
     this.buckets = options?.compactBuckets;
   }
 
+  private async fetchBatch(filter: BucketFilter, cursor: PaginationCursor | null): Promise<CompactRow[]> {
+    const groupId = { type: 'int4' as const, value: this.group_id };
+    const limit = { type: 'int4' as const, value: this.moveBatchQueryLimit };
+
+    if (filter.mode === 'all') {
+      if (cursor === null) {
+        return this.db.sql`
+          SELECT op, op_id, source_table, table_name, row_id, source_key, bucket_name
+          FROM bucket_data
+          WHERE group_id = ${groupId}
+          ORDER BY bucket_name DESC, op_id DESC
+          LIMIT ${limit}
+        `
+          .decoded(COMPACT_ROW_CODEC)
+          .rows();
+      } else {
+        return this.db.sql`
+          SELECT op, op_id, source_table, table_name, row_id, source_key, bucket_name
+          FROM bucket_data
+          WHERE group_id = ${groupId}
+            AND (bucket_name, op_id) < (
+              ${{ type: 'varchar', value: cursor.bucket }},
+              ${{ type: 'int8', value: cursor.opId }}
+            )
+          ORDER BY bucket_name DESC, op_id DESC
+          LIMIT ${limit}
+        `
+          .decoded(COMPACT_ROW_CODEC)
+          .rows();
+      }
+    } else if (filter.mode === 'exact') {
+      const bucketName = { type: 'varchar' as const, value: filter.bucketName };
+      if (cursor === null) {
+        return this.db.sql`
+          SELECT op, op_id, source_table, table_name, row_id, source_key, bucket_name
+          FROM bucket_data
+          WHERE group_id = ${groupId}
+            AND bucket_name = ${bucketName}
+          ORDER BY op_id DESC
+          LIMIT ${limit}
+        `
+          .decoded(COMPACT_ROW_CODEC)
+          .rows();
+      } else {
+        return this.db.sql`
+          SELECT op, op_id, source_table, table_name, row_id, source_key, bucket_name
+          FROM bucket_data
+          WHERE group_id = ${groupId}
+            AND bucket_name = ${bucketName}
+            AND op_id < ${{ type: 'int8', value: cursor.opId }}
+          ORDER BY op_id DESC
+          LIMIT ${limit}
+        `
+          .decoded(COMPACT_ROW_CODEC)
+          .rows();
+      }
+    } else {
+      // prefix mode
+      const likePattern = { type: 'varchar' as const, value: `${filter.bucketPrefix}%` };
+      if (cursor === null) {
+        return this.db.sql`
+          SELECT op, op_id, source_table, table_name, row_id, source_key, bucket_name
+          FROM bucket_data
+          WHERE group_id = ${groupId}
+            AND bucket_name LIKE ${likePattern}
+          ORDER BY bucket_name DESC, op_id DESC
+          LIMIT ${limit}
+        `
+          .decoded(COMPACT_ROW_CODEC)
+          .rows();
+      } else {
+        return this.db.sql`
+          SELECT op, op_id, source_table, table_name, row_id, source_key, bucket_name
+          FROM bucket_data
+          WHERE group_id = ${groupId}
+            AND bucket_name LIKE ${likePattern}
+            AND (bucket_name, op_id) < (
+              ${{ type: 'varchar', value: cursor.bucket }},
+              ${{ type: 'int8', value: cursor.opId }}
+            )
+          ORDER BY bucket_name DESC, op_id DESC
+          LIMIT ${limit}
+        `
+          .decoded(COMPACT_ROW_CODEC)
+          .rows();
+      }
+    }
+  }
+
+  private async *iterateBucketData(filter: BucketFilter): AsyncGenerator<CompactRow[]> {
+    let cursor: PaginationCursor | null = null;
+
+    while (true) {
+      const batch = await this.fetchBatch(filter, cursor);
+
+      if (batch.length === 0) {
+        break;
+      }
+
+      // Update cursor for next batch
+      const lastItem = batch[batch.length - 1];
+      cursor = { bucket: 
lastItem.bucket_name, opId: lastItem.op_id }; + + yield batch; + } + } + /** * Compact buckets by converting operations into MOVE and/or CLEAR operations. * @@ -87,71 +265,11 @@ export class PostgresCompactor { async compactInternal(bucket: string | undefined) { const idLimitBytes = this.idLimitBytes; + const filter = createBucketFilter(bucket); let currentState: CurrentBucketState | null = null; - let bucketLower: string | null = null; - let bucketUpper: string | null = null; - const MAX_CHAR = String.fromCodePoint(0xffff); - - if (bucket == null) { - bucketLower = ''; - bucketUpper = MAX_CHAR; - } else if (bucket?.includes('[')) { - // Exact bucket name - bucketLower = bucket; - bucketUpper = bucket; - } else if (bucket) { - // Bucket definition name - bucketLower = `${bucket}[`; - bucketUpper = `${bucket}[${MAX_CHAR}`; - } - - let upperOpIdLimit = BIGINT_MAX; - - while (true) { - const batch = await this.db.sql` - SELECT - op, - op_id, - source_table, - table_name, - row_id, - source_key, - bucket_name - FROM - bucket_data - WHERE - group_id = ${{ type: 'int4', value: this.group_id }} - AND bucket_name >= ${{ type: 'varchar', value: bucketLower }} - AND ( - ( - bucket_name = ${{ type: 'varchar', value: bucketUpper }} - AND op_id < ${{ type: 'int8', value: upperOpIdLimit }} - ) - OR bucket_name < ${{ type: 'varchar', value: bucketUpper }} COLLATE "C" -- Use binary comparison - ) - ORDER BY - bucket_name DESC, - op_id DESC - LIMIT - ${{ type: 'int4', value: this.moveBatchQueryLimit }} - ` - .decoded( - pick(models.BucketData, ['op', 'source_table', 'table_name', 'source_key', 'row_id', 'op_id', 'bucket_name']) - ) - .rows(); - - if (batch.length == 0) { - // We've reached the end - break; - } - - // Set upperBound for the next batch - const lastBatchItem = batch[batch.length - 1]; - upperOpIdLimit = lastBatchItem.op_id; - bucketUpper = lastBatchItem.bucket_name; - + for await (const batch of this.iterateBucketData(filter)) { for (const doc of batch) { if (currentState == null || doc.bucket_name != currentState.bucket) { if (currentState != null && currentState.lastNotPut != null && currentState.opsSincePut >= 1) { @@ -161,11 +279,11 @@ export class PostgresCompactor { `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations` ); - const bucket = currentState.bucket; + const bucketName = currentState.bucket; const clearOp = currentState.lastNotPut; // Free memory before clearing bucket currentState = null; - await this.clearBucket(bucket, clearOp); + await this.clearBucket(bucketName, clearOp); } currentState = { bucket: doc.bucket_name, @@ -242,11 +360,11 @@ export class PostgresCompactor { logger.info( `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations` ); - const bucket = currentState.bucket; + const bucketName = currentState.bucket; const clearOp = currentState.lastNotPut; // Free memory before clearing bucket currentState = null; - await this.clearBucket(bucket, clearOp); + await this.clearBucket(bucketName, clearOp); } } diff --git a/modules/module-postgres-storage/test/src/compactor-bucket-filter.test.ts b/modules/module-postgres-storage/test/src/compactor-bucket-filter.test.ts new file mode 100644 index 000000000..a6d62b3ad --- /dev/null +++ b/modules/module-postgres-storage/test/src/compactor-bucket-filter.test.ts @@ -0,0 +1,465 @@ +import { storage } from '@powersync/service-core'; +import { TEST_TABLE, test_utils } 
from '@powersync/service-core-tests'; +import { describe, expect, test } from 'vitest'; +import { POSTGRES_STORAGE_FACTORY } from './util.js'; + +/** + * Default compact options used across all bucket filtering tests. + * These values are intentionally small to make compaction predictable in tests. + */ +const DEFAULT_COMPACT_OPTIONS = { + clearBatchLimit: 2, + moveBatchLimit: 1, + moveBatchQueryLimit: 1, + minBucketChanges: 1 +} as const; + +describe('Postgres Compactor - Bucket Filtering', () => { + + test('should compact only the specified exact bucket', async () => { + await using factory = await POSTGRES_STORAGE_FACTORY(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + global: + data: + - SELECT * FROM test WHERE bucket = 'global' + other: + data: + - SELECT * FROM test WHERE bucket = 'other' + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: 'g1', bucket: 'global' }, + afterReplicaId: test_utils.rid('g1') + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: 'g1', bucket: 'global' }, + afterReplicaId: test_utils.rid('g1') + }); + + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: 'o1', bucket: 'other' }, + afterReplicaId: test_utils.rid('o1') + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: 'o1', bucket: 'other' }, + afterReplicaId: test_utils.rid('o1') + }); + + await batch.commit('1/1'); + }); + + const checkpoint = result!.flushed_op; + + const globalBefore = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + const otherBefore = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['other[]', 0n]])) + ); + const checksumsBefore = await bucketStorage.getChecksums(checkpoint, ['global[]', 'other[]']); + + await bucketStorage.compact({ + ...DEFAULT_COMPACT_OPTIONS, + compactBuckets: ['global[]'] + }); + + const globalAfter = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + const otherAfter = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['other[]', 0n]])) + ); + + bucketStorage.clearChecksumCache(); + const checksumsAfter = await bucketStorage.getChecksums(checkpoint, ['global[]', 'other[]']); + expect(checksumsAfter.get('global[]')?.checksum, 'global[] checksum should be preserved').toEqual(checksumsBefore.get('global[]')?.checksum); + expect(checksumsAfter.get('other[]')?.checksum, 'other[] checksum should be preserved').toEqual(checksumsBefore.get('other[]')?.checksum); + + test_utils.validateCompactedBucket(globalBefore.chunkData.data, globalAfter.chunkData.data); + + expect(otherAfter.chunkData.data.every(op => op.op === 'PUT'), 'other[] should only have PUT operations (not compacted)').toBe(true); + expect(otherAfter.chunkData.data, 'other[] data should remain unchanged').toEqual(otherBefore.chunkData.data); + }); + + test('should compact all buckets matching a prefix', async () => { + await using factory = await POSTGRES_STORAGE_FACTORY(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + user: + parameters: SELECT id FROM 
users + data: + - SELECT * FROM test WHERE user_id = bucket.id + global: + data: + - SELECT * FROM test WHERE bucket = 'global' + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: 't1', user_id: 'u1' }, + afterReplicaId: test_utils.rid('t1') + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: 't1', user_id: 'u1' }, + afterReplicaId: test_utils.rid('t1') + }); + + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: 't2', user_id: 'u2' }, + afterReplicaId: test_utils.rid('t2') + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: 't2', user_id: 'u2' }, + afterReplicaId: test_utils.rid('t2') + }); + + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: 'g1', bucket: 'global' }, + afterReplicaId: test_utils.rid('g1') + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: 'g1', bucket: 'global' }, + afterReplicaId: test_utils.rid('g1') + }); + + await batch.commit('1/1'); + }); + + const checkpoint = result!.flushed_op; + + const user1Before = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['user["u1"]', 0n]])) + ); + const user2Before = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['user["u2"]', 0n]])) + ); + const globalBefore = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + const checksumsBefore = await bucketStorage.getChecksums(checkpoint, ['user["u1"]', 'user["u2"]', 'global[]']); + + await bucketStorage.compact({ + ...DEFAULT_COMPACT_OPTIONS, + compactBuckets: ['user'] + }); + + const user1After = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['user["u1"]', 0n]])) + ); + const user2After = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['user["u2"]', 0n]])) + ); + const globalAfter = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + + bucketStorage.clearChecksumCache(); + const checksumsAfter = await bucketStorage.getChecksums(checkpoint, ['user["u1"]', 'user["u2"]', 'global[]']); + expect(checksumsAfter.get('user["u1"]')?.checksum, 'user["u1"] checksum should be preserved').toEqual(checksumsBefore.get('user["u1"]')?.checksum); + expect(checksumsAfter.get('user["u2"]')?.checksum, 'user["u2"] checksum should be preserved').toEqual(checksumsBefore.get('user["u2"]')?.checksum); + expect(checksumsAfter.get('global[]')?.checksum, 'global[] checksum should be preserved').toEqual(checksumsBefore.get('global[]')?.checksum); + + test_utils.validateCompactedBucket(user1Before.chunkData.data, user1After.chunkData.data); + test_utils.validateCompactedBucket(user2Before.chunkData.data, user2After.chunkData.data); + + expect(globalAfter.chunkData.data, 'global[] data should remain unchanged (not matched by prefix)').toEqual(globalBefore.chunkData.data); + }); + + test('should compact a specific parameterized bucket by exact name', async () => { + await using factory = await POSTGRES_STORAGE_FACTORY(); + const syncRules = await 
factory.updateSyncRules({ + content: ` +bucket_definitions: + user: + parameters: SELECT id FROM users + data: + - SELECT * FROM test WHERE user_id = bucket.id + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: 't1', user_id: 'u1' }, + afterReplicaId: test_utils.rid('t1') + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: 't1', user_id: 'u1' }, + afterReplicaId: test_utils.rid('t1') + }); + + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: 't2', user_id: 'u2' }, + afterReplicaId: test_utils.rid('t2') + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: 't2', user_id: 'u2' }, + afterReplicaId: test_utils.rid('t2') + }); + + await batch.commit('1/1'); + }); + + const checkpoint = result!.flushed_op; + + const user1Before = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['user["u1"]', 0n]])) + ); + const user2Before = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['user["u2"]', 0n]])) + ); + const checksumsBefore = await bucketStorage.getChecksums(checkpoint, ['user["u1"]', 'user["u2"]']); + + await bucketStorage.compact({ + ...DEFAULT_COMPACT_OPTIONS, + compactBuckets: ['user["u1"]'] + }); + + const user1After = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['user["u1"]', 0n]])) + ); + const user2After = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['user["u2"]', 0n]])) + ); + + bucketStorage.clearChecksumCache(); + const checksumsAfter = await bucketStorage.getChecksums(checkpoint, ['user["u1"]', 'user["u2"]']); + expect(checksumsAfter.get('user["u1"]')?.checksum, 'user["u1"] checksum should be preserved').toEqual(checksumsBefore.get('user["u1"]')?.checksum); + expect(checksumsAfter.get('user["u2"]')?.checksum, 'user["u2"] checksum should be preserved').toEqual(checksumsBefore.get('user["u2"]')?.checksum); + + test_utils.validateCompactedBucket(user1Before.chunkData.data, user1After.chunkData.data); + + expect(user2After.chunkData.data, 'user["u2"] data should remain unchanged (not targeted)').toEqual(user2Before.chunkData.data); + }); + + test('should not compact any buckets when filter matches nothing', async () => { + await using factory = await POSTGRES_STORAGE_FACTORY(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + global: + data: + - SELECT * FROM test + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: 't1' }, + afterReplicaId: test_utils.rid('t1') + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: 't1' }, + afterReplicaId: test_utils.rid('t1') + }); + await batch.commit('1/1'); + }); + + const checkpoint = result!.flushed_op; + + const globalBefore = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + + await bucketStorage.compact({ + ...DEFAULT_COMPACT_OPTIONS, + compactBuckets: 
['nonexistent'] + }); + + const globalAfter = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + + expect(globalAfter.chunkData.data, 'global[] data should remain unchanged (nonexistent prefix)').toEqual(globalBefore.chunkData.data); + }); + + test('should not compact any buckets when compactBuckets is empty array', async () => { + await using factory = await POSTGRES_STORAGE_FACTORY(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + global: + data: + - SELECT * FROM test + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: 't1' }, + afterReplicaId: test_utils.rid('t1') + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: 't1' }, + afterReplicaId: test_utils.rid('t1') + }); + await batch.commit('1/1'); + }); + + const checkpoint = result!.flushed_op; + + const globalBefore = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + + await bucketStorage.compact({ + ...DEFAULT_COMPACT_OPTIONS, + compactBuckets: [] + }); + + const globalAfter = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + + expect(globalAfter.chunkData.data, 'global[] data should remain unchanged (empty compactBuckets array)').toEqual(globalBefore.chunkData.data); + }); + + test('should compact buckets using both exact match and prefix filters together', async () => { + await using factory = await POSTGRES_STORAGE_FACTORY(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + global: + data: + - SELECT * FROM test WHERE bucket = 'global' + user: + parameters: SELECT id FROM users + data: + - SELECT * FROM test WHERE user_id = bucket.id + other: + data: + - SELECT * FROM test WHERE bucket = 'other' + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: 'g1', bucket: 'global' }, + afterReplicaId: test_utils.rid('g1') + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: 'g1', bucket: 'global' }, + afterReplicaId: test_utils.rid('g1') + }); + + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: 't1', user_id: 'u1' }, + afterReplicaId: test_utils.rid('t1') + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: 't1', user_id: 'u1' }, + afterReplicaId: test_utils.rid('t1') + }); + + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: 'o1', bucket: 'other' }, + afterReplicaId: test_utils.rid('o1') + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: 'o1', bucket: 'other' }, + afterReplicaId: test_utils.rid('o1') + }); + + await batch.commit('1/1'); + }); + + const checkpoint = result!.flushed_op; + + const globalBefore = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + const 
user1Before = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['user["u1"]', 0n]])) + ); + const otherBefore = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['other[]', 0n]])) + ); + const checksumsBefore = await bucketStorage.getChecksums(checkpoint, ['global[]', 'user["u1"]', 'other[]']); + + await bucketStorage.compact({ + ...DEFAULT_COMPACT_OPTIONS, + compactBuckets: ['global[]', 'user'] + }); + + const globalAfter = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + const user1After = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['user["u1"]', 0n]])) + ); + const otherAfter = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['other[]', 0n]])) + ); + + bucketStorage.clearChecksumCache(); + const checksumsAfter = await bucketStorage.getChecksums(checkpoint, ['global[]', 'user["u1"]', 'other[]']); + expect(checksumsAfter.get('global[]')?.checksum, 'global[] checksum should be preserved').toEqual(checksumsBefore.get('global[]')?.checksum); + expect(checksumsAfter.get('user["u1"]')?.checksum, 'user["u1"] checksum should be preserved').toEqual(checksumsBefore.get('user["u1"]')?.checksum); + expect(checksumsAfter.get('other[]')?.checksum, 'other[] checksum should be preserved').toEqual(checksumsBefore.get('other[]')?.checksum); + + test_utils.validateCompactedBucket(globalBefore.chunkData.data, globalAfter.chunkData.data); + test_utils.validateCompactedBucket(user1Before.chunkData.data, user1After.chunkData.data); + + expect(otherAfter.chunkData.data, 'other[] data should remain unchanged (not in compactBuckets list)').toEqual(otherBefore.chunkData.data); + }); + +}); diff --git a/modules/module-postgres-storage/test/src/compactor-create-bucket-filter.test.ts b/modules/module-postgres-storage/test/src/compactor-create-bucket-filter.test.ts new file mode 100644 index 000000000..9af561407 --- /dev/null +++ b/modules/module-postgres-storage/test/src/compactor-create-bucket-filter.test.ts @@ -0,0 +1,161 @@ +import { describe, expect, test } from 'vitest'; +import { createBucketFilter, isValidBucketNameOrPrefix } from '../../src/storage/PostgresCompactor.js'; + +describe('createBucketFilter', () => { + test('undefined input returns all mode', () => { + const filter = createBucketFilter(undefined); + expect(filter).toEqual({ mode: 'all' }); + }); + + test('global bucket with empty brackets returns exact mode', () => { + const filter = createBucketFilter('global[]'); + expect(filter).toEqual({ mode: 'exact', bucketName: 'global[]' }); + }); + + test('parameterized bucket returns exact mode', () => { + const filter = createBucketFilter('user["user-123"]'); + expect(filter).toEqual({ mode: 'exact', bucketName: 'user["user-123"]' }); + }); + + test('bucket with multiple parameters returns exact mode', () => { + const filter = createBucketFilter('workspace["ws-1","org-2"]'); + expect(filter).toEqual({ mode: 'exact', bucketName: 'workspace["ws-1","org-2"]' }); + }); + + test('bucket with numeric parameter returns exact mode', () => { + const filter = createBucketFilter('item["12345"]'); + expect(filter).toEqual({ mode: 'exact', bucketName: 'item["12345"]' }); + }); + + test('bucket with UUID parameter returns exact mode', () => { + const filter = createBucketFilter('session["550e8400-e29b-41d4-a716-446655440000"]'); + expect(filter).toEqual({ mode: 'exact', bucketName: 
'session["550e8400-e29b-41d4-a716-446655440000"]' }); + }); + + test('simple name returns prefix mode', () => { + const filter = createBucketFilter('user'); + expect(filter).toEqual({ mode: 'prefix', bucketPrefix: 'user[' }); + }); + + test('name with underscores returns prefix mode', () => { + const filter = createBucketFilter('by_user_org'); + expect(filter).toEqual({ mode: 'prefix', bucketPrefix: 'by_user_org[' }); + }); + + test('name with hyphens returns prefix mode', () => { + const filter = createBucketFilter('my-bucket'); + expect(filter).toEqual({ mode: 'prefix', bucketPrefix: 'my-bucket[' }); + }); + + test('single character name returns prefix mode', () => { + const filter = createBucketFilter('a'); + expect(filter).toEqual({ mode: 'prefix', bucketPrefix: 'a[' }); + }); + + test('name with numbers returns prefix mode', () => { + const filter = createBucketFilter('bucket123'); + expect(filter).toEqual({ mode: 'prefix', bucketPrefix: 'bucket123[' }); + }); + + test('bucket with escaped quotes in parameter', () => { + const filter = createBucketFilter('user["name\\"with\\"quotes"]'); + expect(filter).toEqual({ mode: 'exact', bucketName: 'user["name\\"with\\"quotes"]' }); + }); + + test('bucket with special SQL characters', () => { + const filter = createBucketFilter("user[\"'; DROP TABLE users; --\"]"); + expect(filter).toEqual({ mode: 'exact', bucketName: "user[\"'; DROP TABLE users; --\"]" }); + }); + + test('bucket with unicode characters', () => { + const filter = createBucketFilter('用户["测试"]'); + expect(filter).toEqual({ mode: 'exact', bucketName: '用户["测试"]' }); + }); + + test('very long bucket definition name', () => { + const longName = 'a'.repeat(1000); + const filter = createBucketFilter(longName); + expect(filter).toEqual({ mode: 'prefix', bucketPrefix: `${longName}[` }); + }); + + test('bucket with nested brackets', () => { + const filter = createBucketFilter('data["array[0]"]'); + expect(filter).toEqual({ mode: 'exact', bucketName: 'data["array[0]"]' }); + }); + + test('name with whitespace returns prefix mode', () => { + const filter = createBucketFilter('my bucket'); + expect(filter).toEqual({ mode: 'prefix', bucketPrefix: 'my bucket[' }); + }); + + test('bucket with newline in parameter', () => { + const filter = createBucketFilter('user["line1\\nline2"]'); + expect(filter).toEqual({ mode: 'exact', bucketName: 'user["line1\\nline2"]' }); + }); + + test('throws on empty string', () => { + expect(() => createBucketFilter('')).toThrow(/Invalid bucket filter/); + }); + + test('throws on incomplete bracket', () => { + expect(() => createBucketFilter('user[')).toThrow(/Invalid bucket filter/); + }); + + test('throws on incomplete parameter', () => { + expect(() => createBucketFilter('user["')).toThrow(/Invalid bucket filter/); + }); + + test('throws on extra characters after closing bracket', () => { + expect(() => createBucketFilter('user[]extra')).toThrow(/Invalid bucket filter/); + }); + + test('throws on malformed JSON in parameters', () => { + expect(() => createBucketFilter('user[invalid]')).toThrow(/Invalid bucket filter/); + }); +}); + +describe('isValidBucketNameOrPrefix', () => { + test('valid bucket definition name', () => { + expect(isValidBucketNameOrPrefix('user')).toBe(true); + }); + + test('valid bucket name with empty params', () => { + expect(isValidBucketNameOrPrefix('global[]')).toBe(true); + }); + + test('valid bucket name with single param', () => { + expect(isValidBucketNameOrPrefix('user["user-123"]')).toBe(true); + }); + + test('valid bucket name 
with multiple params', () => { + expect(isValidBucketNameOrPrefix('workspace["ws-1","org-2"]')).toBe(true); + }); + + test('valid unicode bucket name', () => { + expect(isValidBucketNameOrPrefix('用户["测试"]')).toBe(true); + }); + + test('valid bucket with escaped quotes', () => { + expect(isValidBucketNameOrPrefix('user["name\\"with\\"quotes"]')).toBe(true); + }); + + test('empty string is invalid', () => { + expect(isValidBucketNameOrPrefix('')).toBe(false); + }); + + test('incomplete bracket is invalid', () => { + expect(isValidBucketNameOrPrefix('user[')).toBe(false); + }); + + test('incomplete parameter is invalid', () => { + expect(isValidBucketNameOrPrefix('user["')).toBe(false); + }); + + test('extra characters after bracket is invalid', () => { + expect(isValidBucketNameOrPrefix('user[]extra')).toBe(false); + }); + + test('malformed JSON is invalid', () => { + expect(isValidBucketNameOrPrefix('user[invalid]')).toBe(false); + }); +}); diff --git a/modules/module-postgres-storage/test/src/compactor-pagination.test.ts b/modules/module-postgres-storage/test/src/compactor-pagination.test.ts new file mode 100644 index 000000000..4904b7a07 --- /dev/null +++ b/modules/module-postgres-storage/test/src/compactor-pagination.test.ts @@ -0,0 +1,386 @@ +import { storage } from '@powersync/service-core'; +import { TEST_TABLE, test_utils } from '@powersync/service-core-tests'; +import { describe, expect, test } from 'vitest'; +import { POSTGRES_STORAGE_FACTORY } from './util.js'; + +/** Small batch limits to force pagination during compaction. */ +const SMALL_BATCH_OPTIONS = { + clearBatchLimit: 2, + moveBatchLimit: 1, + moveBatchQueryLimit: 2, + minBucketChanges: 1 +} as const; + +describe('Postgres Compactor - Pagination', () => { + test('paginates correctly when data exceeds batch size', async () => { + await using factory = await POSTGRES_STORAGE_FACTORY(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + global: + data: + - SELECT * FROM test + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + for (let i = 1; i <= 5; i++) { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: `item-${i}` }, + afterReplicaId: test_utils.rid(`item-${i}`) + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: `item-${i}` }, + afterReplicaId: test_utils.rid(`item-${i}`) + }); + } + await batch.commit('1/1'); + }); + + const checkpoint = result!.flushed_op; + + const dataBefore = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + expect(dataBefore.chunkData.data.length).toBe(10); + const checksumBefore = await bucketStorage.getChecksums(checkpoint, ['global[]']); + + await bucketStorage.compact(SMALL_BATCH_OPTIONS); + + const dataAfter = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + + bucketStorage.clearChecksumCache(); + const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['global[]']); + expect(checksumAfter.get('global[]')?.checksum, 'checksum should be preserved').toEqual(checksumBefore.get('global[]')?.checksum); + + test_utils.validateCompactedBucket(dataBefore.chunkData.data, dataAfter.chunkData.data); + }); + + test('paginates across multiple buckets in "all" mode', async () => { + await using factory = await 
POSTGRES_STORAGE_FACTORY(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + alpha: + data: + - SELECT * FROM test WHERE bucket = 'alpha' + beta: + data: + - SELECT * FROM test WHERE bucket = 'beta' + gamma: + data: + - SELECT * FROM test WHERE bucket = 'gamma' + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + for (const bucketName of ['alpha', 'beta', 'gamma']) { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: `${bucketName}-1`, bucket: bucketName }, + afterReplicaId: test_utils.rid(`${bucketName}-1`) + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: `${bucketName}-1`, bucket: bucketName }, + afterReplicaId: test_utils.rid(`${bucketName}-1`) + }); + } + await batch.commit('1/1'); + }); + + const checkpoint = result!.flushed_op; + + const alphaBefore = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['alpha[]', 0n]])) + ); + const betaBefore = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['beta[]', 0n]])) + ); + const gammaBefore = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['gamma[]', 0n]])) + ); + const checksumsBefore = await bucketStorage.getChecksums(checkpoint, ['alpha[]', 'beta[]', 'gamma[]']); + + await bucketStorage.compact(SMALL_BATCH_OPTIONS); + + const alphaAfter = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['alpha[]', 0n]])) + ); + const betaAfter = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['beta[]', 0n]])) + ); + const gammaAfter = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['gamma[]', 0n]])) + ); + + bucketStorage.clearChecksumCache(); + const checksumsAfter = await bucketStorage.getChecksums(checkpoint, ['alpha[]', 'beta[]', 'gamma[]']); + expect(checksumsAfter.get('alpha[]')?.checksum, 'alpha[] checksum should be preserved').toEqual(checksumsBefore.get('alpha[]')?.checksum); + expect(checksumsAfter.get('beta[]')?.checksum, 'beta[] checksum should be preserved').toEqual(checksumsBefore.get('beta[]')?.checksum); + expect(checksumsAfter.get('gamma[]')?.checksum, 'gamma[] checksum should be preserved').toEqual(checksumsBefore.get('gamma[]')?.checksum); + + test_utils.validateCompactedBucket(alphaBefore.chunkData.data, alphaAfter.chunkData.data); + test_utils.validateCompactedBucket(betaBefore.chunkData.data, betaAfter.chunkData.data); + test_utils.validateCompactedBucket(gammaBefore.chunkData.data, gammaAfter.chunkData.data); + }); + + test('paginates in "prefix" mode across parameterized buckets', async () => { + await using factory = await POSTGRES_STORAGE_FACTORY(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + user: + parameters: SELECT id FROM users + data: + - SELECT * FROM test WHERE user_id = bucket.id + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + for (const userId of ['u1', 'u2', 'u3']) { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: `t-${userId}`, user_id: userId }, + afterReplicaId: test_utils.rid(`t-${userId}`) + }); + await batch.save({ 
+ sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: `t-${userId}`, user_id: userId }, + afterReplicaId: test_utils.rid(`t-${userId}`) + }); + } + await batch.commit('1/1'); + }); + + const checkpoint = result!.flushed_op; + + const user1Before = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['user["u1"]', 0n]])) + ); + const user2Before = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['user["u2"]', 0n]])) + ); + const user3Before = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['user["u3"]', 0n]])) + ); + const checksumsBefore = await bucketStorage.getChecksums(checkpoint, ['user["u1"]', 'user["u2"]', 'user["u3"]']); + + await bucketStorage.compact({ + ...SMALL_BATCH_OPTIONS, + compactBuckets: ['user'] + }); + + const user1After = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['user["u1"]', 0n]])) + ); + const user2After = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['user["u2"]', 0n]])) + ); + const user3After = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['user["u3"]', 0n]])) + ); + + bucketStorage.clearChecksumCache(); + const checksumsAfter = await bucketStorage.getChecksums(checkpoint, ['user["u1"]', 'user["u2"]', 'user["u3"]']); + expect(checksumsAfter.get('user["u1"]')?.checksum, 'user["u1"] checksum should be preserved').toEqual(checksumsBefore.get('user["u1"]')?.checksum); + expect(checksumsAfter.get('user["u2"]')?.checksum, 'user["u2"] checksum should be preserved').toEqual(checksumsBefore.get('user["u2"]')?.checksum); + expect(checksumsAfter.get('user["u3"]')?.checksum, 'user["u3"] checksum should be preserved').toEqual(checksumsBefore.get('user["u3"]')?.checksum); + + test_utils.validateCompactedBucket(user1Before.chunkData.data, user1After.chunkData.data); + test_utils.validateCompactedBucket(user2Before.chunkData.data, user2After.chunkData.data); + test_utils.validateCompactedBucket(user3Before.chunkData.data, user3After.chunkData.data); + }); + + test('paginates in "exact" mode within a single bucket', async () => { + await using factory = await POSTGRES_STORAGE_FACTORY(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + global: + data: + - SELECT * FROM test + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + for (let i = 1; i <= 4; i++) { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: `item-${i}` }, + afterReplicaId: test_utils.rid(`item-${i}`) + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: `item-${i}` }, + afterReplicaId: test_utils.rid(`item-${i}`) + }); + } + await batch.commit('1/1'); + }); + + const checkpoint = result!.flushed_op; + + const dataBefore = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + expect(dataBefore.chunkData.data.length).toBe(8); + const checksumBefore = await bucketStorage.getChecksums(checkpoint, ['global[]']); + + await bucketStorage.compact({ + ...SMALL_BATCH_OPTIONS, + compactBuckets: ['global[]'] + }); + + const dataAfter = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + + 
bucketStorage.clearChecksumCache(); + const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['global[]']); + expect(checksumAfter.get('global[]')?.checksum, 'checksum should be preserved').toEqual(checksumBefore.get('global[]')?.checksum); + + test_utils.validateCompactedBucket(dataBefore.chunkData.data, dataAfter.chunkData.data); + }); + + test('handles empty bucket gracefully', async () => { + await using factory = await POSTGRES_STORAGE_FACTORY(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + empty: + data: + - SELECT * FROM test WHERE 1 = 0 + global: + data: + - SELECT * FROM test + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: 'g1' }, + afterReplicaId: test_utils.rid('g1') + }); + await batch.commit('1/1'); + }); + + const checkpoint = result!.flushed_op; + + const emptyBefore = await test_utils.fromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['empty[]', 0n]])) + ); + expect(emptyBefore.length).toBe(0); + + await bucketStorage.compact({ + ...SMALL_BATCH_OPTIONS, + compactBuckets: ['empty[]'] + }); + + const emptyAfter = await test_utils.fromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['empty[]', 0n]])) + ); + expect(emptyAfter.length).toBe(0); + + const globalAfter = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + expect(globalAfter.chunkData.data.length).toBe(1); + expect(globalAfter.chunkData.data[0].op).toBe('PUT'); + }); + + test('compacts multiple exact-match buckets in a single call', async () => { + await using factory = await POSTGRES_STORAGE_FACTORY(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + first: + data: + - SELECT * FROM test WHERE bucket = 'first' + second: + data: + - SELECT * FROM test WHERE bucket = 'second' + third: + data: + - SELECT * FROM test WHERE bucket = 'third' + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + for (const bucketName of ['first', 'second', 'third']) { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: `${bucketName}-1`, bucket: bucketName }, + afterReplicaId: test_utils.rid(`${bucketName}-1`) + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: `${bucketName}-1`, bucket: bucketName }, + afterReplicaId: test_utils.rid(`${bucketName}-1`) + }); + } + await batch.commit('1/1'); + }); + + const checkpoint = result!.flushed_op; + + const firstBefore = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['first[]', 0n]])) + ); + const secondBefore = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['second[]', 0n]])) + ); + const thirdBefore = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['third[]', 0n]])) + ); + const checksumsBefore = await bucketStorage.getChecksums(checkpoint, ['first[]', 'second[]', 'third[]']); + + await bucketStorage.compact({ + ...SMALL_BATCH_OPTIONS, + compactBuckets: ['first[]', 'second[]'] + }); + + const firstAfter = await test_utils.oneFromAsync( + 
bucketStorage.getBucketDataBatch(checkpoint, new Map([['first[]', 0n]])) + ); + const secondAfter = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['second[]', 0n]])) + ); + const thirdAfter = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['third[]', 0n]])) + ); + + bucketStorage.clearChecksumCache(); + const checksumsAfter = await bucketStorage.getChecksums(checkpoint, ['first[]', 'second[]', 'third[]']); + expect(checksumsAfter.get('first[]')?.checksum, 'first[] checksum should be preserved').toEqual(checksumsBefore.get('first[]')?.checksum); + expect(checksumsAfter.get('second[]')?.checksum, 'second[] checksum should be preserved').toEqual(checksumsBefore.get('second[]')?.checksum); + expect(checksumsAfter.get('third[]')?.checksum, 'third[] checksum should be preserved').toEqual(checksumsBefore.get('third[]')?.checksum); + + test_utils.validateCompactedBucket(firstBefore.chunkData.data, firstAfter.chunkData.data); + test_utils.validateCompactedBucket(secondBefore.chunkData.data, secondAfter.chunkData.data); + + expect(thirdAfter.chunkData.data, 'third[] data should remain unchanged').toEqual(thirdBefore.chunkData.data); + }); +});