Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions PowerSync/PowerSync.Common/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 0.0.11-alpha.1

- Implemented throttling for `Watch<T>` and `OnChange` (default 30ms).
- `StatusUpdated` and `StatusChanged` now both emit `SyncStatus` objects instead of just `StatusChanged`.
- Converted most instances of a class inheriting from `EventStream<T>` into a class with an `EventManager` property called `Events`. This allows for subscribing to individual events instead of subscribing to all events and then filtering events manually.

Expand Down
99 changes: 83 additions & 16 deletions PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ namespace PowerSync.Common.Client;

using System.Runtime.CompilerServices;
using System.Text.RegularExpressions;
using System.Threading.Channels;
using System.Threading.Tasks;

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

using Newtonsoft.Json;

using Nito.AsyncEx;
using ThrottleDebounce;

using PowerSync.Common.Client.Connection;
using PowerSync.Common.Client.Sync.Bucket;
Expand Down Expand Up @@ -131,7 +132,7 @@ public class PowerSyncDatabase : IPowerSyncDatabase
public IDBAdapter Database { get; protected set; }
private CompiledSchema schema;

private static readonly int DEFAULT_WATCH_THROTTLE_MS = 30;
private const int DEFAULT_WATCH_THROTTLE_MS = 30;
private static readonly Regex POWERSYNC_TABLE_MATCH = new Regex(@"(^ps_data__|^ps_data_local__)", RegexOptions.Compiled);

public bool Closed { get; protected set; }
Expand Down Expand Up @@ -785,19 +786,21 @@ public IAsyncEnumerable<WatchOnChangeEvent> OnChange(SQLWatchOptions? options =

// Return the actual IAsyncEnumerable here, using OnChange as a synchronous wrapper that blocks until the
// connection is established
return OnChangeCore(powersyncTables, listener, signal, options?.TriggerImmediately == true);
var throttleMs = options?.ThrottleMs ?? DEFAULT_WATCH_THROTTLE_MS;
return OnChangeCore(powersyncTables, listener, signal, options?.TriggerImmediately == true, throttleMs);
}

private async IAsyncEnumerable<WatchOnChangeEvent> OnChangeCore(
HashSet<string> watchedTables,
IAsyncEnumerable<DBAdapterEvents.TablesUpdatedEvent> listener,
CancellationTokenSource signal,
bool triggerImmediately
bool triggerImmediately,
int throttleMs = DEFAULT_WATCH_THROTTLE_MS
)
{
try
{
await foreach (var update in OnRawTableChange(watchedTables, listener, signal.Token, triggerImmediately))
await foreach (var update in OnRawTableChange(watchedTables, listener, signal.Token, triggerImmediately, throttleMs))
{
// Convert from 'ps_data__<name>' to '<name>'
for (int i = 0; i < update.ChangedTables.Length; i++)
Expand Down Expand Up @@ -875,6 +878,7 @@ private async IAsyncEnumerable<T[]> WatchCore<T>(
bool isRestart = false;
var currentRestartCts = initialRestartCts;
var currentListener = initialListener;
var throttleMs = options?.ThrottleMs ?? DEFAULT_WATCH_THROTTLE_MS;

try
{
Expand All @@ -898,7 +902,8 @@ private async IAsyncEnumerable<T[]> WatchCore<T>(
powersyncTables,
currentListener,
currentRestartCts.Token,
isRestart || (options?.TriggerImmediately == true)
isRestart || (options?.TriggerImmediately == true),
throttleMs
).GetAsyncEnumerator();

// Continually wait for either OnChange or SchemaChanged to fire
Expand Down Expand Up @@ -985,31 +990,93 @@ internal async Task<HashSet<string>> GetSourceTables(string sql, object?[]? para
private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
HashSet<string> watchedTables,
IAsyncEnumerable<DBAdapterEvents.TablesUpdatedEvent> listener,
[EnumeratorCancellation] CancellationToken token,
bool triggerImmediately = false
[EnumeratorCancellation] CancellationToken signal,
bool triggerImmediately = false,
int throttleMs = DEFAULT_WATCH_THROTTLE_MS
)
{
if (triggerImmediately)
{
yield return new WatchOnChangeEvent { ChangedTables = [] };
}

HashSet<string> changedTables = new();
await foreach (var e in listener)
if (throttleMs <= 0)
{
// Extract the changed tables and intersect with the watched tables
changedTables.Clear();
GetTablesFromNotification(e.TablesUpdated, changedTables);
changedTables.IntersectWith(watchedTables);
// No throttling
HashSet<string> changedTables = new();
await foreach (var e in listener)
{
GetTablesFromNotification(e.TablesUpdated, changedTables);
changedTables.IntersectWith(watchedTables);
if (changedTables.Count == 0) continue;
yield return new WatchOnChangeEvent { ChangedTables = [.. changedTables] };
}
yield break;
}

if (changedTables.Count == 0) continue;
// Throttled - publish via throttled call to an action that flushes accumulated changes into this channel
var channel = Channel.CreateUnbounded<WatchOnChangeEvent>();
var accumulatedTables = new HashSet<string>();

_ = Task.Run(async () =>
{
using var throttledFlush = Throttler.Throttle(() =>
{
// Safe to lock directly on accumulatedTables because it's a local variable
lock (accumulatedTables)
{
if (accumulatedTables.Count == 0) return;
channel.Writer.TryWrite(new WatchOnChangeEvent { ChangedTables = [.. accumulatedTables] });
accumulatedTables.Clear();
}
},
TimeSpan.FromMilliseconds(throttleMs),
leading: false,
trailing: true
);

try
{
var changedTables = new HashSet<string>();
await foreach (var e in listener)
{
GetTablesFromNotification(e.TablesUpdated, changedTables);
changedTables.IntersectWith(watchedTables);
if (changedTables.Count == 0) continue;

yield return new WatchOnChangeEvent { ChangedTables = [.. changedTables] };
lock (accumulatedTables) { accumulatedTables.UnionWith(changedTables); }
throttledFlush.Invoke();
}
}
catch (OperationCanceledException) { }
finally
{
// Flush any remaining events and close the channel
lock (accumulatedTables)
{
if (accumulatedTables.Count > 0)
{
channel.Writer.TryWrite(new WatchOnChangeEvent { ChangedTables = [.. accumulatedTables] });
accumulatedTables.Clear();
}
}
channel.Writer.Complete();
}
});

// Continuously pull values from channel and publish to the consumer
while (await channel.Reader.WaitToReadAsync(CancellationToken.None))
{
while (channel.Reader.TryRead(out var evt))
{
yield return evt;
}
}
}

private static void GetTablesFromNotification(INotification updateNotification, HashSet<string> changedTables)
{
changedTables.Clear();
string[] tables = [];
if (updateNotification is BatchedUpdateNotification batchedUpdate)
{
Expand Down
1 change: 1 addition & 0 deletions PowerSync/PowerSync.Common/PowerSync.Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="Nito.AsyncEx" Version="5.1.2" />
<PackageReference Include="System.Threading.Channels" Version="8.0.0" />
<PackageReference Include="ThrottleDebounce" Version="2.0.1" />
</ItemGroup>

<ItemGroup>
Expand Down
Loading