diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index a2bb22f..db98a66 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -785,40 +785,31 @@ public IAsyncEnumerable OnChange(SQLWatchOptions? options = // Return the actual IAsyncEnumerable here, using OnChange as a synchronous wrapper that blocks until the // connection is established - return OnChangeCore(powersyncTables, listener, signal.Token, options?.TriggerImmediately == true); + return OnChangeCore(powersyncTables, listener, signal, options?.TriggerImmediately == true); } private async IAsyncEnumerable OnChangeCore( HashSet watchedTables, IAsyncEnumerable listener, - [EnumeratorCancellation] CancellationToken signal, + CancellationTokenSource signal, bool triggerImmediately ) { - if (triggerImmediately == true) - { - yield return new WatchOnChangeEvent { ChangedTables = [] }; - } - - HashSet changedTables = new(); - await foreach (var e in listener) + try { - if (signal.IsCancellationRequested) yield break; - - changedTables.Clear(); - GetTablesFromNotification(e.TablesUpdated, changedTables); - changedTables.IntersectWith(watchedTables); - - if (changedTables.Count == 0) continue; - - var update = new WatchOnChangeEvent { ChangedTables = [.. changedTables] }; - - // Convert from 'ps_data__' to '' - for (int i = 0; i < update.ChangedTables.Length; i++) + await foreach (var update in OnRawTableChange(watchedTables, listener, signal.Token, triggerImmediately)) { - update.ChangedTables[i] = InternalToFriendlyTableName(update.ChangedTables[i]); + // Convert from 'ps_data__' to '' + for (int i = 0; i < update.ChangedTables.Length; i++) + { + update.ChangedTables[i] = InternalToFriendlyTableName(update.ChangedTables[i]); + } + yield return update; } - yield return update; + } + finally + { + signal.Dispose(); } } @@ -870,7 +861,7 @@ private async IAsyncEnumerable WatchCore( var schemaChanged = new TaskCompletionSource(); // Listen for schema changes in the background - _ = Task.Run(async () => + var schemaListenerTask = Task.Run(async () => { await foreach (var update in Events.OnSchemaChanged.ListenAsync(signal.Token)) { @@ -885,60 +876,80 @@ private async IAsyncEnumerable WatchCore( var currentRestartCts = initialRestartCts; var currentListener = initialListener; - while (!signal.Token.IsCancellationRequested) + try { - // Resolve tables - HashSet powersyncTables; - if (options?.Tables != null) - { - powersyncTables = [.. options - .Tables - .SelectMany(table => [$"ps_data__{table}", $"ps_data_local__{table}"] - )]; - } - else - { - powersyncTables = await GetSourceTables(sql, parameters); - } - - var enumerator = OnRawTableChange( - powersyncTables, - currentListener, - currentRestartCts.Token, - isRestart || (options?.TriggerImmediately == true) - ).GetAsyncEnumerator(currentRestartCts.Token); - - // Continually wait for either OnChange or SchemaChanged to fire - while (true) + while (!signal.Token.IsCancellationRequested) { - var currentSchemaTask = schemaChanged.Task; - var onChangeTask = enumerator.MoveNextAsync().AsTask(); - var completedTask = await Task.WhenAny(onChangeTask, currentSchemaTask); - - if (completedTask == currentSchemaTask) + // Resolve tables + HashSet powersyncTables; + if (options?.Tables != null) + { + powersyncTables = [.. options + .Tables + .SelectMany(table => [$"ps_data__{table}", $"ps_data_local__{table}"] + )]; + } + else { - currentRestartCts.Cancel(); - isRestart = true; - // Let the current task complete/cancel gracefully - try { await onChangeTask; } - catch (OperationCanceledException) { } - - // Establish a new listener BEFORE resolving source tables in the next iteration, - // so that changes during the async GetSourceTables call are not missed. - currentRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token); - currentListener = Database.Events.OnTablesUpdated.ListenAsync(currentRestartCts.Token); - - break; + powersyncTables = await GetSourceTables(sql, parameters); } - var update = enumerator.Current; - if (update.ChangedTables != null) + var enumerator = OnRawTableChange( + powersyncTables, + currentListener, + currentRestartCts.Token, + isRestart || (options?.TriggerImmediately == true) + ).GetAsyncEnumerator(); + + // Continually wait for either OnChange or SchemaChanged to fire + while (true) { - if (signal.IsCancellationRequested) yield break; - yield return await GetAll(sql, parameters); + var currentSchemaTask = schemaChanged.Task; + var onChangeTask = enumerator.MoveNextAsync().AsTask(); + var completedTask = await Task.WhenAny(onChangeTask, currentSchemaTask); + + if (completedTask == currentSchemaTask) + { + var oldRestartCts = currentRestartCts; + oldRestartCts.Cancel(); + isRestart = true; + // Let the current task complete/cancel gracefully + try { await onChangeTask; } + catch (OperationCanceledException) { } + + // Establish a new listener BEFORE resolving source tables in the next iteration, + // so that changes during the async GetSourceTables call are not missed. + currentRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token); + currentListener = Database.Events.OnTablesUpdated.ListenAsync(currentRestartCts.Token); + oldRestartCts.Dispose(); + + break; + } + + // Await onChangeTask to propagate cancellation and detect end-of-enumeration + bool hasNext; + try { hasNext = await onChangeTask; } + catch (OperationCanceledException) { yield break; } + + if (!hasNext) break; + + var update = enumerator.Current; + if (update.ChangedTables != null) + { + yield return await GetAll(sql, parameters); + } } } } + finally + { + signal.Cancel(); + try { await schemaListenerTask; } + catch (OperationCanceledException) { } + + currentRestartCts.Dispose(); + signal.Dispose(); + } } private class ExplainedResult @@ -986,8 +997,6 @@ private async IAsyncEnumerable OnRawTableChange( HashSet changedTables = new(); await foreach (var e in listener) { - if (token.IsCancellationRequested) break; - // Extract the changed tables and intersect with the watched tables changedTables.Clear(); GetTablesFromNotification(e.TablesUpdated, changedTables); diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs index c91daa0..851f20f 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs @@ -34,15 +34,12 @@ public async Task InitializeAsync() public async Task DisposeAsync() { testCts.Cancel(); + testCts.Dispose(); await db.DisconnectAndClear(); await db.Close(); - try { File.Delete(dbName); } - catch { } - - testCts.Dispose(); - testCts = new(); + DatabaseUtils.CleanDb(dbName); } private record IdResult(string id); @@ -50,40 +47,42 @@ private record CountResult(long count); private class AssetResult { - public string id { get; set; } - public string description { get; set; } + public string id { get; set; } = ""; + public string description { get; set; } = ""; public string? make { get; set; } } [Fact] - public async Task QueryWithoutParamsTest() + public async Task QueryWithoutParams() { - var name = "Test User"; - var age = 30; + var id = Guid.NewGuid().ToString(); + var description = "some desc"; + var make = "some make"; await db.Execute( "INSERT INTO assets(id, description, make) VALUES(?, ?, ?)", - [Guid.NewGuid().ToString(), name, age.ToString()] + [id, description, make] ); var result = await db.GetAll("SELECT id, description, make FROM assets"); Assert.Single(result); var row = result.First(); - Assert.Equal(name, row.description); - Assert.Equal(age.ToString(), row.make); + Assert.Equal(id, row.id); + Assert.Equal(description, row.description); + Assert.Equal(make, row.make); } [Fact] - public async Task QueryWithParamsTest() + public async Task QueryWithParams() { var id = Guid.NewGuid().ToString(); - var name = "Test User"; - var age = 30; + var description = "some desc"; + var make = "some make"; await db.Execute( "INSERT INTO assets(id, description, make) VALUES(?, ?, ?)", - [id, name, age.ToString()] + [id, description, make] ); var result = await db.GetAll("SELECT id, description, make FROM assets WHERE id = ?", [id]); @@ -91,8 +90,8 @@ await db.Execute( Assert.Single(result); var row = result.First(); Assert.Equal(id, row.id); - Assert.Equal(name, row.description); - Assert.Equal(age.ToString(), row.make); + Assert.Equal(description, row.description); + Assert.Equal(make, row.make); } [Fact] @@ -116,26 +115,26 @@ await db.Execute( } [Fact] - public async Task QueriesRunOnAnotherThread() + public async Task QueriesDoNotBlockCaller() { - int preCallThreadId = Environment.CurrentManagedThreadId; - await db.GetAll("select * from assets"); - int postCallThreadId = Environment.CurrentManagedThreadId; + var writeTask = db.WriteLock(async ctx => + { + // Simulate slow query + await Task.Delay(200); + }); - Assert.NotEqual(preCallThreadId, postCallThreadId); + Assert.False(writeTask.IsCompleted, "Write task with 200ms delay completed synchronously instead of yielding"); + await writeTask; } [Fact] - public async Task FailedInsertTest() + public async Task FailedInsert() { - var name = "Test User"; - var age = 30; - var exception = await Assert.ThrowsAsync(async () => { await db.Execute( "INSERT INTO assetsfail (id, description, make) VALUES(?, ?, ?)", - [Guid.NewGuid().ToString(), name, age.ToString()] + [Guid.NewGuid().ToString(), "some desc", "some make"] ); }); @@ -143,7 +142,7 @@ await db.Execute( } [Fact] - public async Task SimpleReadTransactionTest() + public async Task SimpleReadTransaction() { await db.Execute("INSERT INTO assets(id) VALUES(?)", ["O3"]); @@ -156,7 +155,7 @@ public async Task SimpleReadTransactionTest() } [Fact] - public async Task ManualCommitTest() + public async Task ManualCommit() { await db.WriteTransaction(async tx => { @@ -171,7 +170,7 @@ await db.WriteTransaction(async tx => } [Fact] - public async Task AutoCommitTest() + public async Task AutoCommit() { await db.WriteTransaction(async tx => { @@ -185,7 +184,7 @@ await db.WriteTransaction(async tx => } [Fact] - public async Task ManualRollbackTest() + public async Task ManualRollback() { await db.WriteTransaction(async tx => { @@ -198,7 +197,7 @@ await db.WriteTransaction(async tx => } [Fact] - public async Task AutoRollbackTest() + public async Task AutoRollback() { bool exceptionThrown = false; try @@ -221,7 +220,7 @@ await db.WriteTransaction(async tx => } [Fact] - public async Task WriteTransactionWithReturnTest() + public async Task WriteTransactionWithReturn() { var result = await db.WriteTransaction(async tx => { @@ -234,7 +233,7 @@ public async Task WriteTransactionWithReturnTest() } [Fact] - public async Task WriteTransactionNestedQueryTest() + public async Task WriteTransactionNestedQuery() { await db.WriteTransaction(async tx => { @@ -249,7 +248,7 @@ await db.WriteTransaction(async tx => } [Fact] - public async Task ReadLockReadOnlyTest() + public async Task ReadLockReadOnly() { string id = Guid.NewGuid().ToString(); bool exceptionThrown = false; @@ -277,7 +276,7 @@ await db.ReadLock(async context => } [Fact] - public async Task ReadLocksQueueIfExceedNumberOfConnectionsTest() + public async Task ReadLocksQueueIfExceedNumberOfConnections() { string id = Guid.NewGuid().ToString(); @@ -302,7 +301,7 @@ await db.Execute( } [Fact(Timeout = 2000)] - public async Task ReadWhileWriteIsRunningTest() + public async Task ReadWhileWriteIsRunning() { var sem = new TaskCompletionSource(); @@ -327,7 +326,7 @@ public async Task ReadWhileWriteIsRunningTest() } [Fact] - public async Task BatchExecuteTest() + public async Task BatchExecute() { var id1 = Guid.NewGuid().ToString(); var description1 = "Asset 1"; @@ -357,7 +356,7 @@ public async Task BatchExecuteTest() } [Fact(Timeout = 2000)] - public async Task QueueSimultaneousExecutionsTest() + public async Task QueueSimultaneousExecutions() { var order = new List(); var operationCount = 5; @@ -376,7 +375,7 @@ await db.WriteLock(async context => } [Fact(Timeout = 2000)] - public async Task CallUpdateHookOnChangesTest() + public async Task CallUpdateHookOnChanges() { var result = new TaskCompletionSource(); @@ -401,7 +400,7 @@ public async Task CallUpdateHookOnChangesTest() } [Fact(Timeout = 2000)] - public async Task ReflectWriteTransactionUpdatesOnReadConnectionsTest() + public async Task ReflectWriteTransactionUpdatesOnReadConnections() { var watched = new TaskCompletionSource(); @@ -428,7 +427,7 @@ await db.WriteTransaction(async tx => } [Fact(Timeout = 5000)] - public async Task ReflectWriteLockUpdatesOnReadConnectionsTest() + public async Task ReflectWriteLockUpdatesOnReadConnections() { var numberOfAssets = 10_000; @@ -464,7 +463,7 @@ await db.WriteLock(async tx => } [Fact(Timeout = 5000)] - public async Task Insert1000Records_CompleteWithinTimeLimitTest() + public async Task Insert1000Records_CompleteWithinTimeLimit() { var random = new Random(); var stopwatch = Stopwatch.StartNew(); @@ -487,7 +486,7 @@ await db.Execute( } [Fact(Timeout = 5000)] - public async Task TestConcurrentReadsTest() + public async Task TestConcurrentReads() { await db.Execute("INSERT INTO assets(id) VALUES(?)", ["O6-conccurent-1"]); var sem = new TaskCompletionSource(); @@ -517,7 +516,7 @@ await db.WriteTransaction(async tx => } [Fact(Timeout = 5000)] - public async Task GetUploadQueueStatsTest() + public async Task GetUploadQueueStats() { for (var i = 0; i < 100; i++) { @@ -530,7 +529,7 @@ public async Task GetUploadQueueStatsTest() } [Fact] - public async Task QueryDynamicTest() + public async Task QueryDynamic() { string id = Guid.NewGuid().ToString(); string description = "new description"; @@ -547,7 +546,7 @@ await db.Execute( } [Fact(Timeout = 2500)] - public async Task WatchDynamicTest() + public async Task Watch_Dynamic() { string id = Guid.NewGuid().ToString(); string description = "dynamic description"; @@ -584,7 +583,7 @@ await db.Execute( } [Fact(Timeout = 2000)] - public async Task WatchCancelledTest() + public async Task Watch_Cancelled() { int callCount = 0; using var sem = new SemaphoreSlim(0); @@ -601,20 +600,14 @@ public async Task WatchCancelledTest() Assert.True(await sem.WaitAsync(100)); Assert.Equal(1, callCount); - await db.Execute( - "insert into assets(id, description, make) values (?, ?, ?)", - [Guid.NewGuid().ToString(), "some desc", "some make"] - ); + await TestUtils.InsertRandomAsset(db); Assert.True(await sem.WaitAsync(100)); Assert.Equal(2, callCount); testCts.Cancel(); - await db.Execute( - "insert into assets(id, description, make) values (?, ?, ?)", - [Guid.NewGuid().ToString(), "some desc", "some make"] - ); + await TestUtils.InsertRandomAsset(db); // This is failing Assert.False(await sem.WaitAsync(100)); @@ -622,7 +615,7 @@ await db.Execute( } [Fact(Timeout = 3000)] - public async Task WatchMultipleCancelledTest() + public async Task Watch_MultipleCancelled() { int callCount = 0; @@ -647,20 +640,14 @@ void RunQuery(CancellationTokenSource cts, SemaphoreSlim sem) RunQuery(ctsAlwaysRunning, semAlwaysRunning); RunQuery(ctsCancelled, semCancelled); - await db.Execute( - "insert into assets(id, description, make) values (?, ?, ?)", - [Guid.NewGuid().ToString(), "some desc", "some make"] - ); + await TestUtils.InsertRandomAsset(db); await Task.WhenAll(semAlwaysRunning.WaitAsync(), semCancelled.WaitAsync()); Assert.Equal(2, callCount); // Close one query ctsCancelled.Cancel(); - await db.Execute( - "insert into assets(id, description, make) values (?, ?, ?)", - [Guid.NewGuid().ToString(), "some desc", "some make"] - ); + await TestUtils.InsertRandomAsset(db); // Ensure nothing received from cancelled result Assert.False(await semCancelled.WaitAsync(100)); @@ -671,10 +658,7 @@ await db.Execute( // Sanity check ctsAlwaysRunning.Cancel(); - await db.Execute( - "insert into assets(id, description, make) values (?, ?, ?)", - [Guid.NewGuid().ToString(), "some desc", "some make"] - ); + await TestUtils.InsertRandomAsset(db); Assert.False(await semAlwaysRunning.WaitAsync(100)); Assert.False(await semCancelled.WaitAsync(100)); @@ -682,7 +666,7 @@ await db.Execute( } [Fact(Timeout = 3000)] - public async Task WatchSchemaResetTest() + public async Task Watch_SchemaReset() { var dbId = Guid.NewGuid().ToString(); var localDbName = $"PowerSyncWatchReset_{dbId}.db"; @@ -701,7 +685,7 @@ public async Task WatchSchemaResetTest() long lastCount = 0; const string QUERY = "SELECT COUNT(*) AS count FROM assets"; - var listener = db.Watch(QUERY, null, new() { Signal = testCts.Token }); + var listener = db.Watch(QUERY, null, new() { Signal = testCts.Token, TriggerImmediately = true }); _ = Task.Run(async () => { await foreach (var result in listener) @@ -709,8 +693,11 @@ public async Task WatchSchemaResetTest() lastCount = result[0].count; sem.Release(); } - }, testCts.Token); - Assert.False(await sem.WaitAsync(200)); + + // Called on cancellation + sem.Release(); + }); + Assert.True(await sem.WaitAsync(100)); var resolved = await db.GetSourceTables(QUERY, null); Assert.Single(resolved); @@ -718,10 +705,7 @@ public async Task WatchSchemaResetTest() for (int i = 0; i < 3; i++) { - await db.Execute( - "insert into assets(id, description, make) values (?, ?, ?)", - [Guid.NewGuid().ToString(), "some desc", "some make"] - ); + await TestUtils.InsertRandomAsset(db); Assert.True(await sem.WaitAsync(100)); Assert.Equal(i + 1, lastCount); } @@ -742,7 +726,7 @@ await db.Execute( // Sanity check testCts.Cancel(); - await Task.Delay(100); + Assert.True(await sem.WaitAsync(500)); await db.Execute("delete from assets"); Assert.False(await sem.WaitAsync(100)); @@ -774,7 +758,7 @@ public async Task Attributes_ColumnAliasing() var createdAt = DateTimeOffset.Now; await db.Execute( - "INSERT INTO todos(id, description, completed, created_at, list_id) VALUES(?, ?, ?, ?, uuid())", + "INSERT INTO todos (id, description, completed, created_at) VALUES (?, ?, ?, ?)", [id, description, completed, createdAt] ); @@ -806,4 +790,79 @@ public async Task IndexesCreatedOnTable() result = await db.GetAll("PRAGMA index_info('ps_data__assets__makemodel')"); Assert.Equal(2, result.Length); // make, model } + + [Fact] + public async Task Watch_TriggerImmediately_True() + { + // Insert some data + await TestUtils.InsertRandomAssets(db, 3); + + var listener = db.Watch( + "SELECT COUNT(*) AS count FROM assets", + null, + new() { Signal = testCts.Token, TriggerImmediately = true }); + var enumerator = listener.GetAsyncEnumerator(); + + var moveNext = enumerator.MoveNextAsync().AsTask(); + var timeout = Task.Delay(500); + Assert.NotEqual(timeout, await Task.WhenAny(moveNext, timeout)); + Assert.True(await moveNext); + + var current = enumerator.Current; + Assert.Single(current); + Assert.Equal(3, current[0].count); + } + + [Fact] + public async Task Watch_TriggerImmediately_False() + { + var listener = db.Watch( + "SELECT COUNT(*) AS count FROM assets", + null, + new() { TriggerImmediately = false }); + + var enumerator = listener.GetAsyncEnumerator(); + + var moveNext = enumerator.MoveNextAsync().AsTask(); + var timeout = Task.Delay(200); + Assert.Equal(timeout, await Task.WhenAny(moveNext, timeout)); + + // Trigger the watch to run + await TestUtils.InsertRandomAssets(db, 3); + + timeout = Task.Delay(500); + Assert.NotEqual(timeout, await Task.WhenAny(moveNext, timeout)); + Assert.True(await moveNext); + + var current = enumerator.Current; + Assert.Single(current); + Assert.Equal(3, current[0].count); + } + + [Fact(Timeout = 2000)] + public async Task Watch_CancelsOnTokenCancellation() + { + var cts = CancellationTokenSource.CreateLinkedTokenSource(testCts.Token); + var tcs = new TaskCompletionSource(); + var sem = new SemaphoreSlim(0); + var listener = db.Watch( + "SELECT COUNT(*) AS count FROM assets", + null, + new() { Signal = cts.Token, TriggerImmediately = false }); + + // Sem == received result + // TCS == received cancellation + _ = Task.Run(async () => + { + await foreach (var _ in listener) { sem.Release(); } + tcs.TrySetResult(true); + }); + + await TestUtils.InsertRandomAssets(db, 3); + Assert.True(await sem.WaitAsync(200)); + + cts.Cancel(); + + Assert.True(await tcs.Task); + } } diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs index 8a3e68b..58525fb 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs @@ -44,26 +44,18 @@ private async Task ResetDB(PowerSyncDatabase db) DatabaseUtils.CleanDb(db.Database.Name); } - [Fact] - public async Task IncludeMetadataTest() + private async Task WithCustomAssetOptions(TableOptions options, Func test) { - var localDbName = $"IncludeMetadataTest-{Guid.NewGuid():N}.db"; + var localDbName = $"crud-custom-{Guid.NewGuid():N}.db"; var db = new PowerSyncDatabase(new PowerSyncDatabaseOptions { Database = new SQLOpenOptions { DbFilename = localDbName }, - Schema = TestSchema.GetSchemaWithCustomAssetOptions(new TableOptions - { - TrackMetadata = true - }), + Schema = TestSchema.GetSchemaWithCustomAssetOptions(options), }); try { await ResetDB(db); - - await db.Execute("INSERT INTO assets (id, description, _metadata) VALUES(uuid(), 'xxxx', 'so meta');"); - - var batch = await db.GetNextCrudTransaction(); - Assert.Equal("so meta", batch?.Crud[0].Metadata); + await test(db); } finally { @@ -73,21 +65,21 @@ public async Task IncludeMetadataTest() } [Fact] - public async Task IncludeOldValuesTest() - { - var localDbName = $"IncludeOldValuesTest-{Guid.NewGuid():N}.db"; - var db = new PowerSyncDatabase(new PowerSyncDatabaseOptions + public Task IncludeMetadataTest() => WithCustomAssetOptions( + new TableOptions { TrackMetadata = true }, + async db => { - Database = new SQLOpenOptions { DbFilename = localDbName }, - Schema = TestSchema.GetSchemaWithCustomAssetOptions(new TableOptions - { - TrackPreviousValues = new TrackPreviousOptions() - }), + await db.Execute("INSERT INTO assets (id, description, _metadata) VALUES(uuid(), 'xxxx', 'so meta');"); + + var batch = await db.GetNextCrudTransaction(); + Assert.Equal("so meta", batch?.Crud[0].Metadata); }); - try - { - await ResetDB(db); + [Fact] + public Task IncludeOldValuesTest() => WithCustomAssetOptions( + new TableOptions { TrackPreviousValues = new TrackPreviousOptions() }, + async db => + { await db.Execute("INSERT INTO assets (id, description) VALUES(?, ?);", ["a185b7e1-dffa-4a9a-888c-15c0f0cac4b3", "entry"]); await db.Execute("DELETE FROM ps_crud;"); await db.Execute("UPDATE assets SET description = ?", ["new name"]); @@ -95,33 +87,19 @@ public async Task IncludeOldValuesTest() var batch = await db.GetNextCrudTransaction(); Assert.True(batch?.Crud[0].PreviousValues?.ContainsKey("description")); Assert.Equal("entry", batch?.Crud[0].PreviousValues?["description"]); - } - finally - { - // await db.Close(); - // DatabaseUtils.CleanDb(localDbName); - } - } + }); [Fact] - public async Task IncludeOldValuesWithColumnFilterTest() - { - var localDbName = $"IncludeOldValuesWithColumnFilterTest-{Guid.NewGuid():N}.db"; - var db = new PowerSyncDatabase(new PowerSyncDatabaseOptions + public Task IncludeOldValuesWithColumnFilterTest() => WithCustomAssetOptions( + new TableOptions { - Database = new SQLOpenOptions { DbFilename = localDbName }, - Schema = TestSchema.GetSchemaWithCustomAssetOptions(new TableOptions + TrackPreviousValues = new TrackPreviousOptions { - TrackPreviousValues = new TrackPreviousOptions - { - Columns = new List { "description" } - } - }), - }); - try + Columns = new List { "description" } + } + }, + async db => { - await ResetDB(db); - await db.Execute("INSERT INTO assets (id, description, make) VALUES(?, ?, ?);", ["a185b7e1-dffa-4a9a-888c-15c0f0cac4b3", "entry", "make1"]); await db.Execute("DELETE FROM ps_crud;"); await db.Execute("UPDATE assets SET description = ?, make = ?", ["new name", "make2"]); @@ -130,34 +108,20 @@ public async Task IncludeOldValuesWithColumnFilterTest() Assert.NotNull(batch?.Crud[0].PreviousValues); Assert.Equal("entry", batch?.Crud[0].PreviousValues?["description"]); Assert.False(batch?.Crud[0].PreviousValues!.ContainsKey("make")); - } - finally - { - await db.Close(); - DatabaseUtils.CleanDb(localDbName); - } - } + }); [Fact] - public async Task IncludeOldValuesWhenChangedTest() - { - var localDbName = $"IncludeOldValuesWhenChangedTest-{Guid.NewGuid():N}.db"; - var db = new PowerSyncDatabase(new PowerSyncDatabaseOptions + public Task IncludeOldValuesWhenChangedTest() => WithCustomAssetOptions( + new TableOptions { - Database = new SQLOpenOptions { DbFilename = localDbName }, - Schema = TestSchema.GetSchemaWithCustomAssetOptions(new TableOptions + TrackPreviousValues = new TrackPreviousOptions { - TrackPreviousValues = new TrackPreviousOptions - { - OnlyWhenChanged = true - } - }), - }); - try + OnlyWhenChanged = true + } + }, + async db => { - await ResetDB(db); - await db.Execute("INSERT INTO assets (id, description, make) VALUES(uuid(), ?, ?);", ["name", "make1"]); await db.Execute("DELETE FROM ps_crud;"); await db.Execute("UPDATE assets SET description = ?", ["new name"]); @@ -167,42 +131,20 @@ public async Task IncludeOldValuesWhenChangedTest() Assert.NotNull(batch.Crud[0].PreviousValues); Assert.Equal("name", batch.Crud[0].PreviousValues!["description"]); Assert.False(batch.Crud[0].PreviousValues!.ContainsKey("make")); - } - finally - { - await db.Close(); - DatabaseUtils.CleanDb(localDbName); - } - } + }); [Fact] - public async Task IgnoreEmptyUpdateTest() - { - var localDbName = $"IgnoreEmptyUpdateTest-{Guid.NewGuid():N}.db"; - var db = new PowerSyncDatabase(new PowerSyncDatabaseOptions - { - Database = new SQLOpenOptions { DbFilename = localDbName }, - Schema = TestSchema.GetSchemaWithCustomAssetOptions(new TableOptions - { - IgnoreEmptyUpdates = true - }), - }); - try + public Task IgnoreEmptyUpdateTest() => WithCustomAssetOptions( + new TableOptions { IgnoreEmptyUpdates = true }, + async db => { - await ResetDB(db); await db.Execute("INSERT INTO assets (id, description) VALUES(?, ?);", [testId, "name"]); await db.Execute("DELETE FROM ps_crud;"); await db.Execute("UPDATE assets SET description = ?", ["name"]); var batch = await db.GetNextCrudTransaction(); Assert.Null(batch); - } - finally - { - await db.Close(); - DatabaseUtils.CleanDb(localDbName); - } - } + }); [Fact] public async Task Insert_RecordCrudEntryTest() diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs index aedfa4e..e646b07 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs @@ -102,7 +102,8 @@ public async Task SubscribesWithStreams() Assert.Null(statusForStream!.Subscription.LastSyncedAt); Assert.True(statusForStream!.Subscription.HasExplicitSubscription); } - await Task.Delay(100); + // Ensure the previous status update is fully settled before listening for the next one + await Task.Yield(); statusTask = MockSyncService.NextStatus(db); syncService.PushLine( @@ -152,15 +153,16 @@ public async Task ChangesSubscriptionsDynamically() ) ); - await Task.Delay(100); + await statusTask; var subscription = await db.SyncStream("a").Subscribe(); + // Wait for subscription request to register await TestUtils.WaitForAsync(() => syncService.Requests.Count > 1); Assert.Single(syncService.Requests[1]?.Streams?.Subscriptions!); // Given that the subscription has a TTL, dropping the handle should not re-subscribe. subscription.Unsubscribe(); - await Task.Delay(100); + await TestUtils.WaitForAsync(() => syncService.Requests.Count == 2); Assert.Equal(2, syncService.Requests.Count); } diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncTests.cs index a2de5d7..a10f3b4 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncTests.cs @@ -14,10 +14,11 @@ public class SyncTests : IAsyncLifetime MockSyncService syncService = null!; PowerSyncDatabase db = null!; - public async Task InitializeAsync() + public Task InitializeAsync() { syncService = new MockSyncService(); db = syncService.CreateDatabase(); + return Task.CompletedTask; } public async Task DisposeAsync() @@ -74,7 +75,12 @@ public async Task SyncDeleteOperationTest() syncService.PushLine(line); } - await Task.Delay(500); // Wait for sync to process + // Wait for sync to process + await TestUtils.WaitForAsync(async () => + { + var rows = await db.GetAll("SELECT * FROM lists"); + return rows.Length == 0; + }); var result = await db.GetAll("SELECT * FROM lists"); Assert.Empty(result); @@ -105,7 +111,8 @@ public async Task SyncLocalCreateOperationTest() await db.Execute("insert into lists (id, name, owner_id, created_at) values (uuid(), 'New User', ?, datetime())", ["78bb787c-ff0b-41b2-a297-6a7701648f4a"]); - await Task.Delay(500); // Wait for local change to be registered + // Wait for local change to be registered + await TestUtils.WaitForAsync(() => db.CurrentStatus.DataFlowStatus.Uploading == false); Assert.Null(db.CurrentStatus.DataFlowStatus.UploadError); foreach (var line in syncAfterLocalCreate) @@ -113,7 +120,8 @@ public async Task SyncLocalCreateOperationTest() syncService.PushLine(line); } - await Task.Delay(500); // Wait for sync to process + // Wait for sync to process + await TestUtils.WaitForAsync(() => db.CurrentStatus.DataFlowStatus.Downloading == false); Assert.Null(db.CurrentStatus.DataFlowStatus.DownloadError); } diff --git a/Tests/PowerSync/PowerSync.Common.Tests/DB/SchemaTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/DB/SchemaTests.cs index f311c4b..e5d8ebf 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/DB/SchemaTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/DB/SchemaTests.cs @@ -29,20 +29,20 @@ private void TestParser(Type type, CompiledTable expected) ] class Asset { - public string id { get; set; } + public string id { get; set; } = ""; public DateTime created_at { get; set; } - public string make { get; set; } + public string make { get; set; } = ""; - public string model { get; set; } + public string model { get; set; } = ""; public int quantity { get; set; } public string? description { get; set; } [Ignored] - public string non_table_field { get; set; } + public string non_table_field { get; set; } = ""; } [Fact] @@ -87,12 +87,12 @@ public void AttributeParser_Assets_Test() ] class Product { - public string id { get; set; } + public string id { get; set; } = ""; [Column(ColumnType = ColumnType.Real)] public DateTime created_at { get; set; } - public string description { get; set; } + public string description { get; set; } = ""; [Column(TrackPrevious = true)] public int quantity { get; set; } @@ -100,7 +100,7 @@ class Product [Column(TrackPrevious = true)] public decimal ppu { get; set; } - public string seller_id { get; set; } + public string seller_id { get; set; } = ""; } [Fact] @@ -158,10 +158,10 @@ enum LogLevel class Log { [Column("id")] - public string LogId { get; set; } + public string LogId { get; set; } = ""; [Column("description")] - public string Description { get; set; } + public string Description { get; set; } = ""; [Column("timestamp")] public DateTimeOffset Timestamp { get; set; } @@ -199,13 +199,14 @@ public void AttributeParser_Logs_Test() TestParser(typeof(Log), expected); } - class Invalid1 { public string id { get; set; } } + class Invalid1 { public string id { get; set; } = ""; } [Fact] public async void AttributeParser_InvalidSchema_1() { - var ex = await Assert.ThrowsAsync(async () => + var ex = await Assert.ThrowsAsync(() => { new AttributeParser(typeof(Invalid1)).ParseTable(); + return Task.CompletedTask; }); Assert.Contains("must be marked with TableAttribute", ex.Message); } @@ -215,9 +216,10 @@ class Invalid2 { } [Fact] public async void AttributeParser_InvalidSchema_2() { - var ex = await Assert.ThrowsAsync(async () => + var ex = await Assert.ThrowsAsync(() => { new AttributeParser(typeof(Invalid2)).ParseTable(); + return Task.CompletedTask; }); Assert.Contains("'id' property is required", ex.Message); } @@ -227,9 +229,10 @@ class Invalid3 { public int id { get; set; } } [Fact] public async void AttributeParser_InvalidSchema_3() { - var ex = await Assert.ThrowsAsync(async () => + var ex = await Assert.ThrowsAsync(() => { new AttributeParser(typeof(Invalid3)).ParseTable(); + return Task.CompletedTask; }); Assert.Contains("must be of type string", ex.Message); } @@ -238,14 +241,15 @@ public async void AttributeParser_InvalidSchema_3() class Invalid4 { [Column(ColumnType = ColumnType.Real)] - public string id { get; set; } + public string id { get; set; } = ""; } [Fact] public async void AttributeParser_InvalidSchema_4() { - var ex = await Assert.ThrowsAsync(async () => + var ex = await Assert.ThrowsAsync(() => { new AttributeParser(typeof(Invalid4)).ParseTable(); + return Task.CompletedTask; }); Assert.Contains("must have ColumnType set to ColumnType.Text or ColumnType.Inferred", ex.Message); } @@ -253,15 +257,16 @@ public async void AttributeParser_InvalidSchema_4() [Table("invalid")] class Invalid5 { - public string id { get; set; } - public Invalid1 invalid_type { get; set; } + public string id { get; set; } = ""; + public Invalid1 invalid_type { get; set; } = default!; } [Fact] public async void AttributeParser_InvalidSchema_5() { - var ex = await Assert.ThrowsAsync(async () => + var ex = await Assert.ThrowsAsync(() => { new AttributeParser(typeof(Invalid5)).ParseTable(); + return Task.CompletedTask; }); Assert.Contains("Unable to automatically infer ColumnType", ex.Message); } @@ -269,14 +274,15 @@ public async void AttributeParser_InvalidSchema_5() [Table("invalid", TrackPreviousValues = TrackPrevious.Columns | TrackPrevious.Table)] class Invalid6 { - public string id { get; set; } + public string id { get; set; } = ""; } [Fact] public async void AttributeParser_InvalidSchema_6() { - var ex = await Assert.ThrowsAsync(async () => + var ex = await Assert.ThrowsAsync(() => { new AttributeParser(typeof(Invalid6)).ParseTable(); + return Task.CompletedTask; }); Assert.Contains("Cannot specify both TrackPrevious.Columns and TrackPrevious.Table", ex.Message); } @@ -284,20 +290,21 @@ public async void AttributeParser_InvalidSchema_6() [Table("invalid", TrackPreviousValues = TrackPrevious.OnlyWhenChanged)] class Invalid7 { - public string id { get; set; } + public string id { get; set; } = ""; } [Fact] public async void AttributeParser_InvalidSchema_7() { - var ex = await Assert.ThrowsAsync(async () => + var ex = await Assert.ThrowsAsync(() => { new AttributeParser(typeof(Invalid7)).ParseTable(); + return Task.CompletedTask; }); Assert.Contains("Cannot specify TrackPrevious.OnlyWhenChanged without also specifying", ex.Message); } [Fact] - public async void AttributeParser_TypeMap_CustomRegistered() + public void AttributeParser_TypeMap_CustomRegistered() { // Log has Column aliases new AttributeParser(typeof(Log)).RegisterDapperTypeMap(); @@ -306,7 +313,7 @@ public async void AttributeParser_TypeMap_CustomRegistered() } [Fact] - public async void AttributeParser_TypeMap_DefaultRegistered() + public void AttributeParser_TypeMap_DefaultRegistered() { // Asset has no Column aliases new AttributeParser(typeof(Asset)).RegisterDapperTypeMap(); diff --git a/Tests/PowerSync/PowerSync.Common.Tests/EventStreamTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/EventStreamTests.cs index 6faacc8..657f40b 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/EventStreamTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/EventStreamTests.cs @@ -110,7 +110,7 @@ public async Task EventStream_ShouldReceiveTwoMessages_Sync() } [Fact] - public async Task EventManager_RegistersStreamsCorrectly() + public void EventManager_RegistersStreamsCorrectly() { var manager = new EventManager(); var stream1 = new EventStream(); @@ -126,13 +126,13 @@ public async Task EventManager_RegistersStreamsCorrectly() Assert.Equal(stream1, obtainedStream1); Assert.Equal(stream2, obtainedStream2); - Assert.Equal(null, obtainedStream3); + Assert.Null(obtainedStream3); manager.Close(); } [Fact] - public async Task EventManager_CloseRemovesAndClosesStreams() + public void EventManager_CloseRemovesAndClosesStreams() { var manager = new EventManager(); var stream1 = new EventStream(); @@ -148,9 +148,9 @@ public async Task EventManager_CloseRemovesAndClosesStreams() Assert.False(manager.TryGetStream(out var obtainedStream2)); Assert.False(manager.TryGetStream(out var obtainedStream3)); - Assert.Equal(null, obtainedStream1); - Assert.Equal(null, obtainedStream2); - Assert.Equal(null, obtainedStream3); + Assert.Null(obtainedStream1); + Assert.Null(obtainedStream2); + Assert.Null(obtainedStream3); Assert.True(stream1.Closed); Assert.True(stream2.Closed); diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Models/List.cs b/Tests/PowerSync/PowerSync.Common.Tests/Models/List.cs index 83e041e..c2452d8 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Models/List.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Models/List.cs @@ -7,14 +7,14 @@ namespace PowerSync.Common.Tests.Models; public class List { [Column("id")] - public string ListId { get; set; } + public string ListId { get; set; } = ""; [Column("created_at")] public DateTime CreatedAt { get; set; } [Column("name")] - public string Name { get; set; } + public string Name { get; set; } = ""; [Column("owner_id")] - public string OwnerId { get; set; } + public string OwnerId { get; set; } = ""; } diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Models/Todo.cs b/Tests/PowerSync/PowerSync.Common.Tests/Models/Todo.cs index a94b70c..36e2962 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Models/Todo.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Models/Todo.cs @@ -10,10 +10,10 @@ namespace PowerSync.Common.Tests.Models; public class Todo { [Column("id")] - public string TodoId { get; set; } + public string TodoId { get; set; } = ""; [Column("list_id")] - public string ListId { get; set; } + public string ListId { get; set; } = ""; [Column("created_at")] public DateTime CreatedAt { get; set; } @@ -22,10 +22,10 @@ public class Todo public DateTime? CompletedAt { get; set; } [Column("description")] - public string Description { get; set; } + public string Description { get; set; } = ""; [Column("created_by")] - public string CreatedBy { get; set; } + public string CreatedBy { get; set; } = ""; [Column("completed_by")] public string? CompletedBy { get; set; } diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs b/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs index 5482556..2169d1d 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs @@ -154,7 +154,7 @@ public MockRemote( this.connectedListeners = connectedListeners; } - public override async Task PostStreamRaw(SyncStreamOptions options) + public override Task PostStreamRaw(SyncStreamOptions options) { connectedListeners.Add(options.Data); @@ -181,27 +181,27 @@ public override async Task PostStreamRaw(SyncStreamOptions options) } }); - return pipe.Reader.AsStream(); + return Task.FromResult(pipe.Reader.AsStream()); } - public override async Task Get(string path, Dictionary? headers = null) + public override Task Get(string path, Dictionary? headers = null) { var response = new StreamingSyncImplementation.ApiResponse( new StreamingSyncImplementation.ResponseData("1") ); - return (T)(object)response; + return Task.FromResult((T)(object)response); } } public class TestConnector : IPowerSyncBackendConnector { - public async Task FetchCredentials() + public Task FetchCredentials() { - return new PowerSyncCredentials( + return Task.FromResult(new PowerSyncCredentials( endpoint: "https://powersync.example.org", token: "test" - ); + )); } public async Task UploadData(IPowerSyncDatabase database) diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Utils/TestUtils.cs b/Tests/PowerSync/PowerSync.Common.Tests/Utils/TestUtils.cs index f1fc960..3b3f77f 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Utils/TestUtils.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Utils/TestUtils.cs @@ -1,4 +1,5 @@ using System.Runtime.CompilerServices; +using PowerSync.Common.Client; namespace PowerSync.Common.Tests.Utils; @@ -31,4 +32,43 @@ public static async Task WaitForAsync(Func condition, TimeSpan? timeout = } throw new TimeoutException("Condition not met within timeout"); } + + public static async Task WaitForAsync(Func> condition, TimeSpan? timeout = null) + { + timeout ??= TimeSpan.FromSeconds(5); + var start = DateTime.UtcNow; + while (DateTime.UtcNow - start < timeout) + { + if (await condition()) + return; + await Task.Delay(50); + } + throw new TimeoutException("Condition not met within timeout"); + } + + public static async Task InsertRandomAsset(PowerSyncDatabase db) + { + var id = Guid.NewGuid().ToString(); + await db.Execute( + "insert into assets(id, description, make) values (?, ?, ?)", + [id, "some desc", "some make"] + ); + return id; + } + + public static async Task InsertRandomAssets(PowerSyncDatabase db, int assetCount) + { + var ids = Enumerable + .Range(0, assetCount) + .Select(_ => Guid.NewGuid().ToString()) + .ToArray(); + var parameters = ids + .Select(id => [id, "some desc", "some make"]) + .ToArray(); + await db.ExecuteBatch( + "insert into assets(id, description, make) values (?, ?, ?)", + parameters + ); + return ids; + } }