From f2e5ffb35e497ffac8e9391fc673bb5d327bfb85 Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Tue, 3 Mar 2026 13:13:17 +0200 Subject: [PATCH 1/8] Add EventManager --- .../Client/PowerSyncDatabase.cs | 126 ++++++++++++++---- .../PowerSync.Common/Utils/IEventManager.cs | 43 ++++++ .../Utils/Sync/MockSyncService.cs | 9 +- 3 files changed, 143 insertions(+), 35 deletions(-) create mode 100644 PowerSync/PowerSync.Common/Utils/IEventManager.cs diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index 52e9eda..1fb5934 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -53,17 +53,87 @@ public class PowerSyncDatabaseOptions() : BasePowerSyncDatabaseOptions() public Func? RemoteFactory { get; set; } } -public class PowerSyncDBEvent : StreamingSyncImplementationEvent +public class PowerSyncDBEvents : EventManager { - public bool? Initialized { get; set; } - public Schema? SchemaChanged { get; set; } + public interface IPowerSyncDBEvent; - public bool? Closing { get; set; } + public class InitializedEvent : IPowerSyncDBEvent; + public class ClosingEvent : IPowerSyncDBEvent; + public class ClosedEvent : IPowerSyncDBEvent; + public class SchemaChangedEvent(Schema schema) : IPowerSyncDBEvent + { + public Schema Schema { get; set; } = schema; + } + public class StatusChangedEvent(SyncStatus status) : IPowerSyncDBEvent + { + public SyncStatus Status { get; set; } = status; + } + public class StatusUpdatedEvent(SyncStatusOptions status) : IPowerSyncDBEvent + { + public SyncStatusOptions Status { get; set; } = status; + } + + public EventStream OnInitialized { get; } = new(); + public EventStream OnClosing { get; } = new(); + public EventStream OnClosed { get; } = new(); + public EventStream OnSchemaChanged { get; } = new(); + public EventStream OnStatusChanged { get; } = new(); + public EventStream OnStatusUpdated { get; } = new(); + + public override bool TryGetStream(out EventStream stream) + { + if (typeof(T) == typeof(PowerSyncDBEvents.InitializedEvent)) + { + stream = (EventStream)(object)OnInitialized; + return true; + } + + if (typeof(T) == typeof(PowerSyncDBEvents.ClosingEvent)) + { + stream = (EventStream)(object)OnClosing; + return true; + } + + if (typeof(T) == typeof(PowerSyncDBEvents.ClosedEvent)) + { + stream = (EventStream)(object)OnClosed; + return true; + } + + if (typeof(T) == typeof(PowerSyncDBEvents.SchemaChangedEvent)) + { + stream = (EventStream)(object)OnSchemaChanged; + return true; + } + + if (typeof(T) == typeof(PowerSyncDBEvents.StatusChangedEvent)) + { + stream = (EventStream)(object)OnStatusChanged; + return true; + } + + if (typeof(T) == typeof(PowerSyncDBEvents.StatusUpdatedEvent)) + { + stream = (EventStream)(object)OnStatusUpdated; + return true; + } - public bool? Closed { get; set; } + stream = null!; + return false; + } + + public override void Close() + { + OnInitialized.Close(); + OnClosing.Close(); + OnClosed.Close(); + OnSchemaChanged.Close(); + OnStatusChanged.Close(); + OnStatusUpdated.Close(); + } } -public interface IPowerSyncDatabase : IEventStream +public interface IPowerSyncDatabase { public Task Connect(IPowerSyncBackendConnector connector, PowerSyncConnectionOptions? options = null); public ISyncStream SyncStream(string name, Dictionary? parameters = null); @@ -96,10 +166,9 @@ public interface IPowerSyncDatabase : IEventStream Task WriteTransaction(Func fn, DBLockOptions? options = null); Task WriteTransaction(Func> fn, DBLockOptions? options = null); - } -public class PowerSyncDatabase : EventStream, IPowerSyncDatabase +public class PowerSyncDatabase : IPowerSyncDatabase { public IDBAdapter Database { get; protected set; } private CompiledSchema schema; @@ -107,9 +176,11 @@ public class PowerSyncDatabase : EventStream, IPowerSyncDataba private static readonly int DEFAULT_WATCH_THROTTLE_MS = 30; private static readonly Regex POWERSYNC_TABLE_MATCH = new Regex(@"(^ps_data__|^ps_data_local__)", RegexOptions.Compiled); - public new bool Closed { get; protected set; } + public bool Closed { get; protected set; } public bool Ready { get; protected set; } + public PowerSyncDBEvents Events { get; protected set; } = new(); + protected Task IsReadyTask; protected ConnectionManager ConnectionManager; @@ -138,7 +209,6 @@ public class PowerSyncDatabase : EventStream, IPowerSyncDataba public bool Connected => CurrentStatus.Connected; public bool Connecting => CurrentStatus.Connecting; - public PowerSyncConnectionOptions? ConnectionOptions => ConnectionManager.ConnectionOptions; public PowerSyncDatabase(PowerSyncDatabaseOptions options) @@ -216,7 +286,7 @@ public PowerSyncDatabase(PowerSyncDatabaseOptions options) { HasSynced = CurrentStatus?.HasSynced == true || update.StatusChanged.LastSyncedAt != null, }); - Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus }); + Events.Emit(new PowerSyncDBEvents.StatusChangedEvent(CurrentStatus)); } } }); @@ -285,7 +355,7 @@ public async Task WaitForStatus(Func predicate, CancellationTo ? CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token) : CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token, cancellationToken.Value); - var statusListener = ListenAsync(cts.Token); + var statusListener = Events.OnStatusChanged.ListenAsync(cts.Token); if (predicate(CurrentStatus)) { @@ -301,7 +371,7 @@ public async Task WaitForStatus(Func predicate, CancellationTo { await foreach (var update in statusListener) { - if ((update.StatusChanged != null) && predicate(update.StatusChanged)) + if (predicate(update.Status)) { tcs.TrySetResult(true); cts.Cancel(); @@ -322,7 +392,7 @@ protected async Task Initialize(PowerSyncDatabaseOptions options) await ResolveOfflineSyncStatus(); await Database.Execute("PRAGMA RECURSIVE_TRIGGERS=TRUE"); Ready = true; - Emit(new PowerSyncDBEvent { Initialized = true }); + Events.Emit(new PowerSyncDBEvents.InitializedEvent()); } private record VersionResult(string version); @@ -366,7 +436,7 @@ protected async Task ResolveOfflineSyncStatus() if (!updatedStatus.IsEqual(CurrentStatus)) { CurrentStatus = updatedStatus; - Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus }); + Events.Emit(new PowerSyncDBEvents.StatusChangedEvent(CurrentStatus)); } } @@ -394,7 +464,7 @@ public async Task UpdateSchema(Schema schema) this.schema = compiledSchema; await Database.Execute("SELECT powersync_replace_schema(?)", [compiledSchema.ToJSON()]); await Database.RefreshSchema(); - Emit(new PowerSyncDBEvent { SchemaChanged = schema }); + Events.Emit(new PowerSyncDBEvents.SchemaChangedEvent(schema)); } /// @@ -458,7 +528,7 @@ await Database.WriteTransaction(async tx => // The data has been deleted - reset the sync status CurrentStatus = new SyncStatus(new SyncStatusOptions()); - Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus }); + Events.Emit(new PowerSyncDBEvents.StatusChangedEvent(CurrentStatus)); } /// @@ -479,18 +549,16 @@ public ISyncStream SyncStream(string name, Dictionary? parameter /// Once close is called, this connection cannot be used again - a new one /// must be constructed. /// - public new async Task Close() + public async Task Close() { await WaitForReady(); if (Closed) return; - Emit(new PowerSyncDBEvent { Closing = true }); + Events.Emit(new PowerSyncDBEvents.ClosingEvent()); await Disconnect(); - base.Close(); - masterCts.Cancel(); masterCts.Dispose(); @@ -499,7 +567,10 @@ public ISyncStream SyncStream(string name, Dictionary? parameter Database.Close(); Closed = true; - Emit(new PowerSyncDBEvent { Closed = true }); + + Events.Emit(new PowerSyncDBEvents.ClosedEvent()); + + Events.Close(); } private record UploadQueueStatsSizeCountResult(long size, long count); @@ -847,14 +918,11 @@ IAsyncEnumerable initialListener // Listen for schema changes in the background _ = Task.Run(async () => { - await foreach (var update in ListenAsync(signal.Token)) + await foreach (var update in Events.OnSchemaChanged.ListenAsync(signal.Token)) { - if (update.SchemaChanged != null) - { - // Swap schemaChanged with an unresolved TCS - var oldTcs = Interlocked.Exchange(ref schemaChanged, new()); - oldTcs.TrySetResult(true); - } + // Swap schemaChanged with an unresolved TCS + var oldTcs = Interlocked.Exchange(ref schemaChanged, new()); + oldTcs.TrySetResult(true); } }, signal.Token); diff --git a/PowerSync/PowerSync.Common/Utils/IEventManager.cs b/PowerSync/PowerSync.Common/Utils/IEventManager.cs new file mode 100644 index 0000000..42fe727 --- /dev/null +++ b/PowerSync/PowerSync.Common/Utils/IEventManager.cs @@ -0,0 +1,43 @@ +namespace PowerSync.Common.Utils; + +public interface IEventManager where TEvent : class +{ + /// + /// Attempts to retreive the EventStream associated with events of type T. + /// + bool TryGetStream(out EventStream stream) + where T : class, TEvent; + + /// + /// Posts a message to the stream managing events of type T. + /// + void Emit(T evt) + where T : class, TEvent; + + /// + /// Close all EventStream objects and disable the IEventManager. + /// + void Close(); +} + +public abstract class EventManager : IEventManager + where TEvent : class +{ + public abstract bool TryGetStream(out EventStream stream) + where T : class, TEvent; + + public void Emit(T evt) where T : class, TEvent + { + if (TryGetStream(out var stream)) + { + stream.Emit(evt); + } + else + { + throw new InvalidOperationException($"No stream emits events of type {nameof(T)}."); + } + } + + public abstract void Close(); +} + diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs b/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs index e9d46c5..5482556 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs @@ -65,13 +65,10 @@ public static async Task NextStatus(PowerSyncDatabase db) _ = Task.Run(async () => { - await foreach (var update in db.ListenAsync(cts.Token)) + await foreach (var update in db.Events.OnStatusChanged.ListenAsync(cts.Token)) { - if (update.StatusChanged != null) - { - tcs.TrySetResult(update.StatusChanged); - cts?.Cancel(); - } + tcs.TrySetResult(update.Status); + cts?.Cancel(); } }); From f9164ae2f96cde7050e7f87a302a3069818cac76 Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Tue, 3 Mar 2026 14:59:25 +0200 Subject: [PATCH 2/8] Improve API for registering streams --- .../Client/PowerSyncDatabase.cs | 60 +++----------- .../PowerSync.Common/Utils/EventManager.cs | 80 +++++++++++++++++++ .../PowerSync.Common/Utils/EventStream.cs | 4 +- .../PowerSync.Common/Utils/ICloseable.cs | 12 +++ .../PowerSync.Common/Utils/IEventManager.cs | 43 ---------- 5 files changed, 102 insertions(+), 97 deletions(-) create mode 100644 PowerSync/PowerSync.Common/Utils/EventManager.cs create mode 100644 PowerSync/PowerSync.Common/Utils/ICloseable.cs delete mode 100644 PowerSync/PowerSync.Common/Utils/IEventManager.cs diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index 1fb5934..131b24e 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -53,7 +53,7 @@ public class PowerSyncDatabaseOptions() : BasePowerSyncDatabaseOptions() public Func? RemoteFactory { get; set; } } -public class PowerSyncDBEvents : EventManager +public class PowerSyncDBEvents : EventManager { public interface IPowerSyncDBEvent; @@ -80,60 +80,18 @@ public class StatusUpdatedEvent(SyncStatusOptions status) : IPowerSyncDBEvent public EventStream OnStatusChanged { get; } = new(); public EventStream OnStatusUpdated { get; } = new(); - public override bool TryGetStream(out EventStream stream) + public PowerSyncDBEvents() { - if (typeof(T) == typeof(PowerSyncDBEvents.InitializedEvent)) - { - stream = (EventStream)(object)OnInitialized; - return true; - } - - if (typeof(T) == typeof(PowerSyncDBEvents.ClosingEvent)) - { - stream = (EventStream)(object)OnClosing; - return true; - } - - if (typeof(T) == typeof(PowerSyncDBEvents.ClosedEvent)) - { - stream = (EventStream)(object)OnClosed; - return true; - } - - if (typeof(T) == typeof(PowerSyncDBEvents.SchemaChangedEvent)) - { - stream = (EventStream)(object)OnSchemaChanged; - return true; - } - - if (typeof(T) == typeof(PowerSyncDBEvents.StatusChangedEvent)) - { - stream = (EventStream)(object)OnStatusChanged; - return true; - } - - if (typeof(T) == typeof(PowerSyncDBEvents.StatusUpdatedEvent)) - { - stream = (EventStream)(object)OnStatusUpdated; - return true; - } - - stream = null!; - return false; - } - - public override void Close() - { - OnInitialized.Close(); - OnClosing.Close(); - OnClosed.Close(); - OnSchemaChanged.Close(); - OnStatusChanged.Close(); - OnStatusUpdated.Close(); + Register(OnInitialized); + Register(OnClosing); + Register(OnClosed); + Register(OnSchemaChanged); + Register(OnStatusChanged); + Register(OnStatusUpdated); } } -public interface IPowerSyncDatabase +public interface IPowerSyncDatabase : ICloseableAsync { public Task Connect(IPowerSyncBackendConnector connector, PowerSyncConnectionOptions? options = null); public ISyncStream SyncStream(string name, Dictionary? parameters = null); diff --git a/PowerSync/PowerSync.Common/Utils/EventManager.cs b/PowerSync/PowerSync.Common/Utils/EventManager.cs new file mode 100644 index 0000000..5a18a49 --- /dev/null +++ b/PowerSync/PowerSync.Common/Utils/EventManager.cs @@ -0,0 +1,80 @@ +namespace PowerSync.Common.Utils; + +public interface IEventManager : ICloseable +{ + /// + /// Registers a new EventStream into the EventManager. + /// + void Register(EventStream stream); + + /// + /// Attempts to retreive the EventStream associated with events of type T. + /// + bool TryGetStream(out EventStream stream); + + /// + /// Posts a message to the stream managing events of type T. + /// + void Emit(T evt); + + /// + /// Attemps to post a message to the stream managing events of type T. + /// + bool TryEmit(T evt); +} + +public class EventManager : IEventManager +{ + private readonly Dictionary _streams = new(); + + public void Register(EventStream stream) + { + _streams[typeof(T)] = stream; + } + + public bool TryGetStream(out EventStream stream) + { + if (_streams.TryGetValue(typeof(T), out var streamObj)) + { + stream = (EventStream)streamObj; + return true; + } + stream = null!; + return false; + } + + public void Emit(T evt) + { + if (TryGetStream(out var stream)) + { + stream.Emit(evt); + } + else + { + throw new InvalidOperationException($"No stream emits events of type {nameof(T)}."); + } + } + + public bool TryEmit(T evt) + { + if (TryGetStream(out var stream)) + { + stream.Emit(evt); + return true; + } + else + { + return false; + } + } + + public void Close() + { + foreach (var kvp in _streams) + { + ((ICloseable)kvp.Value).Close(); + } + _streams.Clear(); + } +} + diff --git a/PowerSync/PowerSync.Common/Utils/EventStream.cs b/PowerSync/PowerSync.Common/Utils/EventStream.cs index 5dc209f..b229e83 100644 --- a/PowerSync/PowerSync.Common/Utils/EventStream.cs +++ b/PowerSync/PowerSync.Common/Utils/EventStream.cs @@ -4,7 +4,7 @@ namespace PowerSync.Common.Utils; using System.Runtime.CompilerServices; using System.Threading.Channels; -public interface IEventStream +public interface IEventStream : ICloseable { void Emit(T item); @@ -13,8 +13,6 @@ public interface IEventStream IEnumerable Listen(CancellationToken cancellationToken); IAsyncEnumerable ListenAsync(CancellationToken cancellationToken); - - void Close(); } public class EventStream : IEventStream diff --git a/PowerSync/PowerSync.Common/Utils/ICloseable.cs b/PowerSync/PowerSync.Common/Utils/ICloseable.cs new file mode 100644 index 0000000..955d7b7 --- /dev/null +++ b/PowerSync/PowerSync.Common/Utils/ICloseable.cs @@ -0,0 +1,12 @@ +namespace PowerSync.Common.Utils; + +public interface ICloseable +{ + public void Close(); +} + +public interface ICloseableAsync +{ + public Task Close(); +} + diff --git a/PowerSync/PowerSync.Common/Utils/IEventManager.cs b/PowerSync/PowerSync.Common/Utils/IEventManager.cs deleted file mode 100644 index 42fe727..0000000 --- a/PowerSync/PowerSync.Common/Utils/IEventManager.cs +++ /dev/null @@ -1,43 +0,0 @@ -namespace PowerSync.Common.Utils; - -public interface IEventManager where TEvent : class -{ - /// - /// Attempts to retreive the EventStream associated with events of type T. - /// - bool TryGetStream(out EventStream stream) - where T : class, TEvent; - - /// - /// Posts a message to the stream managing events of type T. - /// - void Emit(T evt) - where T : class, TEvent; - - /// - /// Close all EventStream objects and disable the IEventManager. - /// - void Close(); -} - -public abstract class EventManager : IEventManager - where TEvent : class -{ - public abstract bool TryGetStream(out EventStream stream) - where T : class, TEvent; - - public void Emit(T evt) where T : class, TEvent - { - if (TryGetStream(out var stream)) - { - stream.Emit(evt); - } - else - { - throw new InvalidOperationException($"No stream emits events of type {nameof(T)}."); - } - } - - public abstract void Close(); -} - From fc869790d77cec6da1d2ea6f034f805fa8839349 Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Tue, 3 Mar 2026 15:46:10 +0200 Subject: [PATCH 3/8] Add Deregister method + tests --- .../PowerSync.Common/Utils/EventManager.cs | 11 ++ .../EventStreamTests.cs | 133 +++++++++++++++++- 2 files changed, 143 insertions(+), 1 deletion(-) diff --git a/PowerSync/PowerSync.Common/Utils/EventManager.cs b/PowerSync/PowerSync.Common/Utils/EventManager.cs index 5a18a49..2ee006a 100644 --- a/PowerSync/PowerSync.Common/Utils/EventManager.cs +++ b/PowerSync/PowerSync.Common/Utils/EventManager.cs @@ -7,6 +7,12 @@ public interface IEventManager : ICloseable /// void Register(EventStream stream); + /// + /// Deregisters the stream associated with the EventManager. + /// This does NOT close the stream in the default implementation. + /// + bool Deregister(); + /// /// Attempts to retreive the EventStream associated with events of type T. /// @@ -32,6 +38,11 @@ public void Register(EventStream stream) _streams[typeof(T)] = stream; } + public bool Deregister() + { + return _streams.Remove(typeof(T)); + } + public bool TryGetStream(out EventStream stream) { if (_streams.TryGetValue(typeof(T), out var streamObj)) diff --git a/Tests/PowerSync/PowerSync.Common.Tests/EventStreamTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/EventStreamTests.cs index a924a23..6faacc8 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/EventStreamTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/EventStreamTests.cs @@ -5,7 +5,6 @@ namespace PowerSync.Common.Tests; public class EventStreamTests { - [Fact] public async Task EventStream_ShouldReceiveTwoMessages_Async() { @@ -109,4 +108,136 @@ public async Task EventStream_ShouldReceiveTwoMessages_Sync() Assert.Contains(status2, receivedMessages); Assert.Equal(0, eventStream.SubscriberCount()); } + + [Fact] + public async Task EventManager_RegistersStreamsCorrectly() + { + var manager = new EventManager(); + var stream1 = new EventStream(); + var stream2 = new EventStream(); + var stream3 = new EventStream(); // Control + + manager.Register(stream1); + manager.Register(stream2); + + Assert.True(manager.TryGetStream(out var obtainedStream1)); + Assert.True(manager.TryGetStream(out var obtainedStream2)); + Assert.False(manager.TryGetStream(out var obtainedStream3)); + + Assert.Equal(stream1, obtainedStream1); + Assert.Equal(stream2, obtainedStream2); + Assert.Equal(null, obtainedStream3); + + manager.Close(); + } + + [Fact] + public async Task EventManager_CloseRemovesAndClosesStreams() + { + var manager = new EventManager(); + var stream1 = new EventStream(); + var stream2 = new EventStream(); + var stream3 = new EventStream(); // Control + + manager.Register(stream1); + manager.Register(stream2); + + manager.Close(); + + Assert.False(manager.TryGetStream(out var obtainedStream1)); + Assert.False(manager.TryGetStream(out var obtainedStream2)); + Assert.False(manager.TryGetStream(out var obtainedStream3)); + + Assert.Equal(null, obtainedStream1); + Assert.Equal(null, obtainedStream2); + Assert.Equal(null, obtainedStream3); + + Assert.True(stream1.Closed); + Assert.True(stream2.Closed); + Assert.False(stream3.Closed); + } + + [Fact(Timeout = 2000)] + public async Task EventManager_ShouldReceiveEmittedEvents() + { + var manager = new EventManager(); + var stream1 = new EventStream(); + var stream2 = new EventStream(); + + manager.Register(stream1); + manager.Register(stream2); + + var cts1 = new CancellationTokenSource(); + var listener1 = stream1.ListenAsync(cts1.Token); + Assert.True(manager.TryEmit(false)); + Assert.True(manager.TryEmit(false)); + Assert.True(manager.TryEmit(true)); + int eventCount = 0; + + await foreach (var evt in listener1) + { + eventCount++; + if (evt == true) + { + cts1.Cancel(); + } + } + + Assert.Equal(3, eventCount); + + var cts2 = new CancellationTokenSource(); + var listener2 = stream2.ListenAsync(cts2.Token); + Assert.True(manager.TryEmit("hi")); + Assert.True(manager.TryEmit("hello")); + Assert.True(manager.TryEmit("sup")); + Assert.True(manager.TryEmit("STOP")); + eventCount = 0; + + await foreach (var evt in listener2) + { + eventCount++; + if (evt == "STOP") + { + cts2.Cancel(); + } + } + + Assert.Equal(4, eventCount); + + manager.Close(); + } + + [Fact] + public async Task EventManager_ShouldNotReceiveEventsAfterDeregistering() + { + var manager = new EventManager(); + var stream = new EventStream(); + + manager.Register(stream); + + var cts = new CancellationTokenSource(); + var listener = stream.ListenAsync(cts.Token); + var sem = new SemaphoreSlim(0); + int eventCount = 0; + + _ = Task.Run(async () => + { + sem.Release(); + await foreach (var evt in listener) + { + eventCount++; + sem.Release(); + } + }, cts.Token); + Assert.True(await sem.WaitAsync(100)); + + Assert.True(manager.Deregister()); + + Assert.False(manager.TryEmit("invalid")); + Assert.False(await sem.WaitAsync(100)); + + // Cleanup + cts.Cancel(); + manager.Close(); + } } From 38f9225db141a20b6b9896d7f77e5021e1d365e2 Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Tue, 3 Mar 2026 17:01:25 +0200 Subject: [PATCH 4/8] Use new pattern around codebase --- .../Client/ConnectionManager.cs | 26 +++++-- .../Client/PowerSyncDatabase.cs | 43 +++++------- .../Sync/Bucket/BucketStorageAdapter.cs | 16 ++++- .../Client/Sync/Bucket/SqliteBucketStorage.cs | 27 ++++--- .../Stream/StreamingSyncImplementation.cs | 70 ++++++++++++++----- PowerSync/PowerSync.Common/DB/IDBAdapter.cs | 27 +++++-- .../MDSQLite/MDSQLiteAdapter.cs | 10 +-- .../MDSQLite/MDSQLiteConnection.cs | 5 +- .../PowerSync.Common/Utils/EventManager.cs | 2 +- .../Client/Sync/SyncStreamsTests.cs | 8 --- 10 files changed, 147 insertions(+), 87 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/ConnectionManager.cs b/PowerSync/PowerSync.Common/Client/ConnectionManager.cs index fec5f02..50b6ce7 100644 --- a/PowerSync/PowerSync.Common/Client/ConnectionManager.cs +++ b/PowerSync/PowerSync.Common/Client/ConnectionManager.cs @@ -79,14 +79,25 @@ public class StoredConnectionOptions( public PowerSyncConnectionOptions Options { get; set; } = options; } -public class ConnectionManagerEvent +public class ConnectionManagerEvents : EventManager { - public StreamingSyncImplementation? SyncStreamCreated { get; set; } + public interface IConnectionManagerEvent; + + public class SyncStreamCreatedEvent(StreamingSyncImplementation ssi) : IConnectionManagerEvent + { + public StreamingSyncImplementation SyncStreamCreated { get; set; } = ssi; + } + + public EventStream OnSyncStreamCreated { get; } = new(); + + public ConnectionManagerEvents() + { + Register(OnSyncStreamCreated); + } } -public class ConnectionManager : EventStream +public class ConnectionManager : ICloseable { - /// /// Tracks active connection attempts /// @@ -122,6 +133,7 @@ public class ConnectionManager : EventStream public IPowerSyncBackendConnector? Connector => PendingConnectionOptions?.Connector; + public ConnectionManagerEvents Events { get; protected set; } = new(); public PowerSyncConnectionOptions? ConnectionOptions => PendingConnectionOptions?.Options; @@ -148,9 +160,9 @@ public ConnectionManager(Func { await foreach (var update in syncStreamStatusListener) { - if (update.StatusChanged != null) + CurrentStatus = new SyncStatus(new SyncStatusOptions(update.Status.Options) { - CurrentStatus = new SyncStatus(new SyncStatusOptions(update.StatusChanged.Options) - { - HasSynced = CurrentStatus?.HasSynced == true || update.StatusChanged.LastSyncedAt != null, - }); - Events.Emit(new PowerSyncDBEvents.StatusChangedEvent(CurrentStatus)); - } + HasSynced = CurrentStatus?.HasSynced == true || update.Status.LastSyncedAt != null, + }); + Events.Emit(new PowerSyncDBEvents.StatusChangedEvent(CurrentStatus)); } }); @@ -784,7 +781,7 @@ public IAsyncEnumerable OnChange(SQLWatchOptions? options = ? CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token, options.Signal.Value) : CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token); - var listener = Database.ListenAsync(signal.Token); + var listener = Database.Events.OnTablesUpdated.ListenAsync(signal.Token); // Return the actual IAsyncEnumerable here, using OnChange as a synchronous wrapper that blocks until the // connection is established @@ -793,7 +790,7 @@ public IAsyncEnumerable OnChange(SQLWatchOptions? options = private async IAsyncEnumerable OnChangeCore( HashSet watchedTables, - IAsyncEnumerable listener, + IAsyncEnumerable listener, [EnumeratorCancellation] CancellationToken signal, bool triggerImmediately ) @@ -807,7 +804,6 @@ bool triggerImmediately await foreach (var e in listener) { if (signal.IsCancellationRequested) yield break; - if (e.TablesUpdated == null) continue; changedTables.Clear(); GetTablesFromNotification(e.TablesUpdated, changedTables); @@ -857,7 +853,7 @@ public IAsyncEnumerable Watch( // so that table changes between Watch() being called and iteration starting are not missed. // This mirrors the pattern used in OnChange(). var initialRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token); - var initialListener = Database.ListenAsync(initialRestartCts.Token); + var initialListener = Database.Events.OnTablesUpdated.ListenAsync(initialRestartCts.Token); return WatchCore(sql, parameters, options, signal, initialRestartCts, initialListener); } @@ -868,7 +864,7 @@ private async IAsyncEnumerable WatchCore( SQLWatchOptions options, CancellationTokenSource signal, CancellationTokenSource initialRestartCts, - IAsyncEnumerable initialListener + IAsyncEnumerable initialListener ) { var schemaChanged = new TaskCompletionSource(); @@ -930,7 +926,7 @@ IAsyncEnumerable initialListener // Establish a new listener BEFORE resolving source tables in the next iteration, // so that changes during the async GetSourceTables call are not missed. currentRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token); - currentListener = Database.ListenAsync(currentRestartCts.Token); + currentListener = Database.Events.OnTablesUpdated.ListenAsync(currentRestartCts.Token); break; } @@ -977,7 +973,7 @@ internal async Task> GetSourceTables(string sql, object?[]? para private async IAsyncEnumerable OnRawTableChange( HashSet watchedTables, - IAsyncEnumerable listener, + IAsyncEnumerable listener, [EnumeratorCancellation] CancellationToken token, bool triggerImmediately = false ) @@ -990,19 +986,16 @@ private async IAsyncEnumerable OnRawTableChange( HashSet changedTables = new(); await foreach (var e in listener) { - if (e.TablesUpdated != null) - { - if (token.IsCancellationRequested) break; + if (token.IsCancellationRequested) break; - // Extract the changed tables and intersect with the watched tables - changedTables.Clear(); - GetTablesFromNotification(e.TablesUpdated, changedTables); - changedTables.IntersectWith(watchedTables); + // Extract the changed tables and intersect with the watched tables + changedTables.Clear(); + GetTablesFromNotification(e.TablesUpdated, changedTables); + changedTables.IntersectWith(watchedTables); - if (changedTables.Count == 0) continue; + if (changedTables.Count == 0) continue; - yield return new WatchOnChangeEvent { ChangedTables = [.. changedTables] }; - } + yield return new WatchOnChangeEvent { ChangedTables = [.. changedTables] }; } } diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs index dd1b105..f0458b3 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs @@ -100,13 +100,23 @@ public static class PSInternalTable public static readonly string UNTYPED = "ps_untyped"; } -public class BucketStorageEvent +public class BucketStorageEvents : EventManager { - public bool CrudUpdate { get; set; } + public interface IBucketStorageEvent; + + public class CrudUpdateEvent : IBucketStorageEvent; + + public EventStream OnCrudUpdate = new(); + + public BucketStorageEvents() + { + Register(OnCrudUpdate); + } } -public interface IBucketStorageAdapter : IEventStream +public interface IBucketStorageAdapter : ICloseable { + BucketStorageEvents Events { get; } Task Init(); Task NextCrudItem(); diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs index 48e290a..e72e963 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs @@ -12,13 +12,13 @@ namespace PowerSync.Common.Client.Sync.Bucket; using PowerSync.Common.DB; using PowerSync.Common.DB.Crud; -using PowerSync.Common.Utils; -public class SqliteBucketStorage : EventStream, IBucketStorageAdapter +public class SqliteBucketStorage : IBucketStorageAdapter { - public static readonly string MAX_OP_ID = "9223372036854775807"; + public BucketStorageEvents Events { get; } = new(); + private readonly IDBAdapter db; private bool hasCompletedSync; private readonly HashSet tableNames; @@ -27,29 +27,27 @@ public class SqliteBucketStorage : EventStream, IBucketStora private readonly ILogger logger; private readonly CancellationTokenSource updateCts; + private readonly Task updateTask; private record ExistingTableRowsResult(string name); public SqliteBucketStorage(IDBAdapter db, ILogger? logger = null) { this.db = db; - this.logger = logger ?? NullLogger.Instance; ; + this.logger = logger ?? NullLogger.Instance; hasCompletedSync = false; tableNames = []; updateCts = new CancellationTokenSource(); - var _ = Task.Run(() => + updateTask = Task.Run(() => { - foreach (var update in db.Listen(updateCts.Token)) + foreach (var update in db.Events.OnTablesUpdated.Listen(updateCts.Token)) { - if (update.TablesUpdated != null) + var tables = DBAdapterUtils.ExtractTableUpdates(update.TablesUpdated); + if (tables.Contains(PSInternalTable.CRUD)) { - var tables = DBAdapterUtils.ExtractTableUpdates(update.TablesUpdated); - if (tables.Contains(PSInternalTable.CRUD)) - { - Emit(new BucketStorageEvent { CrudUpdate = true }); - } + Events.Emit(new BucketStorageEvents.CrudUpdateEvent()); } } }); @@ -67,10 +65,11 @@ public async Task Init() } } - public new void Close() + public void Close() { updateCts.Cancel(); - base.Close(); + try { updateTask.Wait(2000); } catch (Exception) { } + Events.Close(); } private record ClientIdResult(string? client_id); diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index 631543b..be6d223 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -89,17 +89,34 @@ public class RequiredPowerSyncConnectionOptions : BaseConnectionOptions public new bool IncludeDefaultStreams { get; set; } = default; } -public class StreamingSyncImplementationEvent +public class StreamingSyncImplementationEvents : EventManager { + public interface IStreamingSyncImplementationEvent; + + public class StatusUpdatedEvent(SyncStatusOptions status) : IStreamingSyncImplementationEvent + { + public SyncStatusOptions Status { get; set; } = status; + } + public class StatusChangedEvent(SyncStatus status) : IStreamingSyncImplementationEvent + { + public SyncStatus Status { get; set; } = status; + } + /// - /// Set whenever a status update has been attempted to be made or refreshed. + /// See whenever a status update has been attempted to be made or refreshed. /// - public SyncStatusOptions? StatusUpdated { get; set; } + public EventStream OnStatusUpdated { get; } = new(); /// - /// Set whenever the status' members have changed in value. + /// See whenever the status' members have changed in value. /// - public SyncStatus? StatusChanged { get; set; } + public EventStream OnStatusChanged { get; } = new(); + + public StreamingSyncImplementationEvents() + { + Register(OnStatusUpdated); + Register(OnStatusChanged); + } } public class PowerSyncConnectionOptions( @@ -131,7 +148,7 @@ public class SubscribedStream } -public class StreamingSyncImplementation : EventStream +public class StreamingSyncImplementation : ICloseable { public static RequiredPowerSyncConnectionOptions DEFAULT_STREAM_CONNECTION_OPTIONS = new() { @@ -140,6 +157,8 @@ public class StreamingSyncImplementation : EventStream + crudUploadTask = Task.Run(async () => { await InternalUploadAllCrud(); notifyCompletedUploads?.Invoke(); @@ -227,7 +249,7 @@ public async Task Connect(PowerSyncConnectionOptions? options = null) var cts = new CancellationTokenSource(); // Subscribe to events before starting StreamingSync to not miss the Connected == true event - var listener = ListenAsync(cts.Token); + var listener = Events.OnStatusChanged.ListenAsync(cts.Token); streamingSyncTask = StreamingSync(CancellationTokenSource.Token, options); @@ -235,7 +257,7 @@ public async Task Connect(PowerSyncConnectionOptions? options = null) { await foreach (var status in listener) { - if (status.StatusChanged?.Connected == true) + if (status.Status.Connected == true) { tcs.TrySetResult(true); cts.Cancel(); @@ -274,11 +296,25 @@ public async Task Disconnect() catch (Exception ex) { // The operation might have failed, all we care about is if it has completed - logger.LogWarning("{Message}", ex.Message); + logger.LogWarning("Streaming sync task failed during disconnect: {Message}", ex.Message); } streamingSyncTask = null; CancellationTokenSource = null; + // Do the same for any pending CRUD uploads + if (crudUploadTask != null) + { + try + { + await crudUploadTask; + } + catch (Exception ex) + { + logger.LogWarning("CRUD upload task failed during disconnect: {Message}", ex.Message); + } + crudUploadTask = null; + } + UpdateSyncStatus(new SyncStatusOptions { Connected = false, Connecting = false }); } @@ -291,9 +327,9 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio } crudUpdateCts = new CancellationTokenSource(); - var _ = Task.Run(() => + crudUpdateTask = Task.Run(async () => { - foreach (var _ in Options.Adapter.Listen(crudUpdateCts.Token)) + await foreach (var _ in Options.Adapter.Events.OnCrudUpdate.ListenAsync(crudUpdateCts.Token)) { TriggerCrudUpload(); } @@ -307,6 +343,7 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio nestedCts.Cancel(); crudUpdateCts?.Cancel(); crudUpdateCts = null; + try { crudUpdateTask?.Wait(2000); } catch (Exception) { } UpdateSyncStatus(new SyncStatusOptions { Connected = false, @@ -672,11 +709,12 @@ async Task HandleInstruction(Instruction instruction) return new StreamingSyncIterationResult { ImmediateRestart = hideDisconnectOnRestart }; } - public new void Close() + public void Close() { crudUpdateCts?.Cancel(); - base.Close(); crudUpdateCts = null; + try { crudUpdateTask?.Wait(2000); } catch (Exception) { } + Events.Close(); } public record ResponseData( @@ -816,11 +854,11 @@ protected void UpdateSyncStatus(SyncStatusOptions options, UpdateSyncStatusOptio SyncStatus = updatedStatus; logger.LogDebug("[Sync status changed]: {message}", updatedStatus.ToJSON()); // Only trigger this if there was a change - Emit(new StreamingSyncImplementationEvent { StatusChanged = updatedStatus }); + Events.Emit(new StreamingSyncImplementationEvents.StatusChangedEvent(updatedStatus)); } // Trigger this for all updates - Emit(new StreamingSyncImplementationEvent { StatusUpdated = options }); + Events.Emit(new StreamingSyncImplementationEvents.StatusUpdatedEvent(options)); } catch (Exception ex) { diff --git a/PowerSync/PowerSync.Common/DB/IDBAdapter.cs b/PowerSync/PowerSync.Common/DB/IDBAdapter.cs index 7825cd5..834215c 100644 --- a/PowerSync/PowerSync.Common/DB/IDBAdapter.cs +++ b/PowerSync/PowerSync.Common/DB/IDBAdapter.cs @@ -73,9 +73,7 @@ public class TableUpdateOperation(RowUpdateType OpType, long RowId) public long RowId { get; set; } = RowId; } -public interface INotification -{ -} +public interface INotification; public class UpdateNotification(string table, RowUpdateType OpType, long RowId) : TableUpdateOperation(OpType, RowId), INotification { @@ -89,9 +87,21 @@ public class BatchedUpdateNotification : INotification public Dictionary GroupedUpdates { get; set; } = []; } -public class DBAdapterEvent +public class DBAdapterEvents : EventManager { - public INotification? TablesUpdated { get; set; } + public interface IDBAdapterEvent; + + public class TablesUpdatedEvent(INotification tablesUpdatedNotification) + { + public INotification TablesUpdated { get; set; } = tablesUpdatedNotification; + } + + public EventStream OnTablesUpdated = new(); + + public DBAdapterEvents() + { + Register(OnTablesUpdated); + } } public class DBLockOptions @@ -113,7 +123,7 @@ public static string[] ExtractTableUpdates(INotification update) } } -public interface IDBAdapter : IEventStream, ILockContext +public interface IDBAdapter : ILockContext, ICloseable { /// /// Closes the adapter. @@ -125,6 +135,11 @@ public interface IDBAdapter : IEventStream, ILockContext /// string Name { get; } + /// + /// The event manager for the adapter. + /// + DBAdapterEvents Events { get; } + /// /// Executes a read lock with the given function. /// diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs index 6400fe9..c7bb653 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs @@ -18,10 +18,12 @@ public class MDSQLiteAdapterOptions() } -public class MDSQLiteAdapter : EventStream, IDBAdapter +public class MDSQLiteAdapter : IDBAdapter { public string Name => options.Name; + public DBAdapterEvents Events { get; } = new(); + public MDSQLiteConnection? writeConnection; public MDSQLiteConnection? readConnection; @@ -102,7 +104,7 @@ private async Task Init() { if (notification.TablesUpdated != null) { - Emit(notification); + Events.Emit(notification); } } }); @@ -133,11 +135,11 @@ protected virtual void LoadExtension(SqliteConnection db) db.LoadExtension(extensionPath, "sqlite3_powersync_init"); } - public new void Close() + public void Close() { + Events.Close(); tablesUpdatedCts?.Cancel(); try { tablesUpdatedTask?.Wait(TimeSpan.FromSeconds(2)); } catch { } - base.Close(); writeConnection?.Close(); readConnection?.Close(); } diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs index d6eb924..7d9e4ad 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs @@ -18,9 +18,8 @@ public class MDSQLiteConnectionOptions(SqliteConnection database) public SqliteConnection Database { get; set; } = database; } -public class MDSQLiteConnection : EventStream, ILockContext +public class MDSQLiteConnection : EventStream, ILockContext { - public SqliteConnection Db; private readonly List updateBuffer; public MDSQLiteConnection(MDSQLiteConnectionOptions options) @@ -71,7 +70,7 @@ public void FlushUpdates() }; updateBuffer.Clear(); - Emit(new DBAdapterEvent { TablesUpdated = batchedUpdate }); + Emit(new DBAdapterEvents.TablesUpdatedEvent(batchedUpdate)); } private static List PrepareQueryString(ref string query, int parameterCount) diff --git a/PowerSync/PowerSync.Common/Utils/EventManager.cs b/PowerSync/PowerSync.Common/Utils/EventManager.cs index 2ee006a..1c0ff31 100644 --- a/PowerSync/PowerSync.Common/Utils/EventManager.cs +++ b/PowerSync/PowerSync.Common/Utils/EventManager.cs @@ -62,7 +62,7 @@ public void Emit(T evt) } else { - throw new InvalidOperationException($"No stream emits events of type {nameof(T)}."); + throw new InvalidOperationException($"No stream emits events of type {typeof(T).Name}."); } } diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs index 286ea4a..aedfa4e 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs @@ -1,14 +1,8 @@ namespace PowerSync.Common.Tests.Client.Sync; -using System.Runtime.CompilerServices; - using Common.Client.Sync.Stream; -using Newtonsoft.Json; - using PowerSync.Common.Client; -using PowerSync.Common.DB.Crud; -using PowerSync.Common.DB.Schema; using PowerSync.Common.Tests.Utils; using PowerSync.Common.Tests.Utils.Sync; @@ -209,6 +203,4 @@ public async Task UnsubscribeAllTest() await db.Connect(new TestConnector(), new PowerSyncConnectionOptions()); TestUtils.DeepEquivalent(new RequestStream { IncludeDefaults = true, Subscriptions = [] }, syncService.Requests[0].Streams); } - - } From 4c912c311b9396c6822ebe022f9ac12c27e6f32c Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Tue, 3 Mar 2026 17:06:17 +0200 Subject: [PATCH 5/8] Changelog --- PowerSync/PowerSync.Common/CHANGELOG.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/PowerSync/PowerSync.Common/CHANGELOG.md b/PowerSync/PowerSync.Common/CHANGELOG.md index 95d792f..891dfaa 100644 --- a/PowerSync/PowerSync.Common/CHANGELOG.md +++ b/PowerSync/PowerSync.Common/CHANGELOG.md @@ -2,6 +2,29 @@ ## 0.0.11-alpha.1 +- Converted most instances of a class inheriting from `EventStream` into a class with an `EventManager` property called `Events`. This allows for subscribing to individual events instead of subscribing to all events and then filtering events manually. + +```csharp +// Old +var listener = db.Listen(cts.Token); +foreach (PowerSyncDBEvent update in listener) +{ + // Manually filter updates + if (update.StatusChanged != null) + { + Console.WriteLine("status changed: " + update.StatusChanged!); + } +} + +// New +var listener = db.Events.OnStatusChanged.Listen(cts.Token); +foreach (PowerSyncDBEvents.StatusChangedEvent update in listener) +{ + // Events are filtered inherently + Console.WriteLine("status changed: " + update.Status); +} +``` + - Updated to the latest version (0.4.11) of the core extension. - `MDSQLiteConnection` now runs query operations on another thread, which stops the caller thread from blocking. - Removed the `RunListener` and `RunListenerAsync` APIs from `IEventStream`. Users are encouraged to use the `Listen` or `ListenAsync` APIs instead (`RunListener` itself was implemented using the `Listen` API). From d7e9f004d0565e3d6fa7c667f537da5ff0058ad9 Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Tue, 3 Mar 2026 17:16:23 +0200 Subject: [PATCH 6/8] Small fixes --- .../Client/Sync/Bucket/BucketStorageAdapter.cs | 2 +- PowerSync/PowerSync.Common/DB/IDBAdapter.cs | 4 ++-- PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs index f0458b3..c073561 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs @@ -106,7 +106,7 @@ public interface IBucketStorageEvent; public class CrudUpdateEvent : IBucketStorageEvent; - public EventStream OnCrudUpdate = new(); + public EventStream OnCrudUpdate { get; } = new(); public BucketStorageEvents() { diff --git a/PowerSync/PowerSync.Common/DB/IDBAdapter.cs b/PowerSync/PowerSync.Common/DB/IDBAdapter.cs index 834215c..e84cc67 100644 --- a/PowerSync/PowerSync.Common/DB/IDBAdapter.cs +++ b/PowerSync/PowerSync.Common/DB/IDBAdapter.cs @@ -91,12 +91,12 @@ public class DBAdapterEvents : EventManager { public interface IDBAdapterEvent; - public class TablesUpdatedEvent(INotification tablesUpdatedNotification) + public class TablesUpdatedEvent(INotification tablesUpdatedNotification) : IDBAdapterEvent { public INotification TablesUpdated { get; set; } = tablesUpdatedNotification; } - public EventStream OnTablesUpdated = new(); + public EventStream OnTablesUpdated { get; } = new(); public DBAdapterEvents() { diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs index c7bb653..fc44fa3 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs @@ -137,11 +137,11 @@ protected virtual void LoadExtension(SqliteConnection db) public void Close() { - Events.Close(); tablesUpdatedCts?.Cancel(); try { tablesUpdatedTask?.Wait(TimeSpan.FromSeconds(2)); } catch { } writeConnection?.Close(); readConnection?.Close(); + Events.Close(); } public async Task Execute(string query, object?[]? parameters = null) From c22f36ed8a7296b05142b2eb7063674e30d105fc Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Wed, 4 Mar 2026 10:21:08 +0200 Subject: [PATCH 7/8] Add MAUI changelog, emit SyncStatus instead of SyncStatusOptions, changelog examples --- PowerSync/PowerSync.Common/CHANGELOG.md | 22 ++++++++++++++++++- .../Stream/StreamingSyncImplementation.cs | 20 ++++++++++++----- PowerSync/PowerSync.Maui/CHANGELOG.md | 4 ++++ 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/PowerSync/PowerSync.Common/CHANGELOG.md b/PowerSync/PowerSync.Common/CHANGELOG.md index 6478abb..46c0785 100644 --- a/PowerSync/PowerSync.Common/CHANGELOG.md +++ b/PowerSync/PowerSync.Common/CHANGELOG.md @@ -2,6 +2,7 @@ ## 0.0.11-alpha.1 +- `StatusUpdated` and `StatusChanged` now both emit `SyncStatus` objects instead of just `StatusChanged`. - Converted most instances of a class inheriting from `EventStream` into a class with an `EventManager` property called `Events`. This allows for subscribing to individual events instead of subscribing to all events and then filtering events manually. ```csharp @@ -16,9 +17,28 @@ foreach (PowerSyncDBEvent update in listener) } } +// Old (async) +var listener = db.ListenAsync(cts.Token); +await foreach (PowerSyncDBEvent update in listener) +{ + // Manually filter updates + if (update.StatusChanged != null) + { + Console.WriteLine("status changed: " + update.StatusChanged!); + } +} + // New var listener = db.Events.OnStatusChanged.Listen(cts.Token); -foreach (PowerSyncDBEvents.StatusChangedEvent update in listener) +await foreach (PowerSyncDBEvents.StatusChanged update in listener) +{ + // Events are filtered inherently + Console.WriteLine("status changed: " + update.Status); +} + +// New (async) - recommended for most use cases +var listener = db.Events.OnStatusChanged.ListenAsync(cts.Token); +await foreach (PowerSyncDBEvents.StatusChanged update in listener) { // Events are filtered inherently Console.WriteLine("status changed: " + update.Status); diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index be6d223..b1abb88 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -93,9 +93,9 @@ public class StreamingSyncImplementationEvents : EventManager { public interface IStreamingSyncImplementationEvent; - public class StatusUpdatedEvent(SyncStatusOptions status) : IStreamingSyncImplementationEvent + public class StatusUpdatedEvent(SyncStatus status) : IStreamingSyncImplementationEvent { - public SyncStatusOptions Status { get; set; } = status; + public SyncStatus Status { get; set; } = status; } public class StatusChangedEvent(SyncStatus status) : IStreamingSyncImplementationEvent { @@ -853,12 +853,20 @@ protected void UpdateSyncStatus(SyncStatusOptions options, UpdateSyncStatusOptio { SyncStatus = updatedStatus; logger.LogDebug("[Sync status changed]: {message}", updatedStatus.ToJSON()); + + // Emit events using new SyncStatus objects to prevent local modifications propagating to StreamingSyncImplementation + // Only trigger this if there was a change - Events.Emit(new StreamingSyncImplementationEvents.StatusChangedEvent(updatedStatus)); - } + Events.Emit(new StreamingSyncImplementationEvents.StatusChangedEvent(new SyncStatus(updatedStatus.Options))); - // Trigger this for all updates - Events.Emit(new StreamingSyncImplementationEvents.StatusUpdatedEvent(options)); + // Emit StatusUpdated event wrapping a new SyncStatus object (prevents race conditions) + Events.Emit(new StreamingSyncImplementationEvents.StatusUpdatedEvent(new SyncStatus(updatedStatus.Options))); + } + else + { + // Emit StatusUpdated event directly wrapping `updatedStatus` (not exposed elsewhere) + Events.Emit(new StreamingSyncImplementationEvents.StatusUpdatedEvent(updatedStatus)); + } } catch (Exception ex) { diff --git a/PowerSync/PowerSync.Maui/CHANGELOG.md b/PowerSync/PowerSync.Maui/CHANGELOG.md index 3ca510c..594783f 100644 --- a/PowerSync/PowerSync.Maui/CHANGELOG.md +++ b/PowerSync/PowerSync.Maui/CHANGELOG.md @@ -1,5 +1,9 @@ # PowerSync.Maui Changelog +## 0.0.9-alpha.1 + +- Upstream PowerSync.Common version bump (See Powersync.Common changelog 0.0.11-alpha.1 for more information) + ## 0.0.8-alpha.1 - Upstream PowerSync.Common version bump (See Powersync.Common changelog 0.0.10-alpha.1 for more information) From 91a18bf2c399510a611587cfa4c57ae06243e057 Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Wed, 4 Mar 2026 10:30:44 +0200 Subject: [PATCH 8/8] Update demos --- demos/CommandLine/Demo.cs | 7 ++----- demos/MAUITodo/Views/ListsPage.xaml.cs | 11 ++++------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/demos/CommandLine/Demo.cs b/demos/CommandLine/Demo.cs index 623581f..3407733 100644 --- a/demos/CommandLine/Demo.cs +++ b/demos/CommandLine/Demo.cs @@ -121,12 +121,9 @@ static async Task Main() _ = Task.Run(async () => { - await foreach (var update in db.ListenAsync(new CancellationToken())) + await foreach (var update in db.Events.OnStatusChanged.ListenAsync(new CancellationToken())) { - if (update.StatusChanged != null) - { - connected = update.StatusChanged.Connected; - } + connected = update.Status.Connected; } }); diff --git a/demos/MAUITodo/Views/ListsPage.xaml.cs b/demos/MAUITodo/Views/ListsPage.xaml.cs index 8f0be46..0211905 100644 --- a/demos/MAUITodo/Views/ListsPage.xaml.cs +++ b/demos/MAUITodo/Views/ListsPage.xaml.cs @@ -22,15 +22,12 @@ protected override async void OnAppearing() _ = Task.Run(async () => { - await foreach (var update in database.Db.ListenAsync(new CancellationToken())) + await foreach (var update in database.Db.Events.OnStatusChanged.ListenAsync(new CancellationToken())) { - if (update.StatusChanged != null) + MainThread.BeginInvokeOnMainThread(() => { - MainThread.BeginInvokeOnMainThread(() => - { - WifiStatusItem.IconImageSource = update.StatusChanged.Connected ? "wifi.png" : "wifi_off.png"; - }); - } + WifiStatusItem.IconImageSource = update.Status.Connected ? "wifi.png" : "wifi_off.png"; + }); } });