From 5aebe65a86fd8f1176d820d53d9a03a2cc170005 Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Thu, 9 Oct 2025 12:45:44 +0100 Subject: [PATCH 1/7] add failing test and mitigation for OSS sharded sunbscribe behavior --- src/StackExchange.Redis/PhysicalConnection.cs | 188 ++++++++++++------ src/StackExchange.Redis/RedisSubscriber.cs | 26 ++- .../ServerSelectionStrategy.cs | 2 +- .../ClusterShardedTests.cs | 152 +++++++++++++- 4 files changed, 298 insertions(+), 70 deletions(-) diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index c21bc07fc..ccc67b4d2 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -29,7 +29,8 @@ internal sealed partial class PhysicalConnection : IDisposable private const int DefaultRedisDatabaseCount = 16; - private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage"; + private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage", + subscribe = "subscribe", sunsubscribe = "sunsubscribe"; private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select( i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray(); @@ -1669,6 +1670,36 @@ internal async ValueTask ConnectedAsync(Socket? socket, ILogger? log, Sock } } + private enum PushKind + { + None, + Message, + SMessage, + PMessage, + Subscribe, + SUnsubscribe, + } + private static PushKind GetPushKind(in Sequence result) + { + var len = result.Length; + if (len >= 1) + { + ref readonly RawResult kind = ref result[0]; + if (len >= 3) + { + if (kind.IsEqual(message)) return PushKind.Message; + if (kind.IsEqual(smessage)) return PushKind.SMessage; + if (len >= 4) + { + if (kind.IsEqual(pmessage)) return PushKind.PMessage; + } + if (kind.IsEqual(sunsubscribe)) return PushKind.SUnsubscribe; + } + if (kind.IsEqual(subscribe)) return PushKind.Subscribe; + } + return PushKind.None; + } + private void MatchResult(in RawResult result) { // check to see if it could be an out-of-band pubsub message @@ -1679,85 +1710,121 @@ private void MatchResult(in RawResult result) // out of band message does not match to a queued message var items = result.GetItems(); - if (items.Length >= 3 && (items[0].IsEqual(message) || items[0].IsEqual(smessage))) + var kind = GetPushKind(items); + switch (kind) { - _readStatus = items[0].IsEqual(message) ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage; + case PushKind.Message: + case PushKind.SMessage: + _readStatus = kind is PushKind.Message ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage; - // special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry) - var configChanged = muxer.ConfigurationChangedChannel; - if (configChanged != null && items[1].IsEqual(configChanged)) - { - EndPoint? blame = null; - try + // special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry) + var configChanged = muxer.ConfigurationChangedChannel; + if (configChanged != null && items[1].IsEqual(configChanged)) { - if (!items[2].IsEqual(CommonReplies.wildcard)) + EndPoint? blame = null; + try + { + if (!items[2].IsEqual(CommonReplies.wildcard)) + { + // We don't want to fail here, just trying to identify + _ = Format.TryParseEndPoint(items[2].GetString(), out blame); + } + } + catch { - // We don't want to fail here, just trying to identify - _ = Format.TryParseEndPoint(items[2].GetString(), out blame); + /* no biggie */ } + + Trace("Configuration changed: " + Format.ToString(blame)); + _readStatus = ReadStatus.Reconfigure; + muxer.ReconfigureIfNeeded(blame, true, "broadcast"); } - catch { /* no biggie */ } - Trace("Configuration changed: " + Format.ToString(blame)); - _readStatus = ReadStatus.Reconfigure; - muxer.ReconfigureIfNeeded(blame, true, "broadcast"); - } - // invoke the handlers - RedisChannel channel; - if (items[0].IsEqual(message)) - { - channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None); - Trace("MESSAGE: " + channel); - } - else // see check on outer-if that restricts to message / smessage - { - channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded); - Trace("SMESSAGE: " + channel); - } - if (!channel.IsNull) - { - if (TryGetPubSubPayload(items[2], out var payload)) + // invoke the handlers + RedisChannel channel; + if (items[0].IsEqual(message)) { - _readStatus = ReadStatus.InvokePubSub; - muxer.OnMessage(channel, channel, payload); + channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None); + Trace("MESSAGE: " + channel); } - // could be multi-message: https://github.com/StackExchange/StackExchange.Redis/issues/2507 - else if (TryGetMultiPubSubPayload(items[2], out var payloads)) + else // see check on outer-if that restricts to message / smessage { - _readStatus = ReadStatus.InvokePubSub; - muxer.OnMessage(channel, channel, payloads); + channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded); + Trace("SMESSAGE: " + channel); + } + + if (!channel.IsNull) + { + if (TryGetPubSubPayload(items[2], out var payload)) + { + _readStatus = ReadStatus.InvokePubSub; + muxer.OnMessage(channel, channel, payload); + } + // could be multi-message: https://github.com/StackExchange/StackExchange.Redis/issues/2507 + else if (TryGetMultiPubSubPayload(items[2], out var payloads)) + { + _readStatus = ReadStatus.InvokePubSub; + muxer.OnMessage(channel, channel, payloads); + } } - } - return; // AND STOP PROCESSING! - } - else if (items.Length >= 4 && items[0].IsEqual(pmessage)) - { - _readStatus = ReadStatus.PubSubPMessage; - var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern); + return; // AND STOP PROCESSING! + case PushKind.PMessage: + _readStatus = ReadStatus.PubSubPMessage; - Trace("PMESSAGE: " + channel); - if (!channel.IsNull) - { - if (TryGetPubSubPayload(items[3], out var payload)) + channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern); + + Trace("PMESSAGE: " + channel); + if (!channel.IsNull) { - var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern); + if (TryGetPubSubPayload(items[3], out var payload)) + { + var sub = items[1].AsRedisChannel( + ChannelPrefix, + RedisChannel.RedisChannelOptions.Pattern); - _readStatus = ReadStatus.InvokePubSub; - muxer.OnMessage(sub, channel, payload); + _readStatus = ReadStatus.InvokePubSub; + muxer.OnMessage(sub, channel, payload); + } + else if (TryGetMultiPubSubPayload(items[3], out var payloads)) + { + var sub = items[1].AsRedisChannel( + ChannelPrefix, + RedisChannel.RedisChannelOptions.Pattern); + + _readStatus = ReadStatus.InvokePubSub; + muxer.OnMessage(sub, channel, payloads); + } } - else if (TryGetMultiPubSubPayload(items[3], out var payloads)) - { - var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern); - _readStatus = ReadStatus.InvokePubSub; - muxer.OnMessage(sub, channel, payloads); + break; + case PushKind.SUnsubscribe: + _readStatus = ReadStatus.PubSubSUnsubscribe; + channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded); + var server = BridgeCouldBeNull?.ServerEndPoint; + if (server is not null && muxer.TryGetSubscription(channel, out var subscription)) + { + if (subscription.GetCurrentServer() == server) + { + // definitely isn't this connection any more, but we were listening + subscription.SetCurrentServer(null); + muxer.ReconfigureIfNeeded(server.EndPoint, fromBroadcast: true, nameof(sunsubscribe)); + } } - } - return; // AND STOP PROCESSING! + break; } - // if it didn't look like "[p|s]message", then we still need to process the pending queue + switch (kind) + { + // we recognized it a RESP2 OOB, or it was explicitly *any* RESP3 push notification + // (even if we didn't recognize the kind) - we're done; unless it is "subscribe", which + // is *technically* a push, but we still want to treat it as a response to the original message + case PushKind.None when result.Resp3Type != ResultType.Push: + case PushKind.Subscribe: + break; // continue, try to match to a pending message + default: + return; // we're done with this message (RESP3 OOB, or something we recognized) + } } Trace("Matching result..."); @@ -2168,6 +2235,7 @@ internal enum ReadStatus MatchResultComplete, ResetArena, ProcessBufferComplete, + PubSubSUnsubscribe, NA = -1, } private volatile ReadStatus _readStatus; diff --git a/src/StackExchange.Redis/RedisSubscriber.cs b/src/StackExchange.Redis/RedisSubscriber.cs index 8ff9610b0..4b04a424b 100644 --- a/src/StackExchange.Redis/RedisSubscriber.cs +++ b/src/StackExchange.Redis/RedisSubscriber.cs @@ -145,6 +145,14 @@ internal long EnsureSubscriptions(CommandFlags flags = CommandFlags.None) return count; } + internal void EnsureSubscription(Subscription sub, in RedisChannel channel, CommandFlags flags) + { + if (!sub.IsConnected) + { + DefaultSubscriber.EnsureSubscribedToServer(sub, channel, flags, true); + } + } + internal enum SubscriptionAction { Subscribe, @@ -404,7 +412,7 @@ public ChannelMessageQueue Subscribe(RedisChannel channel, CommandFlags flags = return queue; } - public bool Subscribe(RedisChannel channel, Action? handler, ChannelMessageQueue? queue, CommandFlags flags) + private bool Subscribe(RedisChannel channel, Action? handler, ChannelMessageQueue? queue, CommandFlags flags) { ThrowIfNull(channel); if (handler == null && queue == null) { return true; } @@ -428,32 +436,34 @@ internal bool EnsureSubscribedToServer(Subscription sub, RedisChannel channel, C Task ISubscriber.SubscribeAsync(RedisChannel channel, Action handler, CommandFlags flags) => SubscribeAsync(channel, handler, null, flags); - public async Task SubscribeAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None) + Task ISubscriber.SubscribeAsync(RedisChannel channel, CommandFlags flags) => SubscribeAsync(channel, flags); + + public async Task SubscribeAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None, ServerEndPoint? server = null) { var queue = new ChannelMessageQueue(channel, this); - await SubscribeAsync(channel, null, queue, flags).ForAwait(); + await SubscribeAsync(channel, null, queue, flags, server).ForAwait(); return queue; } - public Task SubscribeAsync(RedisChannel channel, Action? handler, ChannelMessageQueue? queue, CommandFlags flags) + private Task SubscribeAsync(RedisChannel channel, Action? handler, ChannelMessageQueue? queue, CommandFlags flags, ServerEndPoint? server = null) { ThrowIfNull(channel); if (handler == null && queue == null) { return CompletedTask.Default(null); } var sub = multiplexer.GetOrAddSubscription(channel, flags); sub.Add(handler, queue); - return EnsureSubscribedToServerAsync(sub, channel, flags, false); + return EnsureSubscribedToServerAsync(sub, channel, flags, false, server); } - public Task EnsureSubscribedToServerAsync(Subscription sub, RedisChannel channel, CommandFlags flags, bool internalCall) + public Task EnsureSubscribedToServerAsync(Subscription sub, RedisChannel channel, CommandFlags flags, bool internalCall, ServerEndPoint? server = null) { if (sub.IsConnected) { return CompletedTask.Default(null); } // TODO: Cleanup old hangers here? sub.SetCurrentServer(null); // we're not appropriately connected, so blank it out for eligible reconnection var message = sub.GetMessage(channel, SubscriptionAction.Subscribe, flags, internalCall); - var selected = multiplexer.SelectServer(message); - return ExecuteAsync(message, sub.Processor, selected); + server ??= multiplexer.SelectServer(message); + return ExecuteAsync(message, sub.Processor, server); } public EndPoint? SubscribedEndpoint(RedisChannel channel) => multiplexer.GetSubscribedServer(channel)?.EndPoint; diff --git a/src/StackExchange.Redis/ServerSelectionStrategy.cs b/src/StackExchange.Redis/ServerSelectionStrategy.cs index 4084b4c33..ca247c38b 100644 --- a/src/StackExchange.Redis/ServerSelectionStrategy.cs +++ b/src/StackExchange.Redis/ServerSelectionStrategy.cs @@ -328,7 +328,7 @@ private ServerEndPoint[] MapForMutation() return arr; } - private ServerEndPoint? Select(int slot, RedisCommand command, CommandFlags flags, bool allowDisconnected) + internal ServerEndPoint? Select(int slot, RedisCommand command, CommandFlags flags, bool allowDisconnected) { // Only interested in primary/replica preferences flags = Message.GetPrimaryReplicaFlags(flags); diff --git a/tests/StackExchange.Redis.Tests/ClusterShardedTests.cs b/tests/StackExchange.Redis.Tests/ClusterShardedTests.cs index dd57483b9..e713d345c 100644 --- a/tests/StackExchange.Redis.Tests/ClusterShardedTests.cs +++ b/tests/StackExchange.Redis.Tests/ClusterShardedTests.cs @@ -1,4 +1,7 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; using System.Threading.Tasks; using Xunit; @@ -174,4 +177,151 @@ private async Task MigrateSlotForTestShardChannelAsync(bool rollback) Log("Slot already migrated."); } } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task SubscribeToWrongServerAsync(bool sharded) + { + // the purpose of this test is to simulate subscribing while a node move is happening, i.e. we send + // the SSUBSCRIBE to the wrong server, get a -MOVED, and redirect; in particular: do we end up *knowing* + // where we actually subscribed to? + // + // note: to check our thinking, we also do this for regular non-sharded channels too; the point here + // being that this should behave *differently*, since there will be no -MOVED + var name = $"{Me()}:{Guid.NewGuid()}"; + var channel = sharded ? RedisChannel.Sharded(name) : RedisChannel.Literal(name).WithKeyRouting(); + await using var conn = Create(require: RedisFeatures.v7_0_0_rc1); + + var asKey = (RedisKey)(byte[])channel!; + Assert.False(asKey.IsEmpty); + var shouldBeServer = conn.GetServer(asKey); // this is where it *should* go + + // now intentionally choose *a different* server + var server = conn.GetServers().First(s => !Equals(s.EndPoint, shouldBeServer.EndPoint)); + Log($"Should be {Format.ToString(shouldBeServer.EndPoint)}; routing via {Format.ToString(server.EndPoint)}"); + + var subscriber = Assert.IsType(conn.GetSubscriber()); + var serverEndpoint = conn.GetServerEndPoint(server.EndPoint); + Assert.Equal(server.EndPoint, serverEndpoint.EndPoint); + var queue = await subscriber.SubscribeAsync(channel, server: serverEndpoint); + await Task.Delay(50); + var actual = subscriber.SubscribedEndpoint(channel); + + if (sharded) + { + // we should end up at the correct node, following the -MOVED + Assert.Equal(shouldBeServer.EndPoint, actual); + } + else + { + // we should end up where we *actually sent the message* - there is no -MOVED + Assert.Equal(serverEndpoint.EndPoint, actual); + } + await queue.UnsubscribeAsync(); + } + + [Fact] + public async Task KeepSubscribedThroughSlotMigrationAsync() + { + await using var conn = Create(require: RedisFeatures.v7_0_0_rc1, allowAdmin: true); + var name = $"{Me()}:{Guid.NewGuid()}"; + var channel = RedisChannel.Sharded(name); + var subscriber = conn.GetSubscriber(); + var queue = await subscriber.SubscribeAsync(channel); + await Task.Delay(50); + var actual = subscriber.SubscribedEndpoint(channel); + Assert.NotNull(actual); + + var asKey = (RedisKey)(byte[])channel!; + Assert.False(asKey.IsEmpty); + var slot = conn.GetHashSlot(asKey); + var viaMap = conn.ServerSelectionStrategy.Select(slot, RedisCommand.SSUBSCRIBE, CommandFlags.None, allowDisconnected: false); + + Log($"Slot {slot}, subscribed to {Format.ToString(actual)} (mapped to {Format.ToString(viaMap?.EndPoint)})"); + Assert.NotNull(viaMap); + Assert.Equal(actual, viaMap.EndPoint); + + var oldServer = conn.GetServer(asKey); // this is where it *should* go + + // now intentionally choose *a different* server + var newServer = conn.GetServers().First(s => !Equals(s.EndPoint, oldServer.EndPoint)); + + var nodes = await newServer.ClusterNodesAsync(); + Assert.NotNull(nodes); + var fromNode = nodes[oldServer.EndPoint]?.NodeId; + var toNode = nodes[newServer.EndPoint]?.NodeId; + Assert.NotNull(fromNode); + Assert.NotNull(toNode); + Assert.Equal(oldServer.EndPoint, nodes.GetBySlot(slot)?.EndPoint); + + var ep = subscriber.SubscribedEndpoint(channel); + Log($"Endpoint before migration: {Format.ToString(ep)}"); + Log($"Migrating slot {slot} to {Format.ToString(newServer.EndPoint)}; node {fromNode} -> {toNode}..."); + + // see https://redis.io/docs/latest/commands/cluster-setslot/#redis-cluster-live-resharding-explained + WriteLog("IMPORTING", await newServer.ExecuteAsync("CLUSTER", "SETSLOT", slot, "IMPORTING", fromNode)); + WriteLog("MIGRATING", await oldServer.ExecuteAsync("CLUSTER", "SETSLOT", slot, "MIGRATING", toNode)); + + while (true) + { + var keys = (await oldServer.ExecuteAsync("CLUSTER", "GETKEYSINSLOT", slot, 100)).AsRedisKeyArray()!; + Log($"Migrating {keys.Length} keys..."); + if (keys.Length == 0) break; + foreach (var key in keys) + { + await conn.GetDatabase().KeyMigrateAsync(key, newServer.EndPoint, migrateOptions: MigrateOptions.None); + } + } + + WriteLog("NODE (old)", await newServer.ExecuteAsync("CLUSTER", "SETSLOT", slot, "NODE", toNode)); + WriteLog("NODE (new)", await oldServer.ExecuteAsync("CLUSTER", "SETSLOT", slot, "NODE", toNode)); + + void WriteLog(string caption, RedisResult result) + { + if (result.IsNull) + { + Log($"{caption}: null"); + } + else if (result.Length >= 0) + { + var arr = result.AsRedisValueArray()!; + Log($"{caption}: {arr.Length} items"); + foreach (var item in arr) + { + Log($" {item}"); + } + } + else + { + Log($"{caption}: {result}"); + } + } + + Log("Migration initiated; checking node state..."); + await Task.Delay(100); + ep = subscriber.SubscribedEndpoint(channel); + Log($"Endpoint after migration: {Format.ToString(ep)}"); + Assert.True( + ep is null || ep == newServer.EndPoint, + "Target server after migration should be null or the new server"); + + nodes = await newServer.ClusterNodesAsync(); + Assert.NotNull(nodes); + Assert.Equal(newServer.EndPoint, nodes.GetBySlot(slot)?.EndPoint); + await conn.ConfigureAsync(); + Assert.Equal(newServer, conn.GetServer(asKey)); + + // now publish... we *expect* things to have sorted themselves out + var msg = Guid.NewGuid().ToString(); + var count = await subscriber.PublishAsync(channel, msg); + Assert.Equal(1, count); + + Log("Waiting for message..."); + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(20)); + var received = await queue.ReadAsync(timeout.Token); + Assert.Equal(msg, (string)received.Message!); + + await queue.UnsubscribeAsync(); + } } From 029db3dc050980f0f0d2c0457e421d9e7fdc62a1 Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Thu, 9 Oct 2025 17:01:48 +0100 Subject: [PATCH 2/7] fix unsolicited SUNSUBSCRIBE --- src/StackExchange.Redis/Message.cs | 2 +- src/StackExchange.Redis/PhysicalConnection.cs | 212 ++++++++++++------ .../ClusterShardedTests.cs | 39 +++- 3 files changed, 170 insertions(+), 83 deletions(-) diff --git a/src/StackExchange.Redis/Message.cs b/src/StackExchange.Redis/Message.cs index a3c19ab93..0c9eb4c92 100644 --- a/src/StackExchange.Redis/Message.cs +++ b/src/StackExchange.Redis/Message.cs @@ -856,7 +856,7 @@ protected override void WriteImpl(PhysicalConnection physical) internal abstract class CommandChannelBase : Message { - protected readonly RedisChannel Channel; + internal readonly RedisChannel Channel; protected CommandChannelBase(int db, CommandFlags flags, RedisCommand command, in RedisChannel channel) : base(db, flags, command) { diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index ccc67b4d2..89b023336 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -29,9 +29,6 @@ internal sealed partial class PhysicalConnection : IDisposable private const int DefaultRedisDatabaseCount = 16; - private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage", - subscribe = "subscribe", sunsubscribe = "sunsubscribe"; - private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select( i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray(); @@ -1674,32 +1671,122 @@ private enum PushKind { None, Message, - SMessage, PMessage, - Subscribe, - SUnsubscribe, + SMessage, + Subscribe = RedisCommand.SUBSCRIBE, + PSubscribe = RedisCommand.PSUBSCRIBE, + SSubscribe = RedisCommand.SSUBSCRIBE, + Unsubscribe = RedisCommand.UNSUBSCRIBE, + PUnsubscribe = RedisCommand.PUNSUBSCRIBE, + SUnsubscribe = RedisCommand.SUNSUBSCRIBE, } - private static PushKind GetPushKind(in Sequence result) + private PushKind GetPushKind(in Sequence result, out RedisChannel channel) { var len = result.Length; - if (len >= 1) - { - ref readonly RawResult kind = ref result[0]; - if (len >= 3) - { - if (kind.IsEqual(message)) return PushKind.Message; - if (kind.IsEqual(smessage)) return PushKind.SMessage; - if (len >= 4) + if (len >= 2) // always have at least the kind and the subscription channel + { + const int MAX_LEN = 16; + Debug.Assert(MAX_LEN >= Enumerable.Max( + [ + PushMessage.Length, PushPMessage.Length, PushSMessage.Length, + PushSubscribe.Length, PushPSubscribe.Length, PushSSubscribe.Length, + PushUnsubscribe.Length, PushPUnsubscribe.Length, PushSUnsubscribe.Length, + ])); + ref readonly RawResult pushKind = ref result[0]; + var multiSegmentPayload = pushKind.Payload; + if (multiSegmentPayload.Length <= MAX_LEN) + { + var span = multiSegmentPayload.IsSingleSegment + ? multiSegmentPayload.First.Span + : CopyTo(stackalloc byte[MAX_LEN], multiSegmentPayload); + + var hash = FastHash.Hash64(span); + RedisChannel.RedisChannelOptions channelOptions = RedisChannel.RedisChannelOptions.None; + PushKind kind; + switch (hash) + { + case PushMessage.Hash when PushMessage.Is(hash, span) & len >= 3: + kind = PushKind.Message; + break; + case PushPMessage.Hash when PushPMessage.Is(hash, span) & len >= 4: + channelOptions = RedisChannel.RedisChannelOptions.Pattern; + kind = PushKind.PMessage; + break; + case PushSMessage.Hash when PushSMessage.Is(hash, span) & len >= 3: + channelOptions = RedisChannel.RedisChannelOptions.Sharded; + kind = PushKind.SMessage; + break; + case PushSubscribe.Hash when PushSubscribe.Is(hash, span): + kind = PushKind.Subscribe; + break; + case PushPSubscribe.Hash when PushPSubscribe.Is(hash, span): + channelOptions = RedisChannel.RedisChannelOptions.Pattern; + kind = PushKind.PSubscribe; + break; + case PushSSubscribe.Hash when PushSSubscribe.Is(hash, span): + channelOptions = RedisChannel.RedisChannelOptions.Sharded; + kind = PushKind.SSubscribe; + break; + case PushUnsubscribe.Hash when PushUnsubscribe.Is(hash, span): + kind = PushKind.Unsubscribe; + break; + case PushPUnsubscribe.Hash when PushPUnsubscribe.Is(hash, span): + channelOptions = RedisChannel.RedisChannelOptions.Pattern; + kind = PushKind.PUnsubscribe; + break; + case PushSUnsubscribe.Hash when PushSUnsubscribe.Is(hash, span): + channelOptions = RedisChannel.RedisChannelOptions.Sharded; + kind = PushKind.SUnsubscribe; + break; + default: + kind = PushKind.None; + break; + } + if (kind != PushKind.None) { - if (kind.IsEqual(pmessage)) return PushKind.PMessage; + // the channel is always the second element + channel = result[1].AsRedisChannel(ChannelPrefix, channelOptions); + return kind; } - if (kind.IsEqual(sunsubscribe)) return PushKind.SUnsubscribe; } - if (kind.IsEqual(subscribe)) return PushKind.Subscribe; } + channel = default; return PushKind.None; + + static ReadOnlySpan CopyTo(Span target, in ReadOnlySequence source) + { + source.CopyTo(target); + return target.Slice(0, (int)source.Length); + } } + [FastHash("message")] + private static partial class PushMessage { } + + [FastHash("pmessage")] + private static partial class PushPMessage { } + + [FastHash("smessage")] + private static partial class PushSMessage { } + + [FastHash("subscribe")] + private static partial class PushSubscribe { } + + [FastHash("psubscribe")] + private static partial class PushPSubscribe { } + + [FastHash("ssubscribe")] + private static partial class PushSSubscribe { } + + [FastHash("unsubscribe")] + private static partial class PushUnsubscribe { } + + [FastHash("punsubscribe")] + private static partial class PushPUnsubscribe { } + + [FastHash("sunsubscribe")] + private static partial class PushSUnsubscribe { } + private void MatchResult(in RawResult result) { // check to see if it could be an out-of-band pubsub message @@ -1710,7 +1797,7 @@ private void MatchResult(in RawResult result) // out of band message does not match to a queued message var items = result.GetItems(); - var kind = GetPushKind(items); + var kind = GetPushKind(items, out var subscriptionChannel); switch (kind) { case PushKind.Message: @@ -1741,89 +1828,55 @@ private void MatchResult(in RawResult result) } // invoke the handlers - RedisChannel channel; - if (items[0].IsEqual(message)) - { - channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None); - Trace("MESSAGE: " + channel); - } - else // see check on outer-if that restricts to message / smessage - { - channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded); - Trace("SMESSAGE: " + channel); - } - - if (!channel.IsNull) + if (!subscriptionChannel.IsNull) { + Trace($"{kind}: {subscriptionChannel}"); if (TryGetPubSubPayload(items[2], out var payload)) { _readStatus = ReadStatus.InvokePubSub; - muxer.OnMessage(channel, channel, payload); + muxer.OnMessage(subscriptionChannel, subscriptionChannel, payload); } // could be multi-message: https://github.com/StackExchange/StackExchange.Redis/issues/2507 else if (TryGetMultiPubSubPayload(items[2], out var payloads)) { _readStatus = ReadStatus.InvokePubSub; - muxer.OnMessage(channel, channel, payloads); + muxer.OnMessage(subscriptionChannel, subscriptionChannel, payloads); } } - - return; // AND STOP PROCESSING! + return; // and stop processing case PushKind.PMessage: _readStatus = ReadStatus.PubSubPMessage; - channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern); - - Trace("PMESSAGE: " + channel); - if (!channel.IsNull) + var messageChannel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None); + if (!messageChannel.IsNull) { + Trace($"{kind}: {messageChannel} via {subscriptionChannel}"); if (TryGetPubSubPayload(items[3], out var payload)) { - var sub = items[1].AsRedisChannel( - ChannelPrefix, - RedisChannel.RedisChannelOptions.Pattern); - _readStatus = ReadStatus.InvokePubSub; - muxer.OnMessage(sub, channel, payload); + muxer.OnMessage(subscriptionChannel, messageChannel, payload); } else if (TryGetMultiPubSubPayload(items[3], out var payloads)) { - var sub = items[1].AsRedisChannel( - ChannelPrefix, - RedisChannel.RedisChannelOptions.Pattern); - _readStatus = ReadStatus.InvokePubSub; - muxer.OnMessage(sub, channel, payloads); + muxer.OnMessage(subscriptionChannel, messageChannel, payloads); } } - - break; - case PushKind.SUnsubscribe: - _readStatus = ReadStatus.PubSubSUnsubscribe; - channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded); + return; // and stop processing + case PushKind.SUnsubscribe when !PeekChannelMessage(RedisCommand.SUNSUBSCRIBE, subscriptionChannel): + // then it was *unsolicited* - this probably means the slot was migrated + // (otherwise, we'll let the command-processor deal with it) + _readStatus = ReadStatus.PubSubUnsubscribe; var server = BridgeCouldBeNull?.ServerEndPoint; - if (server is not null && muxer.TryGetSubscription(channel, out var subscription)) + if (server is not null && muxer.TryGetSubscription(subscriptionChannel, out var subscription)) { if (subscription.GetCurrentServer() == server) { - // definitely isn't this connection any more, but we were listening - subscription.SetCurrentServer(null); - muxer.ReconfigureIfNeeded(server.EndPoint, fromBroadcast: true, nameof(sunsubscribe)); + subscription.SetCurrentServer(null); // wipe + muxer.ReconfigureIfNeeded(server.EndPoint, fromBroadcast: true, PushSUnsubscribe.Text); } } - break; - } - - switch (kind) - { - // we recognized it a RESP2 OOB, or it was explicitly *any* RESP3 push notification - // (even if we didn't recognize the kind) - we're done; unless it is "subscribe", which - // is *technically* a push, but we still want to treat it as a response to the original message - case PushKind.None when result.Resp3Type != ResultType.Push: - case PushKind.Subscribe: - break; // continue, try to match to a pending message - default: - return; // we're done with this message (RESP3 OOB, or something we recognized) + return; // and STOP PROCESSING; unsolicited } } Trace("Matching result..."); @@ -1942,6 +1995,19 @@ static bool TryGetMultiPubSubPayload(in RawResult value, out Sequence } } + private bool PeekChannelMessage(RedisCommand command, RedisChannel channel) + { + Message? msg; + bool haveMsg; + lock (_writtenAwaitingResponse) + { + haveMsg = _writtenAwaitingResponse.TryPeek(out msg); + } + + return haveMsg && msg is CommandChannelBase typed + && typed.Command == command && typed.Channel == channel; + } + private volatile Message? _activeMessage; internal void GetHeadMessages(out Message? now, out Message? next) @@ -2235,7 +2301,7 @@ internal enum ReadStatus MatchResultComplete, ResetArena, ProcessBufferComplete, - PubSubSUnsubscribe, + PubSubUnsubscribe, NA = -1, } private volatile ReadStatus _readStatus; diff --git a/tests/StackExchange.Redis.Tests/ClusterShardedTests.cs b/tests/StackExchange.Redis.Tests/ClusterShardedTests.cs index e713d345c..0c57de747 100644 --- a/tests/StackExchange.Redis.Tests/ClusterShardedTests.cs +++ b/tests/StackExchange.Redis.Tests/ClusterShardedTests.cs @@ -218,7 +218,10 @@ public async Task SubscribeToWrongServerAsync(bool sharded) // we should end up where we *actually sent the message* - there is no -MOVED Assert.Equal(serverEndpoint.EndPoint, actual); } + + Log("Unsubscribing..."); await queue.UnsubscribeAsync(); + Log("Unsubscribed."); } [Fact] @@ -244,6 +247,19 @@ public async Task KeepSubscribedThroughSlotMigrationAsync() var oldServer = conn.GetServer(asKey); // this is where it *should* go + using (var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5))) + { + // now publish... we *expect* things to have sorted themselves out + var msg = Guid.NewGuid().ToString(); + var count = await subscriber.PublishAsync(channel, msg); + Assert.Equal(1, count); + + Log("Waiting for message on original subscription..."); + var received = await queue.ReadAsync(timeout.Token); + Log($"Message received: {received.Message}"); + Assert.Equal(msg, (string)received.Message!); + } + // now intentionally choose *a different* server var newServer = conn.GetServers().First(s => !Equals(s.EndPoint, oldServer.EndPoint)); @@ -312,16 +328,21 @@ void WriteLog(string caption, RedisResult result) await conn.ConfigureAsync(); Assert.Equal(newServer, conn.GetServer(asKey)); - // now publish... we *expect* things to have sorted themselves out - var msg = Guid.NewGuid().ToString(); - var count = await subscriber.PublishAsync(channel, msg); - Assert.Equal(1, count); - - Log("Waiting for message..."); - using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(20)); - var received = await queue.ReadAsync(timeout.Token); - Assert.Equal(msg, (string)received.Message!); + using (var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5))) + { + // now publish... we *expect* things to have sorted themselves out + var msg = Guid.NewGuid().ToString(); + var count = await subscriber.PublishAsync(channel, msg); + Assert.Equal(1, count); + + Log("Waiting for message on moved subscription..."); + var received = await queue.ReadAsync(timeout.Token); + Log($"Message received: {received.Message}"); + Assert.Equal(msg, (string)received.Message!); + } + Log("Unsubscribing..."); await queue.UnsubscribeAsync(); + Log("Unsubscribed."); } } From de1da5b6262ecaf35d462c1994e9e3dbcc483978 Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Thu, 9 Oct 2025 17:08:02 +0100 Subject: [PATCH 3/7] remove redundant code --- src/StackExchange.Redis/RedisSubscriber.cs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/StackExchange.Redis/RedisSubscriber.cs b/src/StackExchange.Redis/RedisSubscriber.cs index 4b04a424b..4b6a8a081 100644 --- a/src/StackExchange.Redis/RedisSubscriber.cs +++ b/src/StackExchange.Redis/RedisSubscriber.cs @@ -145,14 +145,6 @@ internal long EnsureSubscriptions(CommandFlags flags = CommandFlags.None) return count; } - internal void EnsureSubscription(Subscription sub, in RedisChannel channel, CommandFlags flags) - { - if (!sub.IsConnected) - { - DefaultSubscriber.EnsureSubscribedToServer(sub, channel, flags, true); - } - } - internal enum SubscriptionAction { Subscribe, From 97ccba26efcedd61125135be3d62109a38ba1bf6 Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Thu, 9 Oct 2025 19:47:10 +0100 Subject: [PATCH 4/7] ssh test From 41685a58a673f2a6e5693b6064724efaabe68adb Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Fri, 10 Oct 2025 16:22:51 +0100 Subject: [PATCH 5/7] SUNSUBSCRIBE handling; if possible, use the active connection to find where we should be subscribing --- src/StackExchange.Redis/PhysicalConnection.cs | 10 +++--- src/StackExchange.Redis/RedisSubscriber.cs | 35 ++++++++++++++++++- .../ClusterShardedTests.cs | 2 ++ 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index 89b023336..d990835a5 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -1870,11 +1870,11 @@ private void MatchResult(in RawResult result) var server = BridgeCouldBeNull?.ServerEndPoint; if (server is not null && muxer.TryGetSubscription(subscriptionChannel, out var subscription)) { - if (subscription.GetCurrentServer() == server) - { - subscription.SetCurrentServer(null); // wipe - muxer.ReconfigureIfNeeded(server.EndPoint, fromBroadcast: true, PushSUnsubscribe.Text); - } + // wipe and reconnect; but: to where? + // counter-intuitively, the only server we *know* already knows the new route is: + // the outgoing server, since it had to change to MIGRATING etc; the new INCOMING server + // knows, but *we don't know who that is*, and other nodes: aren't guaranteed to know (yet) + muxer.DefaultSubscriber.ResubscribeToServer(subscription, subscriptionChannel, server, cause: PushSUnsubscribe.Text); } return; // and STOP PROCESSING; unsolicited } diff --git a/src/StackExchange.Redis/RedisSubscriber.cs b/src/StackExchange.Redis/RedisSubscriber.cs index 4b6a8a081..9ade78c2d 100644 --- a/src/StackExchange.Redis/RedisSubscriber.cs +++ b/src/StackExchange.Redis/RedisSubscriber.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Diagnostics.CodeAnalysis; +using System.Diagnostics.SymbolStore; using System.Net; using System.Threading; using System.Threading.Tasks; @@ -13,7 +14,7 @@ namespace StackExchange.Redis public partial class ConnectionMultiplexer { private RedisSubscriber? _defaultSubscriber; - private RedisSubscriber DefaultSubscriber => _defaultSubscriber ??= new RedisSubscriber(this, null); + internal RedisSubscriber DefaultSubscriber => _defaultSubscriber ??= new RedisSubscriber(this, null); private readonly ConcurrentDictionary subscriptions = new(); @@ -282,6 +283,17 @@ internal void GetSubscriberCounts(out int handlers, out int queues) internal ServerEndPoint? GetCurrentServer() => Volatile.Read(ref CurrentServer); internal void SetCurrentServer(ServerEndPoint? server) => CurrentServer = server; + // conditional clear + internal bool ClearCurrentServer(ServerEndPoint expected) + { + if (CurrentServer == expected) + { + CurrentServer = null; + return true; + } + + return false; + } /// /// Evaluates state and if we're not currently connected, clears the server reference. @@ -425,6 +437,27 @@ internal bool EnsureSubscribedToServer(Subscription sub, RedisChannel channel, C return ExecuteSync(message, sub.Processor, selected); } + internal void ResubscribeToServer(Subscription sub, RedisChannel channel, ServerEndPoint serverEndPoint, string cause) + { + // conditional: only if that's the server we were connected to, or "none"; we don't want to end up duplicated + if (sub.ClearCurrentServer(serverEndPoint) || !sub.IsConnected) + { + if (serverEndPoint.IsSubscriberConnected) + { + // we'll *try* for a simple resubscribe, following any -MOVED etc, but if that fails: fall back + // to full reconfigure; importantly, note that we've already recorded the disconnect + var message = sub.GetMessage(channel, SubscriptionAction.Subscribe, CommandFlags.None, false); + _ = ExecuteAsync(message, sub.Processor, serverEndPoint).ContinueWith( + t => multiplexer.ReconfigureIfNeeded(serverEndPoint.EndPoint, false, cause: cause), + TaskContinuationOptions.OnlyOnFaulted); + } + else + { + multiplexer.ReconfigureIfNeeded(serverEndPoint.EndPoint, false, cause: cause); + } + } + } + Task ISubscriber.SubscribeAsync(RedisChannel channel, Action handler, CommandFlags flags) => SubscribeAsync(channel, handler, null, flags); diff --git a/tests/StackExchange.Redis.Tests/ClusterShardedTests.cs b/tests/StackExchange.Redis.Tests/ClusterShardedTests.cs index 0c57de747..8af0a1c7b 100644 --- a/tests/StackExchange.Redis.Tests/ClusterShardedTests.cs +++ b/tests/StackExchange.Redis.Tests/ClusterShardedTests.cs @@ -339,6 +339,8 @@ void WriteLog(string caption, RedisResult result) var received = await queue.ReadAsync(timeout.Token); Log($"Message received: {received.Message}"); Assert.Equal(msg, (string)received.Message!); + ep = subscriber.SubscribedEndpoint(channel); + Log($"Endpoint after receiving message: {Format.ToString(ep)}"); } Log("Unsubscribing..."); From 04a2939e0100c9509ca56c104d4665bfa0ad29f9 Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Mon, 13 Oct 2025 10:40:25 +0100 Subject: [PATCH 6/7] PR nits --- docs/ReleaseNotes.md | 2 ++ src/StackExchange.Redis/PhysicalConnection.cs | 12 ++++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index b3546e969..615f44497 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -8,6 +8,8 @@ Current package versions: ## Unreleased +- Fix `SSUBSCRIBE` routing during slot migrations ([#2969 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2969)) + ## 2.9.25 - (build) Fix SNK on non-Windows builds ([#2963 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2963)) diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index d990835a5..817314406 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -1673,12 +1673,12 @@ private enum PushKind Message, PMessage, SMessage, - Subscribe = RedisCommand.SUBSCRIBE, - PSubscribe = RedisCommand.PSUBSCRIBE, - SSubscribe = RedisCommand.SSUBSCRIBE, - Unsubscribe = RedisCommand.UNSUBSCRIBE, - PUnsubscribe = RedisCommand.PUNSUBSCRIBE, - SUnsubscribe = RedisCommand.SUNSUBSCRIBE, + Subscribe, + PSubscribe, + SSubscribe, + Unsubscribe, + PUnsubscribe, + SUnsubscribe, } private PushKind GetPushKind(in Sequence result, out RedisChannel channel) { From 11803a00c3893e5d3223a1b100843a4186b2a044 Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Mon, 13 Oct 2025 10:43:03 +0100 Subject: [PATCH 7/7] more PR nits --- src/StackExchange.Redis/PhysicalConnection.cs | 134 +++++++++--------- 1 file changed, 69 insertions(+), 65 deletions(-) diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index 817314406..57bcd608d 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -1683,71 +1683,75 @@ private enum PushKind private PushKind GetPushKind(in Sequence result, out RedisChannel channel) { var len = result.Length; - if (len >= 2) // always have at least the kind and the subscription channel - { - const int MAX_LEN = 16; - Debug.Assert(MAX_LEN >= Enumerable.Max( - [ - PushMessage.Length, PushPMessage.Length, PushSMessage.Length, - PushSubscribe.Length, PushPSubscribe.Length, PushSSubscribe.Length, - PushUnsubscribe.Length, PushPUnsubscribe.Length, PushSUnsubscribe.Length, - ])); - ref readonly RawResult pushKind = ref result[0]; - var multiSegmentPayload = pushKind.Payload; - if (multiSegmentPayload.Length <= MAX_LEN) - { - var span = multiSegmentPayload.IsSingleSegment - ? multiSegmentPayload.First.Span - : CopyTo(stackalloc byte[MAX_LEN], multiSegmentPayload); - - var hash = FastHash.Hash64(span); - RedisChannel.RedisChannelOptions channelOptions = RedisChannel.RedisChannelOptions.None; - PushKind kind; - switch (hash) - { - case PushMessage.Hash when PushMessage.Is(hash, span) & len >= 3: - kind = PushKind.Message; - break; - case PushPMessage.Hash when PushPMessage.Is(hash, span) & len >= 4: - channelOptions = RedisChannel.RedisChannelOptions.Pattern; - kind = PushKind.PMessage; - break; - case PushSMessage.Hash when PushSMessage.Is(hash, span) & len >= 3: - channelOptions = RedisChannel.RedisChannelOptions.Sharded; - kind = PushKind.SMessage; - break; - case PushSubscribe.Hash when PushSubscribe.Is(hash, span): - kind = PushKind.Subscribe; - break; - case PushPSubscribe.Hash when PushPSubscribe.Is(hash, span): - channelOptions = RedisChannel.RedisChannelOptions.Pattern; - kind = PushKind.PSubscribe; - break; - case PushSSubscribe.Hash when PushSSubscribe.Is(hash, span): - channelOptions = RedisChannel.RedisChannelOptions.Sharded; - kind = PushKind.SSubscribe; - break; - case PushUnsubscribe.Hash when PushUnsubscribe.Is(hash, span): - kind = PushKind.Unsubscribe; - break; - case PushPUnsubscribe.Hash when PushPUnsubscribe.Is(hash, span): - channelOptions = RedisChannel.RedisChannelOptions.Pattern; - kind = PushKind.PUnsubscribe; - break; - case PushSUnsubscribe.Hash when PushSUnsubscribe.Is(hash, span): - channelOptions = RedisChannel.RedisChannelOptions.Sharded; - kind = PushKind.SUnsubscribe; - break; - default: - kind = PushKind.None; - break; - } - if (kind != PushKind.None) - { - // the channel is always the second element - channel = result[1].AsRedisChannel(ChannelPrefix, channelOptions); - return kind; - } + if (len < 2) + { + // for supported cases, we demand at least the kind and the subscription channel + channel = default; + return PushKind.None; + } + + const int MAX_LEN = 16; + Debug.Assert(MAX_LEN >= Enumerable.Max( + [ + PushMessage.Length, PushPMessage.Length, PushSMessage.Length, + PushSubscribe.Length, PushPSubscribe.Length, PushSSubscribe.Length, + PushUnsubscribe.Length, PushPUnsubscribe.Length, PushSUnsubscribe.Length, + ])); + ref readonly RawResult pushKind = ref result[0]; + var multiSegmentPayload = pushKind.Payload; + if (multiSegmentPayload.Length <= MAX_LEN) + { + var span = multiSegmentPayload.IsSingleSegment + ? multiSegmentPayload.First.Span + : CopyTo(stackalloc byte[MAX_LEN], multiSegmentPayload); + + var hash = FastHash.Hash64(span); + RedisChannel.RedisChannelOptions channelOptions = RedisChannel.RedisChannelOptions.None; + PushKind kind; + switch (hash) + { + case PushMessage.Hash when PushMessage.Is(hash, span) & len >= 3: + kind = PushKind.Message; + break; + case PushPMessage.Hash when PushPMessage.Is(hash, span) & len >= 4: + channelOptions = RedisChannel.RedisChannelOptions.Pattern; + kind = PushKind.PMessage; + break; + case PushSMessage.Hash when PushSMessage.Is(hash, span) & len >= 3: + channelOptions = RedisChannel.RedisChannelOptions.Sharded; + kind = PushKind.SMessage; + break; + case PushSubscribe.Hash when PushSubscribe.Is(hash, span): + kind = PushKind.Subscribe; + break; + case PushPSubscribe.Hash when PushPSubscribe.Is(hash, span): + channelOptions = RedisChannel.RedisChannelOptions.Pattern; + kind = PushKind.PSubscribe; + break; + case PushSSubscribe.Hash when PushSSubscribe.Is(hash, span): + channelOptions = RedisChannel.RedisChannelOptions.Sharded; + kind = PushKind.SSubscribe; + break; + case PushUnsubscribe.Hash when PushUnsubscribe.Is(hash, span): + kind = PushKind.Unsubscribe; + break; + case PushPUnsubscribe.Hash when PushPUnsubscribe.Is(hash, span): + channelOptions = RedisChannel.RedisChannelOptions.Pattern; + kind = PushKind.PUnsubscribe; + break; + case PushSUnsubscribe.Hash when PushSUnsubscribe.Is(hash, span): + channelOptions = RedisChannel.RedisChannelOptions.Sharded; + kind = PushKind.SUnsubscribe; + break; + default: + kind = PushKind.None; + break; + } + if (kind != PushKind.None) + { + // the channel is always the second element + channel = result[1].AsRedisChannel(ChannelPrefix, channelOptions); + return kind; } } channel = default;