Skip to content

Commit f2e5ffb

Browse files
committed
Add EventManager
1 parent 789629d commit f2e5ffb

3 files changed

Lines changed: 143 additions & 35 deletions

File tree

PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs

Lines changed: 97 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,87 @@ public class PowerSyncDatabaseOptions() : BasePowerSyncDatabaseOptions()
5353
public Func<IPowerSyncBackendConnector, Remote>? RemoteFactory { get; set; }
5454
}
5555

56-
public class PowerSyncDBEvent : StreamingSyncImplementationEvent
56+
public class PowerSyncDBEvents : EventManager<PowerSyncDBEvents.IPowerSyncDBEvent>
5757
{
58-
public bool? Initialized { get; set; }
59-
public Schema? SchemaChanged { get; set; }
58+
public interface IPowerSyncDBEvent;
6059

61-
public bool? Closing { get; set; }
60+
public class InitializedEvent : IPowerSyncDBEvent;
61+
public class ClosingEvent : IPowerSyncDBEvent;
62+
public class ClosedEvent : IPowerSyncDBEvent;
63+
public class SchemaChangedEvent(Schema schema) : IPowerSyncDBEvent
64+
{
65+
public Schema Schema { get; set; } = schema;
66+
}
67+
public class StatusChangedEvent(SyncStatus status) : IPowerSyncDBEvent
68+
{
69+
public SyncStatus Status { get; set; } = status;
70+
}
71+
public class StatusUpdatedEvent(SyncStatusOptions status) : IPowerSyncDBEvent
72+
{
73+
public SyncStatusOptions Status { get; set; } = status;
74+
}
75+
76+
public EventStream<InitializedEvent> OnInitialized { get; } = new();
77+
public EventStream<ClosingEvent> OnClosing { get; } = new();
78+
public EventStream<ClosedEvent> OnClosed { get; } = new();
79+
public EventStream<SchemaChangedEvent> OnSchemaChanged { get; } = new();
80+
public EventStream<StatusChangedEvent> OnStatusChanged { get; } = new();
81+
public EventStream<StatusUpdatedEvent> OnStatusUpdated { get; } = new();
82+
83+
public override bool TryGetStream<T>(out EventStream<T> stream)
84+
{
85+
if (typeof(T) == typeof(PowerSyncDBEvents.InitializedEvent))
86+
{
87+
stream = (EventStream<T>)(object)OnInitialized;
88+
return true;
89+
}
90+
91+
if (typeof(T) == typeof(PowerSyncDBEvents.ClosingEvent))
92+
{
93+
stream = (EventStream<T>)(object)OnClosing;
94+
return true;
95+
}
96+
97+
if (typeof(T) == typeof(PowerSyncDBEvents.ClosedEvent))
98+
{
99+
stream = (EventStream<T>)(object)OnClosed;
100+
return true;
101+
}
102+
103+
if (typeof(T) == typeof(PowerSyncDBEvents.SchemaChangedEvent))
104+
{
105+
stream = (EventStream<T>)(object)OnSchemaChanged;
106+
return true;
107+
}
108+
109+
if (typeof(T) == typeof(PowerSyncDBEvents.StatusChangedEvent))
110+
{
111+
stream = (EventStream<T>)(object)OnStatusChanged;
112+
return true;
113+
}
114+
115+
if (typeof(T) == typeof(PowerSyncDBEvents.StatusUpdatedEvent))
116+
{
117+
stream = (EventStream<T>)(object)OnStatusUpdated;
118+
return true;
119+
}
62120

63-
public bool? Closed { get; set; }
121+
stream = null!;
122+
return false;
123+
}
124+
125+
public override void Close()
126+
{
127+
OnInitialized.Close();
128+
OnClosing.Close();
129+
OnClosed.Close();
130+
OnSchemaChanged.Close();
131+
OnStatusChanged.Close();
132+
OnStatusUpdated.Close();
133+
}
64134
}
65135

66-
public interface IPowerSyncDatabase : IEventStream<PowerSyncDBEvent>
136+
public interface IPowerSyncDatabase
67137
{
68138
public Task Connect(IPowerSyncBackendConnector connector, PowerSyncConnectionOptions? options = null);
69139
public ISyncStream SyncStream(string name, Dictionary<string, object>? parameters = null);
@@ -96,20 +166,21 @@ public interface IPowerSyncDatabase : IEventStream<PowerSyncDBEvent>
96166

97167
Task WriteTransaction(Func<ITransaction, Task> fn, DBLockOptions? options = null);
98168
Task<T> WriteTransaction<T>(Func<ITransaction, Task<T>> fn, DBLockOptions? options = null);
99-
100169
}
101170

102-
public class PowerSyncDatabase : EventStream<PowerSyncDBEvent>, IPowerSyncDatabase
171+
public class PowerSyncDatabase : IPowerSyncDatabase
103172
{
104173
public IDBAdapter Database { get; protected set; }
105174
private CompiledSchema schema;
106175

107176
private static readonly int DEFAULT_WATCH_THROTTLE_MS = 30;
108177
private static readonly Regex POWERSYNC_TABLE_MATCH = new Regex(@"(^ps_data__|^ps_data_local__)", RegexOptions.Compiled);
109178

110-
public new bool Closed { get; protected set; }
179+
public bool Closed { get; protected set; }
111180
public bool Ready { get; protected set; }
112181

182+
public PowerSyncDBEvents Events { get; protected set; } = new();
183+
113184
protected Task IsReadyTask;
114185
protected ConnectionManager ConnectionManager;
115186

@@ -138,7 +209,6 @@ public class PowerSyncDatabase : EventStream<PowerSyncDBEvent>, IPowerSyncDataba
138209
public bool Connected => CurrentStatus.Connected;
139210
public bool Connecting => CurrentStatus.Connecting;
140211

141-
142212
public PowerSyncConnectionOptions? ConnectionOptions => ConnectionManager.ConnectionOptions;
143213

144214
public PowerSyncDatabase(PowerSyncDatabaseOptions options)
@@ -216,7 +286,7 @@ public PowerSyncDatabase(PowerSyncDatabaseOptions options)
216286
{
217287
HasSynced = CurrentStatus?.HasSynced == true || update.StatusChanged.LastSyncedAt != null,
218288
});
219-
Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus });
289+
Events.Emit(new PowerSyncDBEvents.StatusChangedEvent(CurrentStatus));
220290
}
221291
}
222292
});
@@ -285,7 +355,7 @@ public async Task WaitForStatus(Func<SyncStatus, bool> predicate, CancellationTo
285355
? CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token)
286356
: CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token, cancellationToken.Value);
287357

288-
var statusListener = ListenAsync(cts.Token);
358+
var statusListener = Events.OnStatusChanged.ListenAsync(cts.Token);
289359

290360
if (predicate(CurrentStatus))
291361
{
@@ -301,7 +371,7 @@ public async Task WaitForStatus(Func<SyncStatus, bool> predicate, CancellationTo
301371
{
302372
await foreach (var update in statusListener)
303373
{
304-
if ((update.StatusChanged != null) && predicate(update.StatusChanged))
374+
if (predicate(update.Status))
305375
{
306376
tcs.TrySetResult(true);
307377
cts.Cancel();
@@ -322,7 +392,7 @@ protected async Task Initialize(PowerSyncDatabaseOptions options)
322392
await ResolveOfflineSyncStatus();
323393
await Database.Execute("PRAGMA RECURSIVE_TRIGGERS=TRUE");
324394
Ready = true;
325-
Emit(new PowerSyncDBEvent { Initialized = true });
395+
Events.Emit(new PowerSyncDBEvents.InitializedEvent());
326396
}
327397

328398
private record VersionResult(string version);
@@ -366,7 +436,7 @@ protected async Task ResolveOfflineSyncStatus()
366436
if (!updatedStatus.IsEqual(CurrentStatus))
367437
{
368438
CurrentStatus = updatedStatus;
369-
Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus });
439+
Events.Emit(new PowerSyncDBEvents.StatusChangedEvent(CurrentStatus));
370440
}
371441
}
372442

@@ -394,7 +464,7 @@ public async Task UpdateSchema(Schema schema)
394464
this.schema = compiledSchema;
395465
await Database.Execute("SELECT powersync_replace_schema(?)", [compiledSchema.ToJSON()]);
396466
await Database.RefreshSchema();
397-
Emit(new PowerSyncDBEvent { SchemaChanged = schema });
467+
Events.Emit(new PowerSyncDBEvents.SchemaChangedEvent(schema));
398468
}
399469

400470
/// <summary>
@@ -458,7 +528,7 @@ await Database.WriteTransaction(async tx =>
458528

459529
// The data has been deleted - reset the sync status
460530
CurrentStatus = new SyncStatus(new SyncStatusOptions());
461-
Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus });
531+
Events.Emit(new PowerSyncDBEvents.StatusChangedEvent(CurrentStatus));
462532
}
463533

464534
/// <summary>
@@ -479,18 +549,16 @@ public ISyncStream SyncStream(string name, Dictionary<string, object>? parameter
479549
/// Once close is called, this connection cannot be used again - a new one
480550
/// must be constructed.
481551
/// </summary>
482-
public new async Task Close()
552+
public async Task Close()
483553
{
484554
await WaitForReady();
485555

486556
if (Closed) return;
487557

488-
Emit(new PowerSyncDBEvent { Closing = true });
558+
Events.Emit(new PowerSyncDBEvents.ClosingEvent());
489559

490560
await Disconnect();
491561

492-
base.Close();
493-
494562
masterCts.Cancel();
495563
masterCts.Dispose();
496564

@@ -499,7 +567,10 @@ public ISyncStream SyncStream(string name, Dictionary<string, object>? parameter
499567

500568
Database.Close();
501569
Closed = true;
502-
Emit(new PowerSyncDBEvent { Closed = true });
570+
571+
Events.Emit(new PowerSyncDBEvents.ClosedEvent());
572+
573+
Events.Close();
503574
}
504575

505576
private record UploadQueueStatsSizeCountResult(long size, long count);
@@ -847,14 +918,11 @@ IAsyncEnumerable<DBAdapterEvent> initialListener
847918
// Listen for schema changes in the background
848919
_ = Task.Run(async () =>
849920
{
850-
await foreach (var update in ListenAsync(signal.Token))
921+
await foreach (var update in Events.OnSchemaChanged.ListenAsync(signal.Token))
851922
{
852-
if (update.SchemaChanged != null)
853-
{
854-
// Swap schemaChanged with an unresolved TCS
855-
var oldTcs = Interlocked.Exchange(ref schemaChanged, new());
856-
oldTcs.TrySetResult(true);
857-
}
923+
// Swap schemaChanged with an unresolved TCS
924+
var oldTcs = Interlocked.Exchange(ref schemaChanged, new());
925+
oldTcs.TrySetResult(true);
858926
}
859927
}, signal.Token);
860928

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
namespace PowerSync.Common.Utils;
2+
3+
public interface IEventManager<TEvent> where TEvent : class
4+
{
5+
/// <summary>
6+
/// Attempts to retreive the EventStream associated with events of type T.
7+
/// </summary>
8+
bool TryGetStream<T>(out EventStream<T> stream)
9+
where T : class, TEvent;
10+
11+
/// <summary>
12+
/// Posts a message to the stream managing events of type T.
13+
/// </summary>
14+
void Emit<T>(T evt)
15+
where T : class, TEvent;
16+
17+
/// <summary>
18+
/// Close all EventStream objects and disable the IEventManager.
19+
/// </summary>
20+
void Close();
21+
}
22+
23+
public abstract class EventManager<TEvent> : IEventManager<TEvent>
24+
where TEvent : class
25+
{
26+
public abstract bool TryGetStream<T>(out EventStream<T> stream)
27+
where T : class, TEvent;
28+
29+
public void Emit<T>(T evt) where T : class, TEvent
30+
{
31+
if (TryGetStream<T>(out var stream))
32+
{
33+
stream.Emit(evt);
34+
}
35+
else
36+
{
37+
throw new InvalidOperationException($"No stream emits events of type {nameof(T)}.");
38+
}
39+
}
40+
41+
public abstract void Close();
42+
}
43+

Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,10 @@ public static async Task<SyncStatus> NextStatus(PowerSyncDatabase db)
6565

6666
_ = Task.Run(async () =>
6767
{
68-
await foreach (var update in db.ListenAsync(cts.Token))
68+
await foreach (var update in db.Events.OnStatusChanged.ListenAsync(cts.Token))
6969
{
70-
if (update.StatusChanged != null)
71-
{
72-
tcs.TrySetResult(update.StatusChanged);
73-
cts?.Cancel();
74-
}
70+
tcs.TrySetResult(update.Status);
71+
cts?.Cancel();
7572
}
7673
});
7774

0 commit comments

Comments
 (0)