Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 80 additions & 71 deletions PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -785,40 +785,31 @@ public IAsyncEnumerable<WatchOnChangeEvent> OnChange(SQLWatchOptions? options =

// Return the actual IAsyncEnumerable here, using OnChange as a synchronous wrapper that blocks until the
// connection is established
return OnChangeCore(powersyncTables, listener, signal.Token, options?.TriggerImmediately == true);
return OnChangeCore(powersyncTables, listener, signal, options?.TriggerImmediately == true);
}

private async IAsyncEnumerable<WatchOnChangeEvent> OnChangeCore(
HashSet<string> watchedTables,
IAsyncEnumerable<DBAdapterEvents.TablesUpdatedEvent> listener,
[EnumeratorCancellation] CancellationToken signal,
CancellationTokenSource signal,
bool triggerImmediately
)
{
if (triggerImmediately == true)
{
yield return new WatchOnChangeEvent { ChangedTables = [] };
}

HashSet<string> changedTables = new();
await foreach (var e in listener)
try
{
if (signal.IsCancellationRequested) yield break;

changedTables.Clear();
GetTablesFromNotification(e.TablesUpdated, changedTables);
changedTables.IntersectWith(watchedTables);

if (changedTables.Count == 0) continue;

var update = new WatchOnChangeEvent { ChangedTables = [.. changedTables] };

// Convert from 'ps_data__<name>' to '<name>'
for (int i = 0; i < update.ChangedTables.Length; i++)
await foreach (var update in OnRawTableChange(watchedTables, listener, signal.Token, triggerImmediately))
{
update.ChangedTables[i] = InternalToFriendlyTableName(update.ChangedTables[i]);
// Convert from 'ps_data__<name>' to '<name>'
for (int i = 0; i < update.ChangedTables.Length; i++)
{
update.ChangedTables[i] = InternalToFriendlyTableName(update.ChangedTables[i]);
}
yield return update;
}
yield return update;
}
finally
{
signal.Dispose();
}
}

Expand Down Expand Up @@ -870,7 +861,7 @@ private async IAsyncEnumerable<T[]> WatchCore<T>(
var schemaChanged = new TaskCompletionSource<bool>();

// Listen for schema changes in the background
_ = Task.Run(async () =>
var schemaListenerTask = Task.Run(async () =>
{
await foreach (var update in Events.OnSchemaChanged.ListenAsync(signal.Token))
{
Expand All @@ -885,60 +876,80 @@ private async IAsyncEnumerable<T[]> WatchCore<T>(
var currentRestartCts = initialRestartCts;
var currentListener = initialListener;

while (!signal.Token.IsCancellationRequested)
try
{
// Resolve tables
HashSet<string> powersyncTables;
if (options?.Tables != null)
{
powersyncTables = [.. options
.Tables
.SelectMany<string, string>(table => [$"ps_data__{table}", $"ps_data_local__{table}"]
)];
}
else
{
powersyncTables = await GetSourceTables(sql, parameters);
}

var enumerator = OnRawTableChange(
powersyncTables,
currentListener,
currentRestartCts.Token,
isRestart || (options?.TriggerImmediately == true)
).GetAsyncEnumerator(currentRestartCts.Token);

// Continually wait for either OnChange or SchemaChanged to fire
while (true)
while (!signal.Token.IsCancellationRequested)
{
var currentSchemaTask = schemaChanged.Task;
var onChangeTask = enumerator.MoveNextAsync().AsTask();
var completedTask = await Task.WhenAny(onChangeTask, currentSchemaTask);

if (completedTask == currentSchemaTask)
// Resolve tables
HashSet<string> powersyncTables;
if (options?.Tables != null)
{
powersyncTables = [.. options
.Tables
.SelectMany<string, string>(table => [$"ps_data__{table}", $"ps_data_local__{table}"]
)];
}
else
{
currentRestartCts.Cancel();
isRestart = true;
// Let the current task complete/cancel gracefully
try { await onChangeTask; }
catch (OperationCanceledException) { }

// Establish a new listener BEFORE resolving source tables in the next iteration,
// so that changes during the async GetSourceTables call are not missed.
currentRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token);
currentListener = Database.Events.OnTablesUpdated.ListenAsync(currentRestartCts.Token);

break;
powersyncTables = await GetSourceTables(sql, parameters);
}

var update = enumerator.Current;
if (update.ChangedTables != null)
var enumerator = OnRawTableChange(
powersyncTables,
currentListener,
currentRestartCts.Token,
isRestart || (options?.TriggerImmediately == true)
).GetAsyncEnumerator();

// Continually wait for either OnChange or SchemaChanged to fire
while (true)
{
if (signal.IsCancellationRequested) yield break;
yield return await GetAll<T>(sql, parameters);
var currentSchemaTask = schemaChanged.Task;
var onChangeTask = enumerator.MoveNextAsync().AsTask();
var completedTask = await Task.WhenAny(onChangeTask, currentSchemaTask);

if (completedTask == currentSchemaTask)
{
var oldRestartCts = currentRestartCts;
oldRestartCts.Cancel();
isRestart = true;
// Let the current task complete/cancel gracefully
try { await onChangeTask; }
catch (OperationCanceledException) { }

// Establish a new listener BEFORE resolving source tables in the next iteration,
// so that changes during the async GetSourceTables call are not missed.
currentRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token);
currentListener = Database.Events.OnTablesUpdated.ListenAsync(currentRestartCts.Token);
oldRestartCts.Dispose();

break;
}

// Await onChangeTask to propagate cancellation and detect end-of-enumeration
bool hasNext;
try { hasNext = await onChangeTask; }
catch (OperationCanceledException) { yield break; }

if (!hasNext) break;

var update = enumerator.Current;
if (update.ChangedTables != null)
{
yield return await GetAll<T>(sql, parameters);
}
}
}
}
finally
{
signal.Cancel();
try { await schemaListenerTask; }
catch (OperationCanceledException) { }

currentRestartCts.Dispose();
signal.Dispose();
}
}

private class ExplainedResult
Expand Down Expand Up @@ -986,8 +997,6 @@ private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
HashSet<string> changedTables = new();
await foreach (var e in listener)
{
if (token.IsCancellationRequested) break;

// Extract the changed tables and intersect with the watched tables
changedTables.Clear();
GetTablesFromNotification(e.TablesUpdated, changedTables);
Expand Down
Loading