diff --git a/docs/AsyncTimeouts.md b/docs/AsyncTimeouts.md index 5ba4fd3f1..04892d59a 100644 --- a/docs/AsyncTimeouts.md +++ b/docs/AsyncTimeouts.md @@ -62,4 +62,18 @@ using var cts = CancellationTokenSource.CreateLinkedTokenSource(token); // or mu cts.CancelAfter(timeout); await database.StringSetAsync("key", "value").WaitAsync(cts.Token); var value = await database.StringGetAsync("key").WaitAsync(cts.Token); -`````` \ No newline at end of file +``` + +### Cancelling keys enumeration + +Keys being enumerated (via `SCAN`) can *also* be cancelled, using the inbuilt `.WithCancellation(...)` method: + +```csharp +CancellationToken token = ...; // for example, from HttpContext.RequestAborted +await foreach (var key in server.KeysAsync(pattern: "*foo*").WithCancellation(token)) +{ + ... +} +``` + +To use a timeout instead, you can use the `CancellationTokenSource` approach shown above. \ No newline at end of file diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index b301cca8b..22dd36256 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -8,7 +8,9 @@ Current package versions: ## Unreleased -- (none) +- Support async cancellation of `SCAN` enumeration ([#2911 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2911)) +- Add `XTRIM MINID` support ([#2842 by kijanawoodard](https://github.com/StackExchange/StackExchange.Redis/pull/2842)) +- Add new CE 8.2 stream support - `XDELEX`, `XACKDEL`, `{XADD|XTRIM} [KEEPREF|DELREF|ACKED]` ([#2912 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2912)) ## 2.8.47 @@ -16,8 +18,6 @@ Current package versions: - Package updates ([#2906 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2906)) - Docs: added [guidance on async timeouts](https://stackexchange.github.io/StackExchange.Redis/AsyncTimeouts) ([#2910 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2910)) - Fix handshake error with `CLIENT ID` ([#2909 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2909)) -- Add `XTRIM MINID` support ([#2842 by kijanawoodard](https://github.com/StackExchange/StackExchange.Redis/pull/2842)) -- Add new CE 8.2 stream support - `XDELEX`, `XACKDEL`, `{XADD|XTRIM} [KEEPREF|DELREF|ACKED]` ([#2912 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2912)) ## 2.8.41 diff --git a/src/StackExchange.Redis/CursorEnumerable.cs b/src/StackExchange.Redis/CursorEnumerable.cs index 55d93d6a6..e526eceaa 100644 --- a/src/StackExchange.Redis/CursorEnumerable.cs +++ b/src/StackExchange.Redis/CursorEnumerable.cs @@ -141,6 +141,7 @@ private bool SimpleNext() { if (_pageOffset + 1 < _pageCount) { + cancellationToken.ThrowIfCancellationRequested(); _pageOffset++; return true; } @@ -274,7 +275,7 @@ private async ValueTask AwaitedNextAsync(bool isInitial) ScanResult scanResult; try { - scanResult = await pending.ForAwait(); + scanResult = await pending.WaitAsync(cancellationToken).ForAwait(); } catch (Exception ex) { diff --git a/src/StackExchange.Redis/TaskExtensions.cs b/src/StackExchange.Redis/TaskExtensions.cs index ad4b41113..a0994a0b6 100644 --- a/src/StackExchange.Redis/TaskExtensions.cs +++ b/src/StackExchange.Redis/TaskExtensions.cs @@ -25,6 +25,44 @@ internal static Task ObserveErrors(this Task task) return task; } +#if !NET6_0_OR_GREATER + // suboptimal polyfill version of the .NET 6+ API, but reasonable for light use + internal static Task WaitAsync(this Task task, CancellationToken cancellationToken) + { + if (task.IsCompleted || !cancellationToken.CanBeCanceled) return task; + return Wrap(task, cancellationToken); + + static async Task Wrap(Task task, CancellationToken cancellationToken) + { + var tcs = new TaskSourceWithToken(cancellationToken); + using var reg = cancellationToken.Register( + static state => ((TaskSourceWithToken)state!).Cancel(), tcs); + _ = task.ContinueWith( + static (t, state) => + { + var tcs = (TaskSourceWithToken)state!; + if (t.IsCanceled) tcs.TrySetCanceled(); + else if (t.IsFaulted) tcs.TrySetException(t.Exception!); + else tcs.TrySetResult(t.Result); + }, + tcs); + return await tcs.Task; + } + } + + // the point of this type is to combine TCS and CT so that we can use a static + // registration via Register + private sealed class TaskSourceWithToken : TaskCompletionSource + { + public TaskSourceWithToken(CancellationToken cancellationToken) + => _cancellationToken = cancellationToken; + + private readonly CancellationToken _cancellationToken; + + public void Cancel() => TrySetCanceled(_cancellationToken); + } +#endif + [MethodImpl(MethodImplOptions.AggressiveInlining)] internal static ConfiguredTaskAwaitable ForAwait(this Task task) => task.ConfigureAwait(false); [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/tests/StackExchange.Redis.Tests/CancellationTests.cs b/tests/StackExchange.Redis.Tests/CancellationTests.cs index 71306e9dd..f9f955eb8 100644 --- a/tests/StackExchange.Redis.Tests/CancellationTests.cs +++ b/tests/StackExchange.Redis.Tests/CancellationTests.cs @@ -40,6 +40,11 @@ public async Task WithCancellation_ValidToken_OperationSucceeds() private static void Pause(IDatabase db) => db.Execute("client", ["pause", ConnectionPauseMilliseconds], CommandFlags.FireAndForget); + private void Pause(IServer server) + { + server.Execute("client", new object[] { "pause", ConnectionPauseMilliseconds }, CommandFlags.FireAndForget); + } + [Fact] public async Task WithTimeout_ShortTimeout_Async_ThrowsOperationCanceledException() { @@ -147,4 +152,38 @@ public async Task CancellationDuringOperation_Async_CancelsGracefully(CancelStra Assert.Equal(cts.Token, oce.CancellationToken); } } + + [Fact] + public async Task ScanCancellable() + { + using var conn = Create(); + var db = conn.GetDatabase(); + var server = conn.GetServer(conn.GetEndPoints()[0]); + + using var cts = new CancellationTokenSource(); + + var watch = Stopwatch.StartNew(); + Pause(server); + try + { + db.StringSet(Me(), "value", TimeSpan.FromMinutes(5), flags: CommandFlags.FireAndForget); + await using var iter = server.KeysAsync(pageSize: 1000).WithCancellation(cts.Token).GetAsyncEnumerator(); + var pending = iter.MoveNextAsync(); + Assert.False(cts.Token.IsCancellationRequested); + cts.CancelAfter(ShortDelayMilliseconds); // start this *after* we've got past the initial check + while (await pending) + { + pending = iter.MoveNextAsync(); + } + Assert.Fail($"{ExpectedCancel}: {watch.ElapsedMilliseconds}ms"); + } + catch (OperationCanceledException oce) + { + var taken = watch.ElapsedMilliseconds; + // Expected if cancellation happens during operation + Log($"Cancelled after {taken}ms"); + Assert.True(taken < ConnectionPauseMilliseconds / 2, "Should have cancelled much sooner"); + Assert.Equal(cts.Token, oce.CancellationToken); + } + } }