diff --git a/PowerSync/PowerSync.Common/CHANGELOG.md b/PowerSync/PowerSync.Common/CHANGELOG.md index 74b81f9..46c0785 100644 --- a/PowerSync/PowerSync.Common/CHANGELOG.md +++ b/PowerSync/PowerSync.Common/CHANGELOG.md @@ -2,6 +2,49 @@ ## 0.0.11-alpha.1 +- `StatusUpdated` and `StatusChanged` now both emit `SyncStatus` objects instead of just `StatusChanged`. +- Converted most instances of a class inheriting from `EventStream` into a class with an `EventManager` property called `Events`. This allows for subscribing to individual events instead of subscribing to all events and then filtering events manually. + +```csharp +// Old +var listener = db.Listen(cts.Token); +foreach (PowerSyncDBEvent update in listener) +{ + // Manually filter updates + if (update.StatusChanged != null) + { + Console.WriteLine("status changed: " + update.StatusChanged!); + } +} + +// Old (async) +var listener = db.ListenAsync(cts.Token); +await foreach (PowerSyncDBEvent update in listener) +{ + // Manually filter updates + if (update.StatusChanged != null) + { + Console.WriteLine("status changed: " + update.StatusChanged!); + } +} + +// New +var listener = db.Events.OnStatusChanged.Listen(cts.Token); +await foreach (PowerSyncDBEvents.StatusChanged update in listener) +{ + // Events are filtered inherently + Console.WriteLine("status changed: " + update.Status); +} + +// New (async) - recommended for most use cases +var listener = db.Events.OnStatusChanged.ListenAsync(cts.Token); +await foreach (PowerSyncDBEvents.StatusChanged update in listener) +{ + // Events are filtered inherently + Console.WriteLine("status changed: " + update.Status); +} +``` + - Pool read connections in `MDSQLiteAdapter`, improving performance in any case where multiple queries run simultaneously (eg. via `Watch`). The number of connections can be set via `MDSQLiteOptions.ReadPoolSize` and defaults to 5. - Updated to the latest version (0.4.11) of the core extension. - `MDSQLiteConnection` now runs query operations on another thread, which stops the caller thread from blocking. diff --git a/PowerSync/PowerSync.Common/Client/ConnectionManager.cs b/PowerSync/PowerSync.Common/Client/ConnectionManager.cs index fec5f02..50b6ce7 100644 --- a/PowerSync/PowerSync.Common/Client/ConnectionManager.cs +++ b/PowerSync/PowerSync.Common/Client/ConnectionManager.cs @@ -79,14 +79,25 @@ public class StoredConnectionOptions( public PowerSyncConnectionOptions Options { get; set; } = options; } -public class ConnectionManagerEvent +public class ConnectionManagerEvents : EventManager { - public StreamingSyncImplementation? SyncStreamCreated { get; set; } + public interface IConnectionManagerEvent; + + public class SyncStreamCreatedEvent(StreamingSyncImplementation ssi) : IConnectionManagerEvent + { + public StreamingSyncImplementation SyncStreamCreated { get; set; } = ssi; + } + + public EventStream OnSyncStreamCreated { get; } = new(); + + public ConnectionManagerEvents() + { + Register(OnSyncStreamCreated); + } } -public class ConnectionManager : EventStream +public class ConnectionManager : ICloseable { - /// /// Tracks active connection attempts /// @@ -122,6 +133,7 @@ public class ConnectionManager : EventStream public IPowerSyncBackendConnector? Connector => PendingConnectionOptions?.Connector; + public ConnectionManagerEvents Events { get; protected set; } = new(); public PowerSyncConnectionOptions? ConnectionOptions => PendingConnectionOptions?.Options; @@ -148,9 +160,9 @@ public ConnectionManager(Func? RemoteFactory { get; set; } } -public class PowerSyncDBEvent : StreamingSyncImplementationEvent +public class PowerSyncDBEvents : EventManager { - public bool? Initialized { get; set; } - public Schema? SchemaChanged { get; set; } + public interface IPowerSyncDBEvent; - public bool? Closing { get; set; } + public class InitializedEvent : IPowerSyncDBEvent; + public class ClosingEvent : IPowerSyncDBEvent; + public class ClosedEvent : IPowerSyncDBEvent; + public class SchemaChangedEvent(Schema schema) : IPowerSyncDBEvent + { + public Schema Schema { get; set; } = schema; + } + public class StatusChangedEvent(SyncStatus status) : IPowerSyncDBEvent + { + public SyncStatus Status { get; set; } = status; + } + public class StatusUpdatedEvent(SyncStatusOptions status) : IPowerSyncDBEvent + { + public SyncStatusOptions Status { get; set; } = status; + } - public bool? Closed { get; set; } + public EventStream OnInitialized { get; } = new(); + public EventStream OnClosing { get; } = new(); + public EventStream OnClosed { get; } = new(); + public EventStream OnSchemaChanged { get; } = new(); + public EventStream OnStatusChanged { get; } = new(); + public EventStream OnStatusUpdated { get; } = new(); + + public PowerSyncDBEvents() + { + Register(OnInitialized); + Register(OnClosing); + Register(OnClosed); + Register(OnSchemaChanged); + Register(OnStatusChanged); + Register(OnStatusUpdated); + } } -public interface IPowerSyncDatabase : IEventStream +public interface IPowerSyncDatabase : ICloseableAsync { public Task Connect(IPowerSyncBackendConnector connector, PowerSyncConnectionOptions? options = null); public ISyncStream SyncStream(string name, Dictionary? parameters = null); @@ -96,10 +124,9 @@ public interface IPowerSyncDatabase : IEventStream Task WriteTransaction(Func fn, DBLockOptions? options = null); Task WriteTransaction(Func> fn, DBLockOptions? options = null); - } -public class PowerSyncDatabase : EventStream, IPowerSyncDatabase +public class PowerSyncDatabase : IPowerSyncDatabase { public IDBAdapter Database { get; protected set; } private CompiledSchema schema; @@ -107,9 +134,11 @@ public class PowerSyncDatabase : EventStream, IPowerSyncDataba private static readonly int DEFAULT_WATCH_THROTTLE_MS = 30; private static readonly Regex POWERSYNC_TABLE_MATCH = new Regex(@"(^ps_data__|^ps_data_local__)", RegexOptions.Compiled); - public new bool Closed { get; protected set; } + public bool Closed { get; protected set; } public bool Ready { get; protected set; } + public PowerSyncDBEvents Events { get; protected set; } = new(); + protected Task IsReadyTask; protected ConnectionManager ConnectionManager; @@ -138,7 +167,6 @@ public class PowerSyncDatabase : EventStream, IPowerSyncDataba public bool Connected => CurrentStatus.Connected; public bool Connecting => CurrentStatus.Connecting; - public PowerSyncConnectionOptions? ConnectionOptions => ConnectionManager.ConnectionOptions; public PowerSyncDatabase(PowerSyncDatabaseOptions options) @@ -205,19 +233,16 @@ public PowerSyncDatabase(PowerSyncDatabaseOptions options) }); var syncStreamStatusCts = CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token); - var syncStreamStatusListener = syncStreamImplementation.ListenAsync(syncStreamStatusCts.Token); + var syncStreamStatusListener = syncStreamImplementation.Events.OnStatusChanged.ListenAsync(syncStreamStatusCts.Token); var _ = Task.Run(async () => { await foreach (var update in syncStreamStatusListener) { - if (update.StatusChanged != null) + CurrentStatus = new SyncStatus(new SyncStatusOptions(update.Status.Options) { - CurrentStatus = new SyncStatus(new SyncStatusOptions(update.StatusChanged.Options) - { - HasSynced = CurrentStatus?.HasSynced == true || update.StatusChanged.LastSyncedAt != null, - }); - Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus }); - } + HasSynced = CurrentStatus?.HasSynced == true || update.Status.LastSyncedAt != null, + }); + Events.Emit(new PowerSyncDBEvents.StatusChangedEvent(CurrentStatus)); } }); @@ -285,7 +310,7 @@ public async Task WaitForStatus(Func predicate, CancellationTo ? CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token) : CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token, cancellationToken.Value); - var statusListener = ListenAsync(cts.Token); + var statusListener = Events.OnStatusChanged.ListenAsync(cts.Token); if (predicate(CurrentStatus)) { @@ -301,7 +326,7 @@ public async Task WaitForStatus(Func predicate, CancellationTo { await foreach (var update in statusListener) { - if ((update.StatusChanged != null) && predicate(update.StatusChanged)) + if (predicate(update.Status)) { tcs.TrySetResult(true); cts.Cancel(); @@ -322,7 +347,7 @@ protected async Task Initialize(PowerSyncDatabaseOptions options) await ResolveOfflineSyncStatus(); await Database.Execute("PRAGMA RECURSIVE_TRIGGERS=TRUE"); Ready = true; - Emit(new PowerSyncDBEvent { Initialized = true }); + Events.Emit(new PowerSyncDBEvents.InitializedEvent()); } private record VersionResult(string version); @@ -366,7 +391,7 @@ protected async Task ResolveOfflineSyncStatus() if (!updatedStatus.IsEqual(CurrentStatus)) { CurrentStatus = updatedStatus; - Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus }); + Events.Emit(new PowerSyncDBEvents.StatusChangedEvent(CurrentStatus)); } } @@ -394,7 +419,7 @@ public async Task UpdateSchema(Schema schema) this.schema = compiledSchema; await Database.Execute("SELECT powersync_replace_schema(?)", [compiledSchema.ToJSON()]); await Database.RefreshSchema(); - Emit(new PowerSyncDBEvent { SchemaChanged = schema }); + Events.Emit(new PowerSyncDBEvents.SchemaChangedEvent(schema)); } /// @@ -458,7 +483,7 @@ await Database.WriteTransaction(async tx => // The data has been deleted - reset the sync status CurrentStatus = new SyncStatus(new SyncStatusOptions()); - Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus }); + Events.Emit(new PowerSyncDBEvents.StatusChangedEvent(CurrentStatus)); } /// @@ -479,18 +504,16 @@ public ISyncStream SyncStream(string name, Dictionary? parameter /// Once close is called, this connection cannot be used again - a new one /// must be constructed. /// - public new async Task Close() + public async Task Close() { await WaitForReady(); if (Closed) return; - Emit(new PowerSyncDBEvent { Closing = true }); + Events.Emit(new PowerSyncDBEvents.ClosingEvent()); await Disconnect(); - base.Close(); - masterCts.Cancel(); masterCts.Dispose(); @@ -499,7 +522,10 @@ public ISyncStream SyncStream(string name, Dictionary? parameter await Database.Close(); Closed = true; - Emit(new PowerSyncDBEvent { Closed = true }); + + Events.Emit(new PowerSyncDBEvents.ClosedEvent()); + + Events.Close(); } private record UploadQueueStatsSizeCountResult(long size, long count); @@ -755,7 +781,7 @@ public IAsyncEnumerable OnChange(SQLWatchOptions? options = ? CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token, options.Signal.Value) : CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token); - var listener = Database.ListenAsync(signal.Token); + var listener = Database.Events.OnTablesUpdated.ListenAsync(signal.Token); // Return the actual IAsyncEnumerable here, using OnChange as a synchronous wrapper that blocks until the // connection is established @@ -764,7 +790,7 @@ public IAsyncEnumerable OnChange(SQLWatchOptions? options = private async IAsyncEnumerable OnChangeCore( HashSet watchedTables, - IAsyncEnumerable listener, + IAsyncEnumerable listener, [EnumeratorCancellation] CancellationToken signal, bool triggerImmediately ) @@ -778,7 +804,6 @@ bool triggerImmediately await foreach (var e in listener) { if (signal.IsCancellationRequested) yield break; - if (e.TablesUpdated == null) continue; changedTables.Clear(); GetTablesFromNotification(e.TablesUpdated, changedTables); @@ -828,7 +853,7 @@ public IAsyncEnumerable Watch( // so that table changes between Watch() being called and iteration starting are not missed. // This mirrors the pattern used in OnChange(). var initialRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token); - var initialListener = Database.ListenAsync(initialRestartCts.Token); + var initialListener = Database.Events.OnTablesUpdated.ListenAsync(initialRestartCts.Token); return WatchCore(sql, parameters, options, signal, initialRestartCts, initialListener); } @@ -839,7 +864,7 @@ private async IAsyncEnumerable WatchCore( SQLWatchOptions options, CancellationTokenSource signal, CancellationTokenSource initialRestartCts, - IAsyncEnumerable initialListener + IAsyncEnumerable initialListener ) { var schemaChanged = new TaskCompletionSource(); @@ -847,14 +872,11 @@ IAsyncEnumerable initialListener // Listen for schema changes in the background _ = Task.Run(async () => { - await foreach (var update in ListenAsync(signal.Token)) + await foreach (var update in Events.OnSchemaChanged.ListenAsync(signal.Token)) { - if (update.SchemaChanged != null) - { - // Swap schemaChanged with an unresolved TCS - var oldTcs = Interlocked.Exchange(ref schemaChanged, new()); - oldTcs.TrySetResult(true); - } + // Swap schemaChanged with an unresolved TCS + var oldTcs = Interlocked.Exchange(ref schemaChanged, new()); + oldTcs.TrySetResult(true); } }, signal.Token); @@ -904,7 +926,7 @@ IAsyncEnumerable initialListener // Establish a new listener BEFORE resolving source tables in the next iteration, // so that changes during the async GetSourceTables call are not missed. currentRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token); - currentListener = Database.ListenAsync(currentRestartCts.Token); + currentListener = Database.Events.OnTablesUpdated.ListenAsync(currentRestartCts.Token); break; } @@ -951,7 +973,7 @@ internal async Task> GetSourceTables(string sql, object?[]? para private async IAsyncEnumerable OnRawTableChange( HashSet watchedTables, - IAsyncEnumerable listener, + IAsyncEnumerable listener, [EnumeratorCancellation] CancellationToken token, bool triggerImmediately = false ) @@ -964,19 +986,16 @@ private async IAsyncEnumerable OnRawTableChange( HashSet changedTables = new(); await foreach (var e in listener) { - if (e.TablesUpdated != null) - { - if (token.IsCancellationRequested) break; + if (token.IsCancellationRequested) break; - // Extract the changed tables and intersect with the watched tables - changedTables.Clear(); - GetTablesFromNotification(e.TablesUpdated, changedTables); - changedTables.IntersectWith(watchedTables); + // Extract the changed tables and intersect with the watched tables + changedTables.Clear(); + GetTablesFromNotification(e.TablesUpdated, changedTables); + changedTables.IntersectWith(watchedTables); - if (changedTables.Count == 0) continue; + if (changedTables.Count == 0) continue; - yield return new WatchOnChangeEvent { ChangedTables = [.. changedTables] }; - } + yield return new WatchOnChangeEvent { ChangedTables = [.. changedTables] }; } } diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs index dd1b105..c073561 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs @@ -100,13 +100,23 @@ public static class PSInternalTable public static readonly string UNTYPED = "ps_untyped"; } -public class BucketStorageEvent +public class BucketStorageEvents : EventManager { - public bool CrudUpdate { get; set; } + public interface IBucketStorageEvent; + + public class CrudUpdateEvent : IBucketStorageEvent; + + public EventStream OnCrudUpdate { get; } = new(); + + public BucketStorageEvents() + { + Register(OnCrudUpdate); + } } -public interface IBucketStorageAdapter : IEventStream +public interface IBucketStorageAdapter : ICloseable { + BucketStorageEvents Events { get; } Task Init(); Task NextCrudItem(); diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs index 48e290a..e72e963 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs @@ -12,13 +12,13 @@ namespace PowerSync.Common.Client.Sync.Bucket; using PowerSync.Common.DB; using PowerSync.Common.DB.Crud; -using PowerSync.Common.Utils; -public class SqliteBucketStorage : EventStream, IBucketStorageAdapter +public class SqliteBucketStorage : IBucketStorageAdapter { - public static readonly string MAX_OP_ID = "9223372036854775807"; + public BucketStorageEvents Events { get; } = new(); + private readonly IDBAdapter db; private bool hasCompletedSync; private readonly HashSet tableNames; @@ -27,29 +27,27 @@ public class SqliteBucketStorage : EventStream, IBucketStora private readonly ILogger logger; private readonly CancellationTokenSource updateCts; + private readonly Task updateTask; private record ExistingTableRowsResult(string name); public SqliteBucketStorage(IDBAdapter db, ILogger? logger = null) { this.db = db; - this.logger = logger ?? NullLogger.Instance; ; + this.logger = logger ?? NullLogger.Instance; hasCompletedSync = false; tableNames = []; updateCts = new CancellationTokenSource(); - var _ = Task.Run(() => + updateTask = Task.Run(() => { - foreach (var update in db.Listen(updateCts.Token)) + foreach (var update in db.Events.OnTablesUpdated.Listen(updateCts.Token)) { - if (update.TablesUpdated != null) + var tables = DBAdapterUtils.ExtractTableUpdates(update.TablesUpdated); + if (tables.Contains(PSInternalTable.CRUD)) { - var tables = DBAdapterUtils.ExtractTableUpdates(update.TablesUpdated); - if (tables.Contains(PSInternalTable.CRUD)) - { - Emit(new BucketStorageEvent { CrudUpdate = true }); - } + Events.Emit(new BucketStorageEvents.CrudUpdateEvent()); } } }); @@ -67,10 +65,11 @@ public async Task Init() } } - public new void Close() + public void Close() { updateCts.Cancel(); - base.Close(); + try { updateTask.Wait(2000); } catch (Exception) { } + Events.Close(); } private record ClientIdResult(string? client_id); diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index 631543b..b1abb88 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -89,17 +89,34 @@ public class RequiredPowerSyncConnectionOptions : BaseConnectionOptions public new bool IncludeDefaultStreams { get; set; } = default; } -public class StreamingSyncImplementationEvent +public class StreamingSyncImplementationEvents : EventManager { + public interface IStreamingSyncImplementationEvent; + + public class StatusUpdatedEvent(SyncStatus status) : IStreamingSyncImplementationEvent + { + public SyncStatus Status { get; set; } = status; + } + public class StatusChangedEvent(SyncStatus status) : IStreamingSyncImplementationEvent + { + public SyncStatus Status { get; set; } = status; + } + /// - /// Set whenever a status update has been attempted to be made or refreshed. + /// See whenever a status update has been attempted to be made or refreshed. /// - public SyncStatusOptions? StatusUpdated { get; set; } + public EventStream OnStatusUpdated { get; } = new(); /// - /// Set whenever the status' members have changed in value. + /// See whenever the status' members have changed in value. /// - public SyncStatus? StatusChanged { get; set; } + public EventStream OnStatusChanged { get; } = new(); + + public StreamingSyncImplementationEvents() + { + Register(OnStatusUpdated); + Register(OnStatusChanged); + } } public class PowerSyncConnectionOptions( @@ -131,7 +148,7 @@ public class SubscribedStream } -public class StreamingSyncImplementation : EventStream +public class StreamingSyncImplementation : ICloseable { public static RequiredPowerSyncConnectionOptions DEFAULT_STREAM_CONNECTION_OPTIONS = new() { @@ -140,6 +157,8 @@ public class StreamingSyncImplementation : EventStream + crudUploadTask = Task.Run(async () => { await InternalUploadAllCrud(); notifyCompletedUploads?.Invoke(); @@ -227,7 +249,7 @@ public async Task Connect(PowerSyncConnectionOptions? options = null) var cts = new CancellationTokenSource(); // Subscribe to events before starting StreamingSync to not miss the Connected == true event - var listener = ListenAsync(cts.Token); + var listener = Events.OnStatusChanged.ListenAsync(cts.Token); streamingSyncTask = StreamingSync(CancellationTokenSource.Token, options); @@ -235,7 +257,7 @@ public async Task Connect(PowerSyncConnectionOptions? options = null) { await foreach (var status in listener) { - if (status.StatusChanged?.Connected == true) + if (status.Status.Connected == true) { tcs.TrySetResult(true); cts.Cancel(); @@ -274,11 +296,25 @@ public async Task Disconnect() catch (Exception ex) { // The operation might have failed, all we care about is if it has completed - logger.LogWarning("{Message}", ex.Message); + logger.LogWarning("Streaming sync task failed during disconnect: {Message}", ex.Message); } streamingSyncTask = null; CancellationTokenSource = null; + // Do the same for any pending CRUD uploads + if (crudUploadTask != null) + { + try + { + await crudUploadTask; + } + catch (Exception ex) + { + logger.LogWarning("CRUD upload task failed during disconnect: {Message}", ex.Message); + } + crudUploadTask = null; + } + UpdateSyncStatus(new SyncStatusOptions { Connected = false, Connecting = false }); } @@ -291,9 +327,9 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio } crudUpdateCts = new CancellationTokenSource(); - var _ = Task.Run(() => + crudUpdateTask = Task.Run(async () => { - foreach (var _ in Options.Adapter.Listen(crudUpdateCts.Token)) + await foreach (var _ in Options.Adapter.Events.OnCrudUpdate.ListenAsync(crudUpdateCts.Token)) { TriggerCrudUpload(); } @@ -307,6 +343,7 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio nestedCts.Cancel(); crudUpdateCts?.Cancel(); crudUpdateCts = null; + try { crudUpdateTask?.Wait(2000); } catch (Exception) { } UpdateSyncStatus(new SyncStatusOptions { Connected = false, @@ -672,11 +709,12 @@ async Task HandleInstruction(Instruction instruction) return new StreamingSyncIterationResult { ImmediateRestart = hideDisconnectOnRestart }; } - public new void Close() + public void Close() { crudUpdateCts?.Cancel(); - base.Close(); crudUpdateCts = null; + try { crudUpdateTask?.Wait(2000); } catch (Exception) { } + Events.Close(); } public record ResponseData( @@ -815,12 +853,20 @@ protected void UpdateSyncStatus(SyncStatusOptions options, UpdateSyncStatusOptio { SyncStatus = updatedStatus; logger.LogDebug("[Sync status changed]: {message}", updatedStatus.ToJSON()); + + // Emit events using new SyncStatus objects to prevent local modifications propagating to StreamingSyncImplementation + // Only trigger this if there was a change - Emit(new StreamingSyncImplementationEvent { StatusChanged = updatedStatus }); - } + Events.Emit(new StreamingSyncImplementationEvents.StatusChangedEvent(new SyncStatus(updatedStatus.Options))); - // Trigger this for all updates - Emit(new StreamingSyncImplementationEvent { StatusUpdated = options }); + // Emit StatusUpdated event wrapping a new SyncStatus object (prevents race conditions) + Events.Emit(new StreamingSyncImplementationEvents.StatusUpdatedEvent(new SyncStatus(updatedStatus.Options))); + } + else + { + // Emit StatusUpdated event directly wrapping `updatedStatus` (not exposed elsewhere) + Events.Emit(new StreamingSyncImplementationEvents.StatusUpdatedEvent(updatedStatus)); + } } catch (Exception ex) { diff --git a/PowerSync/PowerSync.Common/DB/IDBAdapter.cs b/PowerSync/PowerSync.Common/DB/IDBAdapter.cs index a693075..b1b0cde 100644 --- a/PowerSync/PowerSync.Common/DB/IDBAdapter.cs +++ b/PowerSync/PowerSync.Common/DB/IDBAdapter.cs @@ -73,9 +73,7 @@ public class TableUpdateOperation(RowUpdateType OpType, long RowId) public long RowId { get; set; } = RowId; } -public interface INotification -{ -} +public interface INotification; public class UpdateNotification(string table, RowUpdateType OpType, long RowId) : TableUpdateOperation(OpType, RowId), INotification { @@ -89,9 +87,21 @@ public class BatchedUpdateNotification : INotification public Dictionary GroupedUpdates { get; set; } = []; } -public class DBAdapterEvent +public class DBAdapterEvents : EventManager { - public INotification? TablesUpdated { get; set; } + public interface IDBAdapterEvent; + + public class TablesUpdatedEvent(INotification tablesUpdatedNotification) : IDBAdapterEvent + { + public INotification TablesUpdated { get; set; } = tablesUpdatedNotification; + } + + public EventStream OnTablesUpdated { get; } = new(); + + public DBAdapterEvents() + { + Register(OnTablesUpdated); + } } public class DBLockOptions @@ -113,7 +123,7 @@ public static string[] ExtractTableUpdates(INotification update) } } -public interface IDBAdapter : IEventStream, ILockContext +public interface IDBAdapter : ILockContext, ICloseableAsync { /// /// Closes the adapter. @@ -125,6 +135,11 @@ public interface IDBAdapter : IEventStream, ILockContext /// string Name { get; } + /// + /// The event manager for the adapter. + /// + DBAdapterEvents Events { get; } + /// /// Executes a read lock with the given function. /// diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs index 836c8ea..0b83a2d 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs @@ -19,10 +19,12 @@ public class MDSQLiteAdapterOptions() public MDSQLiteOptions? SqliteOptions; } -public class MDSQLiteAdapter : EventStream, IDBAdapter +public class MDSQLiteAdapter : IDBAdapter { public string Name => options.Name; + public DBAdapterEvents Events { get; } = new(); + // One writer private MDSQLiteConnection writeConnection = null!; private readonly AsyncLock writeMutex = new(); @@ -114,7 +116,7 @@ private async Task Init() { if (notification.TablesUpdated != null) { - Emit(notification); + Events.Emit(notification); } } }); @@ -146,13 +148,13 @@ protected virtual void LoadExtension(SqliteConnection db) db.LoadExtension(extensionPath, "sqlite3_powersync_init"); } - public new async Task Close() + public async Task Close() { tablesUpdatedCts?.Cancel(); try { tablesUpdatedTask?.Wait(2000); } catch { /* expected */ } - base.Close(); writeConnection?.Close(); await readPool.Close(); + Events.Close(); } public async Task Execute(string query, object?[]? parameters = null) diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs index d6eb924..7d9e4ad 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs @@ -18,9 +18,8 @@ public class MDSQLiteConnectionOptions(SqliteConnection database) public SqliteConnection Database { get; set; } = database; } -public class MDSQLiteConnection : EventStream, ILockContext +public class MDSQLiteConnection : EventStream, ILockContext { - public SqliteConnection Db; private readonly List updateBuffer; public MDSQLiteConnection(MDSQLiteConnectionOptions options) @@ -71,7 +70,7 @@ public void FlushUpdates() }; updateBuffer.Clear(); - Emit(new DBAdapterEvent { TablesUpdated = batchedUpdate }); + Emit(new DBAdapterEvents.TablesUpdatedEvent(batchedUpdate)); } private static List PrepareQueryString(ref string query, int parameterCount) diff --git a/PowerSync/PowerSync.Common/Utils/EventManager.cs b/PowerSync/PowerSync.Common/Utils/EventManager.cs new file mode 100644 index 0000000..1c0ff31 --- /dev/null +++ b/PowerSync/PowerSync.Common/Utils/EventManager.cs @@ -0,0 +1,91 @@ +namespace PowerSync.Common.Utils; + +public interface IEventManager : ICloseable +{ + /// + /// Registers a new EventStream into the EventManager. + /// + void Register(EventStream stream); + + /// + /// Deregisters the stream associated with the EventManager. + /// This does NOT close the stream in the default implementation. + /// + bool Deregister(); + + /// + /// Attempts to retreive the EventStream associated with events of type T. + /// + bool TryGetStream(out EventStream stream); + + /// + /// Posts a message to the stream managing events of type T. + /// + void Emit(T evt); + + /// + /// Attemps to post a message to the stream managing events of type T. + /// + bool TryEmit(T evt); +} + +public class EventManager : IEventManager +{ + private readonly Dictionary _streams = new(); + + public void Register(EventStream stream) + { + _streams[typeof(T)] = stream; + } + + public bool Deregister() + { + return _streams.Remove(typeof(T)); + } + + public bool TryGetStream(out EventStream stream) + { + if (_streams.TryGetValue(typeof(T), out var streamObj)) + { + stream = (EventStream)streamObj; + return true; + } + stream = null!; + return false; + } + + public void Emit(T evt) + { + if (TryGetStream(out var stream)) + { + stream.Emit(evt); + } + else + { + throw new InvalidOperationException($"No stream emits events of type {typeof(T).Name}."); + } + } + + public bool TryEmit(T evt) + { + if (TryGetStream(out var stream)) + { + stream.Emit(evt); + return true; + } + else + { + return false; + } + } + + public void Close() + { + foreach (var kvp in _streams) + { + ((ICloseable)kvp.Value).Close(); + } + _streams.Clear(); + } +} + diff --git a/PowerSync/PowerSync.Common/Utils/EventStream.cs b/PowerSync/PowerSync.Common/Utils/EventStream.cs index 5dc209f..b229e83 100644 --- a/PowerSync/PowerSync.Common/Utils/EventStream.cs +++ b/PowerSync/PowerSync.Common/Utils/EventStream.cs @@ -4,7 +4,7 @@ namespace PowerSync.Common.Utils; using System.Runtime.CompilerServices; using System.Threading.Channels; -public interface IEventStream +public interface IEventStream : ICloseable { void Emit(T item); @@ -13,8 +13,6 @@ public interface IEventStream IEnumerable Listen(CancellationToken cancellationToken); IAsyncEnumerable ListenAsync(CancellationToken cancellationToken); - - void Close(); } public class EventStream : IEventStream diff --git a/PowerSync/PowerSync.Common/Utils/ICloseable.cs b/PowerSync/PowerSync.Common/Utils/ICloseable.cs new file mode 100644 index 0000000..955d7b7 --- /dev/null +++ b/PowerSync/PowerSync.Common/Utils/ICloseable.cs @@ -0,0 +1,12 @@ +namespace PowerSync.Common.Utils; + +public interface ICloseable +{ + public void Close(); +} + +public interface ICloseableAsync +{ + public Task Close(); +} + diff --git a/PowerSync/PowerSync.Maui/CHANGELOG.md b/PowerSync/PowerSync.Maui/CHANGELOG.md index 3ca510c..594783f 100644 --- a/PowerSync/PowerSync.Maui/CHANGELOG.md +++ b/PowerSync/PowerSync.Maui/CHANGELOG.md @@ -1,5 +1,9 @@ # PowerSync.Maui Changelog +## 0.0.9-alpha.1 + +- Upstream PowerSync.Common version bump (See Powersync.Common changelog 0.0.11-alpha.1 for more information) + ## 0.0.8-alpha.1 - Upstream PowerSync.Common version bump (See Powersync.Common changelog 0.0.10-alpha.1 for more information) diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs index 286ea4a..aedfa4e 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs @@ -1,14 +1,8 @@ namespace PowerSync.Common.Tests.Client.Sync; -using System.Runtime.CompilerServices; - using Common.Client.Sync.Stream; -using Newtonsoft.Json; - using PowerSync.Common.Client; -using PowerSync.Common.DB.Crud; -using PowerSync.Common.DB.Schema; using PowerSync.Common.Tests.Utils; using PowerSync.Common.Tests.Utils.Sync; @@ -209,6 +203,4 @@ public async Task UnsubscribeAllTest() await db.Connect(new TestConnector(), new PowerSyncConnectionOptions()); TestUtils.DeepEquivalent(new RequestStream { IncludeDefaults = true, Subscriptions = [] }, syncService.Requests[0].Streams); } - - } diff --git a/Tests/PowerSync/PowerSync.Common.Tests/EventStreamTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/EventStreamTests.cs index a924a23..6faacc8 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/EventStreamTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/EventStreamTests.cs @@ -5,7 +5,6 @@ namespace PowerSync.Common.Tests; public class EventStreamTests { - [Fact] public async Task EventStream_ShouldReceiveTwoMessages_Async() { @@ -109,4 +108,136 @@ public async Task EventStream_ShouldReceiveTwoMessages_Sync() Assert.Contains(status2, receivedMessages); Assert.Equal(0, eventStream.SubscriberCount()); } + + [Fact] + public async Task EventManager_RegistersStreamsCorrectly() + { + var manager = new EventManager(); + var stream1 = new EventStream(); + var stream2 = new EventStream(); + var stream3 = new EventStream(); // Control + + manager.Register(stream1); + manager.Register(stream2); + + Assert.True(manager.TryGetStream(out var obtainedStream1)); + Assert.True(manager.TryGetStream(out var obtainedStream2)); + Assert.False(manager.TryGetStream(out var obtainedStream3)); + + Assert.Equal(stream1, obtainedStream1); + Assert.Equal(stream2, obtainedStream2); + Assert.Equal(null, obtainedStream3); + + manager.Close(); + } + + [Fact] + public async Task EventManager_CloseRemovesAndClosesStreams() + { + var manager = new EventManager(); + var stream1 = new EventStream(); + var stream2 = new EventStream(); + var stream3 = new EventStream(); // Control + + manager.Register(stream1); + manager.Register(stream2); + + manager.Close(); + + Assert.False(manager.TryGetStream(out var obtainedStream1)); + Assert.False(manager.TryGetStream(out var obtainedStream2)); + Assert.False(manager.TryGetStream(out var obtainedStream3)); + + Assert.Equal(null, obtainedStream1); + Assert.Equal(null, obtainedStream2); + Assert.Equal(null, obtainedStream3); + + Assert.True(stream1.Closed); + Assert.True(stream2.Closed); + Assert.False(stream3.Closed); + } + + [Fact(Timeout = 2000)] + public async Task EventManager_ShouldReceiveEmittedEvents() + { + var manager = new EventManager(); + var stream1 = new EventStream(); + var stream2 = new EventStream(); + + manager.Register(stream1); + manager.Register(stream2); + + var cts1 = new CancellationTokenSource(); + var listener1 = stream1.ListenAsync(cts1.Token); + Assert.True(manager.TryEmit(false)); + Assert.True(manager.TryEmit(false)); + Assert.True(manager.TryEmit(true)); + int eventCount = 0; + + await foreach (var evt in listener1) + { + eventCount++; + if (evt == true) + { + cts1.Cancel(); + } + } + + Assert.Equal(3, eventCount); + + var cts2 = new CancellationTokenSource(); + var listener2 = stream2.ListenAsync(cts2.Token); + Assert.True(manager.TryEmit("hi")); + Assert.True(manager.TryEmit("hello")); + Assert.True(manager.TryEmit("sup")); + Assert.True(manager.TryEmit("STOP")); + eventCount = 0; + + await foreach (var evt in listener2) + { + eventCount++; + if (evt == "STOP") + { + cts2.Cancel(); + } + } + + Assert.Equal(4, eventCount); + + manager.Close(); + } + + [Fact] + public async Task EventManager_ShouldNotReceiveEventsAfterDeregistering() + { + var manager = new EventManager(); + var stream = new EventStream(); + + manager.Register(stream); + + var cts = new CancellationTokenSource(); + var listener = stream.ListenAsync(cts.Token); + var sem = new SemaphoreSlim(0); + int eventCount = 0; + + _ = Task.Run(async () => + { + sem.Release(); + await foreach (var evt in listener) + { + eventCount++; + sem.Release(); + } + }, cts.Token); + Assert.True(await sem.WaitAsync(100)); + + Assert.True(manager.Deregister()); + + Assert.False(manager.TryEmit("invalid")); + Assert.False(await sem.WaitAsync(100)); + + // Cleanup + cts.Cancel(); + manager.Close(); + } } diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs b/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs index e9d46c5..5482556 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs @@ -65,13 +65,10 @@ public static async Task NextStatus(PowerSyncDatabase db) _ = Task.Run(async () => { - await foreach (var update in db.ListenAsync(cts.Token)) + await foreach (var update in db.Events.OnStatusChanged.ListenAsync(cts.Token)) { - if (update.StatusChanged != null) - { - tcs.TrySetResult(update.StatusChanged); - cts?.Cancel(); - } + tcs.TrySetResult(update.Status); + cts?.Cancel(); } }); diff --git a/demos/CommandLine/Demo.cs b/demos/CommandLine/Demo.cs index 623581f..3407733 100644 --- a/demos/CommandLine/Demo.cs +++ b/demos/CommandLine/Demo.cs @@ -121,12 +121,9 @@ static async Task Main() _ = Task.Run(async () => { - await foreach (var update in db.ListenAsync(new CancellationToken())) + await foreach (var update in db.Events.OnStatusChanged.ListenAsync(new CancellationToken())) { - if (update.StatusChanged != null) - { - connected = update.StatusChanged.Connected; - } + connected = update.Status.Connected; } }); diff --git a/demos/MAUITodo/Views/ListsPage.xaml.cs b/demos/MAUITodo/Views/ListsPage.xaml.cs index 8f0be46..0211905 100644 --- a/demos/MAUITodo/Views/ListsPage.xaml.cs +++ b/demos/MAUITodo/Views/ListsPage.xaml.cs @@ -22,15 +22,12 @@ protected override async void OnAppearing() _ = Task.Run(async () => { - await foreach (var update in database.Db.ListenAsync(new CancellationToken())) + await foreach (var update in database.Db.Events.OnStatusChanged.ListenAsync(new CancellationToken())) { - if (update.StatusChanged != null) + MainThread.BeginInvokeOnMainThread(() => { - MainThread.BeginInvokeOnMainThread(() => - { - WifiStatusItem.IconImageSource = update.StatusChanged.Connected ? "wifi.png" : "wifi_off.png"; - }); - } + WifiStatusItem.IconImageSource = update.Status.Connected ? "wifi.png" : "wifi_off.png"; + }); } });