From 308455220a982e3e9e595d0a7901646b963f4a38 Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Tue, 3 Mar 2026 08:59:40 +0000 Subject: [PATCH] wip --- src/StackExchange.Redis/PhysicalBridge.cs | 4 +- .../PhysicalConnection.Read.cs | 17 +++- src/StackExchange.Redis/PhysicalConnection.cs | 78 ++++++++++--------- 3 files changed, 61 insertions(+), 38 deletions(-) diff --git a/src/StackExchange.Redis/PhysicalBridge.cs b/src/StackExchange.Redis/PhysicalBridge.cs index 7fc864d7c..5dee6fbc8 100644 --- a/src/StackExchange.Redis/PhysicalBridge.cs +++ b/src/StackExchange.Redis/PhysicalBridge.cs @@ -1325,7 +1325,7 @@ internal ValueTask WriteMessageTakingWriteLockAsync(PhysicalConnect var result = WriteMessageInsideLock(physical, message); if (result == WriteResult.Success) { - var flush = physical.FlushAsync(false); + var flush = physical.FlushAsync(false, physical.OutputCancel); if (!flush.IsCompletedSuccessfully) { releaseLock = false; // so we don't release prematurely @@ -1394,7 +1394,7 @@ private async ValueTask WriteMessageTakingWriteLockAsync_Awaited( if (result == WriteResult.Success) { - result = await physical.FlushAsync(false).ForAwait(); + result = await physical.FlushAsync(false, physical.OutputCancel).ForAwait(); } physical.SetIdle(); diff --git a/src/StackExchange.Redis/PhysicalConnection.Read.cs b/src/StackExchange.Redis/PhysicalConnection.Read.cs index 78125290a..14cc0a1a5 100644 --- a/src/StackExchange.Redis/PhysicalConnection.Read.cs +++ b/src/StackExchange.Redis/PhysicalConnection.Read.cs @@ -21,7 +21,22 @@ internal sealed partial class PhysicalConnection private volatile ReadStatus _readStatus = ReadStatus.NotStarted; internal ReadStatus GetReadStatus() => _readStatus; - internal void StartReading(CancellationToken cancellationToken = default) => ReadAllAsync(cancellationToken).RedisFireAndForget(); + internal void StartReading(CancellationToken cancellation = default) + { + if (cancellation.CanBeCanceled) + { + cancellation.ThrowIfCancellationRequested(); + if (InputCancel.CanBeCanceled) + { + cancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellation, InputCancel).Token; + } + } + else + { + cancellation = InputCancel; + } + ReadAllAsync(cancellation).RedisFireAndForget(); + } private async Task ReadAllAsync(CancellationToken cancellationToken) { diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index f5ab946b9..41d67207c 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -24,6 +24,26 @@ namespace StackExchange.Redis { internal sealed partial class PhysicalConnection : IDisposable { + // infrastructure to simulate connection death, debug only + private partial bool CanCancel(); + [Conditional("DEBUG")] + partial void OnCancel(bool input, bool output); +#if DEBUG + private readonly CancellationTokenSource _inputCancel = new(), _outputCancel = new(); + internal CancellationToken InputCancel => _inputCancel.Token; + internal CancellationToken OutputCancel => _outputCancel.Token; + + partial void OnCancel(bool input, bool output) + { + if (input) _inputCancel.Cancel(); + if (output) _outputCancel.Cancel(); + } + private partial bool CanCancel() => true; +#else + internal CancellationToken InputCancel => CancellationToken.None; + internal CancellationToken OutputCancel => CancellationToken.None; +#endif + internal readonly byte[]? ChannelPrefix; private const int DefaultRedisDatabaseCount = 16; @@ -208,7 +228,7 @@ static Socket CreateSocket(EndPoint endpoint) log?.LogInformationStartingRead(new(endpoint)); try { - StartReading(CancellationToken.None); + StartReading(); // Normal return } catch (Exception ex) @@ -353,7 +373,7 @@ public Task FlushAsync() if (tmp != null) { _writeStatus = WriteStatus.Flushing; - var flush = tmp.FlushAsync(); + var flush = tmp.FlushAsync(OutputCancel); if (!flush.IsCompletedSuccessfully) { return AwaitedFlush(flush); @@ -368,40 +388,23 @@ public Task FlushAsync() internal void SimulateConnectionFailure(SimulatedFailureType failureType) { - throw new NotImplementedException(nameof(SimulateConnectionFailure)); - /* - var raiseFailed = false; - if (connectionType == ConnectionType.Interactive) + bool killInput = false, killOutput = false; + switch (connectionType) { - if (failureType.HasFlag(SimulatedFailureType.InteractiveInbound)) - { - _ioPipe?.Input.Complete(new Exception("Simulating interactive input failure")); - raiseFailed = true; - } - if (failureType.HasFlag(SimulatedFailureType.InteractiveOutbound)) - { - _ioPipe?.Output.Complete(new Exception("Simulating interactive output failure")); - raiseFailed = true; - } + case ConnectionType.Interactive: + killInput = failureType.HasFlag(SimulatedFailureType.InteractiveInbound); + killOutput = failureType.HasFlag(SimulatedFailureType.InteractiveOutbound); + break; + case ConnectionType.Subscription: + killInput = failureType.HasFlag(SimulatedFailureType.SubscriptionInbound); + killOutput = failureType.HasFlag(SimulatedFailureType.SubscriptionOutbound); + break; } - else if (connectionType == ConnectionType.Subscription) - { - if (failureType.HasFlag(SimulatedFailureType.SubscriptionInbound)) - { - _ioPipe?.Input.Complete(new Exception("Simulating subscription input failure")); - raiseFailed = true; - } - if (failureType.HasFlag(SimulatedFailureType.SubscriptionOutbound)) - { - _ioPipe?.Output.Complete(new Exception("Simulating subscription output failure")); - raiseFailed = true; - } - } - if (raiseFailed) + if (killInput | killOutput) { + OnCancel(killInput, killOutput); RecordConnectionFailed(ConnectionFailureType.SocketFailure); } - */ } public void RecordConnectionFailed( @@ -886,7 +889,13 @@ private async ValueTask FlushAsync_Awaited(PhysicalConnection conne [System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE0062:Make local function 'static'", Justification = "DEBUG uses instance data")] internal WriteResult FlushSync(bool throwOnFailure, int millisecondsTimeout) { - var cts = _reusableFlushSyncTokenSource ??= new CancellationTokenSource(); + var cts = _reusableFlushSyncTokenSource; + if (cts is null) + { + cts = new CancellationTokenSource(); + OutputCancel.Register(static s => { ((CancellationTokenSource)s!).Cancel(); }, cts); + _reusableFlushSyncTokenSource = cts; + } var flush = FlushAsync(throwOnFailure, cts.Token); if (!flush.IsCompletedSuccessfully) { @@ -914,15 +923,14 @@ void ThrowTimeout() throw new TimeoutException("timeout while synchronously flushing"); } } - internal ValueTask FlushAsync(bool throwOnFailure, CancellationToken cancellationToken = default) + internal ValueTask FlushAsync(bool throwOnFailure, CancellationToken soleCancel) { var tmp = _ioStream; if (tmp == null) return new ValueTask(WriteResult.NoConnectionAvailable); try { _writeStatus = WriteStatus.Flushing; - tmp.FlushAsync(cancellationToken); - var flush = tmp.FlushAsync(cancellationToken); + var flush = tmp.FlushAsync(soleCancel); if (!flush.IsCompletedSuccessfully) return FlushAsync_Awaited(this, flush, throwOnFailure); _writeStatus = WriteStatus.Flushed; UpdateLastWriteTime();