Skip to content
Merged

wip #3024

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/StackExchange.Redis/PhysicalBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1325,7 +1325,7 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
var result = WriteMessageInsideLock(physical, message);
if (result == WriteResult.Success)
{
var flush = physical.FlushAsync(false);
var flush = physical.FlushAsync(false, physical.OutputCancel);
if (!flush.IsCompletedSuccessfully)
{
releaseLock = false; // so we don't release prematurely
Expand Down Expand Up @@ -1394,7 +1394,7 @@ private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(

if (result == WriteResult.Success)
{
result = await physical.FlushAsync(false).ForAwait();
result = await physical.FlushAsync(false, physical.OutputCancel).ForAwait();
}

physical.SetIdle();
Expand Down
17 changes: 16 additions & 1 deletion src/StackExchange.Redis/PhysicalConnection.Read.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,22 @@ internal sealed partial class PhysicalConnection
private volatile ReadStatus _readStatus = ReadStatus.NotStarted;
internal ReadStatus GetReadStatus() => _readStatus;

internal void StartReading(CancellationToken cancellationToken = default) => ReadAllAsync(cancellationToken).RedisFireAndForget();
internal void StartReading(CancellationToken cancellation = default)
{
if (cancellation.CanBeCanceled)
{
cancellation.ThrowIfCancellationRequested();
if (InputCancel.CanBeCanceled)
{
cancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellation, InputCancel).Token;
}
}
else
{
cancellation = InputCancel;
}
ReadAllAsync(cancellation).RedisFireAndForget();
}

private async Task ReadAllAsync(CancellationToken cancellationToken)
{
Expand Down
78 changes: 44 additions & 34 deletions src/StackExchange.Redis/PhysicalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,26 @@
{
internal sealed partial class PhysicalConnection : IDisposable
{
// infrastructure to simulate connection death, debug only
private partial bool CanCancel();

Check failure on line 28 in src/StackExchange.Redis/PhysicalConnection.cs

View workflow job for this annotation

GitHub Actions / StackExchange.Redis (Ubuntu)

Partial method 'PhysicalConnection.CanCancel()' must have an implementation part because it has accessibility modifiers.

Check failure on line 28 in src/StackExchange.Redis/PhysicalConnection.cs

View workflow job for this annotation

GitHub Actions / StackExchange.Redis (Ubuntu)

Partial method 'PhysicalConnection.CanCancel()' must have an implementation part because it has accessibility modifiers.

Check failure on line 28 in src/StackExchange.Redis/PhysicalConnection.cs

View workflow job for this annotation

GitHub Actions / StackExchange.Redis (Ubuntu)

Partial method 'PhysicalConnection.CanCancel()' must have an implementation part because it has accessibility modifiers.
[Conditional("DEBUG")]
partial void OnCancel(bool input, bool output);
#if DEBUG
private readonly CancellationTokenSource _inputCancel = new(), _outputCancel = new();
internal CancellationToken InputCancel => _inputCancel.Token;
internal CancellationToken OutputCancel => _outputCancel.Token;

partial void OnCancel(bool input, bool output)
{
if (input) _inputCancel.Cancel();
if (output) _outputCancel.Cancel();
}
private partial bool CanCancel() => true;
#else
internal CancellationToken InputCancel => CancellationToken.None;
internal CancellationToken OutputCancel => CancellationToken.None;
#endif

internal readonly byte[]? ChannelPrefix;

private const int DefaultRedisDatabaseCount = 16;
Expand Down Expand Up @@ -208,7 +228,7 @@
log?.LogInformationStartingRead(new(endpoint));
try
{
StartReading(CancellationToken.None);
StartReading();
// Normal return
}
catch (Exception ex)
Expand Down Expand Up @@ -353,7 +373,7 @@
if (tmp != null)
{
_writeStatus = WriteStatus.Flushing;
var flush = tmp.FlushAsync();
var flush = tmp.FlushAsync(OutputCancel);
if (!flush.IsCompletedSuccessfully)
{
return AwaitedFlush(flush);
Expand All @@ -368,40 +388,23 @@

internal void SimulateConnectionFailure(SimulatedFailureType failureType)
{
throw new NotImplementedException(nameof(SimulateConnectionFailure));
/*
var raiseFailed = false;
if (connectionType == ConnectionType.Interactive)
{
if (failureType.HasFlag(SimulatedFailureType.InteractiveInbound))
{
_ioPipe?.Input.Complete(new Exception("Simulating interactive input failure"));
raiseFailed = true;
}
if (failureType.HasFlag(SimulatedFailureType.InteractiveOutbound))
{
_ioPipe?.Output.Complete(new Exception("Simulating interactive output failure"));
raiseFailed = true;
}
}
else if (connectionType == ConnectionType.Subscription)
bool killInput = false, killOutput = false;
switch (connectionType)
{
if (failureType.HasFlag(SimulatedFailureType.SubscriptionInbound))
{
_ioPipe?.Input.Complete(new Exception("Simulating subscription input failure"));
raiseFailed = true;
}
if (failureType.HasFlag(SimulatedFailureType.SubscriptionOutbound))
{
_ioPipe?.Output.Complete(new Exception("Simulating subscription output failure"));
raiseFailed = true;
}
case ConnectionType.Interactive:
killInput = failureType.HasFlag(SimulatedFailureType.InteractiveInbound);
killOutput = failureType.HasFlag(SimulatedFailureType.InteractiveOutbound);
break;
case ConnectionType.Subscription:
killInput = failureType.HasFlag(SimulatedFailureType.SubscriptionInbound);
killOutput = failureType.HasFlag(SimulatedFailureType.SubscriptionOutbound);
break;
}
if (raiseFailed)
if (killInput | killOutput)
{
OnCancel(killInput, killOutput);
RecordConnectionFailed(ConnectionFailureType.SocketFailure);
}
*/
}

public void RecordConnectionFailed(
Expand Down Expand Up @@ -886,7 +889,13 @@
[System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE0062:Make local function 'static'", Justification = "DEBUG uses instance data")]
internal WriteResult FlushSync(bool throwOnFailure, int millisecondsTimeout)
{
var cts = _reusableFlushSyncTokenSource ??= new CancellationTokenSource();
var cts = _reusableFlushSyncTokenSource;
if (cts is null)
{
cts = new CancellationTokenSource();
OutputCancel.Register(static s => { ((CancellationTokenSource)s!).Cancel(); }, cts);
_reusableFlushSyncTokenSource = cts;
}
var flush = FlushAsync(throwOnFailure, cts.Token);
if (!flush.IsCompletedSuccessfully)
{
Expand Down Expand Up @@ -914,14 +923,15 @@
throw new TimeoutException("timeout while synchronously flushing");
}
}
internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure, CancellationToken cancellationToken = default)
internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure, CancellationToken soleCancel)
{
var tmp = _ioStream;
if (tmp == null) return new ValueTask<WriteResult>(WriteResult.NoConnectionAvailable);
try
{
_writeStatus = WriteStatus.Flushing;
var flush = tmp.FlushAsync(cancellationToken);
var flush = tmp.FlushAsync(soleCancel);

if (!flush.IsCompletedSuccessfully) return FlushAsync_Awaited(this, flush, throwOnFailure);
_writeStatus = WriteStatus.Flushed;
UpdateLastWriteTime();
Expand Down
Loading