diff --git a/docs/Timeouts.md b/docs/Timeouts.md index 1c4ac3756..ea9830041 100644 --- a/docs/Timeouts.md +++ b/docs/Timeouts.md @@ -88,7 +88,7 @@ By default Redis Timeout exception(s) includes useful information, which can hel |qs | Queue-Awaiting-Response : {int}|There are x operations currently awaiting replies from redis server.| |aw | Active-Writer: {bool}|| |bw | Backlog-Writer: {enum} | Possible values are Inactive, Started, CheckingForWork, CheckingForTimeout, RecordingTimeout, WritingMessage, Flushing, MarkingInactive, RecordingWriteFailure, RecordingFault, SettingIdle, SpinningDown, Faulted| -|rs | Read-State: {enum}|Possible values are NotStarted, Init, RanToCompletion, Faulted, ReadSync, ReadAsync, UpdateWriteTime, ProcessBuffer, MarkProcessed, TryParseResult, MatchResult, PubSubMessage, PubSubPMessage, Reconfigure, InvokePubSub, DequeueResult, ComputeResult, CompletePendingMessage, NA| +|rs | Read-State: {enum}|Possible values are NotStarted, Init, RanToCompletion, Faulted, ReadSync, ReadAsync, UpdateWriteTime, ProcessBuffer, MarkProcessed, TryParseResult, MatchResult, PubSubMessage, PubSubSMessage, PubSubPMessage, Reconfigure, InvokePubSub, DequeueResult, ComputeResult, CompletePendingMessage, NA| |ws | Write-State: {enum}| Possible values are Initializing, Idle, Writing, Flushing, Flushed, NA| |in | Inbound-Bytes : {long}|there are x bytes waiting to be read from the input stream from redis| |in-pipe | Inbound-Pipe-Bytes: {long}|Bytes waiting to be read| diff --git a/src/StackExchange.Redis/CommandMap.cs b/src/StackExchange.Redis/CommandMap.cs index d1e125bb3..02baf3801 100644 --- a/src/StackExchange.Redis/CommandMap.cs +++ b/src/StackExchange.Redis/CommandMap.cs @@ -31,7 +31,7 @@ public sealed class CommandMap RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither! - RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, + RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE, RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH, @@ -57,7 +57,9 @@ public sealed class CommandMap RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither! - RedisCommand.PSUBSCRIBE, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, + RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE, + + RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH, RedisCommand.SCRIPT, diff --git a/src/StackExchange.Redis/Enums/RedisCommand.cs b/src/StackExchange.Redis/Enums/RedisCommand.cs index a4647d7eb..3909be4c2 100644 --- a/src/StackExchange.Redis/Enums/RedisCommand.cs +++ b/src/StackExchange.Redis/Enums/RedisCommand.cs @@ -181,6 +181,7 @@ internal enum RedisCommand SORT, SORT_RO, SPOP, + SPUBLISH, SRANDMEMBER, SREM, STRLEN, @@ -188,6 +189,8 @@ internal enum RedisCommand SUNION, SUNIONSTORE, SSCAN, + SSUBSCRIBE, + SUNSUBSCRIBE, SWAPDB, SYNC, @@ -447,10 +450,13 @@ internal static bool IsPrimaryOnly(this RedisCommand command) case RedisCommand.SMEMBERS: case RedisCommand.SMISMEMBER: case RedisCommand.SORT_RO: + case RedisCommand.SPUBLISH: case RedisCommand.SRANDMEMBER: + case RedisCommand.SSUBSCRIBE: case RedisCommand.STRLEN: case RedisCommand.SUBSCRIBE: case RedisCommand.SUNION: + case RedisCommand.SUNSUBSCRIBE: case RedisCommand.SSCAN: case RedisCommand.SYNC: case RedisCommand.TIME: diff --git a/src/StackExchange.Redis/Message.cs b/src/StackExchange.Redis/Message.cs index b89a6b946..fd75585a5 100644 --- a/src/StackExchange.Redis/Message.cs +++ b/src/StackExchange.Redis/Message.cs @@ -569,6 +569,9 @@ internal static bool RequiresDatabase(RedisCommand command) case RedisCommand.SLAVEOF: case RedisCommand.SLOWLOG: case RedisCommand.SUBSCRIBE: + case RedisCommand.SPUBLISH: + case RedisCommand.SSUBSCRIBE: + case RedisCommand.SUNSUBSCRIBE: case RedisCommand.SWAPDB: case RedisCommand.SYNC: case RedisCommand.TIME: diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index 51fac7c3d..5e0dbbf60 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -29,7 +29,7 @@ internal sealed partial class PhysicalConnection : IDisposable private const int DefaultRedisDatabaseCount = 16; - private static readonly CommandBytes message = "message", pmessage = "pmessage"; + private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage"; private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select( i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray(); @@ -1644,9 +1644,9 @@ private void MatchResult(in RawResult result) // out of band message does not match to a queued message var items = result.GetItems(); - if (items.Length >= 3 && items[0].IsEqual(message)) + if (items.Length >= 3 && (items[0].IsEqual(message) || items[0].IsEqual(smessage))) { - _readStatus = ReadStatus.PubSubMessage; + _readStatus = items[0].IsEqual(message) ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage; // special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry) var configChanged = muxer.ConfigurationChangedChannel; @@ -1668,8 +1668,14 @@ private void MatchResult(in RawResult result) } // invoke the handlers - var channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal); - Trace("MESSAGE: " + channel); + RedisChannel channel; + if (items[0].IsEqual(message)) { + channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: false); + Trace("MESSAGE: " + channel); + } else { + channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: true); + Trace("SMESSAGE: " + channel); + } if (!channel.IsNull) { if (TryGetPubSubPayload(items[2], out var payload)) @@ -1690,19 +1696,19 @@ private void MatchResult(in RawResult result) { _readStatus = ReadStatus.PubSubPMessage; - var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal); + var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: false); Trace("PMESSAGE: " + channel); if (!channel.IsNull) { if (TryGetPubSubPayload(items[3], out var payload)) { - var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern); + var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern, isSharded: false); _readStatus = ReadStatus.InvokePubSub; muxer.OnMessage(sub, channel, payload); } else if (TryGetMultiPubSubPayload(items[3], out var payloads)) { - var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern); + var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern, isSharded: false); _readStatus = ReadStatus.InvokePubSub; muxer.OnMessage(sub, channel, payloads); } @@ -1710,7 +1716,7 @@ private void MatchResult(in RawResult result) return; // AND STOP PROCESSING! } - // if it didn't look like "[p]message", then we still need to process the pending queue + // if it didn't look like "[p|s]message", then we still need to process the pending queue } Trace("Matching result..."); @@ -2110,6 +2116,7 @@ internal enum ReadStatus MatchResult, PubSubMessage, PubSubPMessage, + PubSubSMessage, Reconfigure, InvokePubSub, ResponseSequenceCheck, // high-integrity mode only diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index 8263defd3..3b42e204e 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -1309,12 +1309,15 @@ StackExchange.Redis.RedisChannel StackExchange.Redis.RedisChannel.Equals(StackExchange.Redis.RedisChannel other) -> bool StackExchange.Redis.RedisChannel.IsNullOrEmpty.get -> bool StackExchange.Redis.RedisChannel.IsPattern.get -> bool +StackExchange.Redis.RedisChannel.IsSharded.get -> bool StackExchange.Redis.RedisChannel.PatternMode StackExchange.Redis.RedisChannel.PatternMode.Auto = 0 -> StackExchange.Redis.RedisChannel.PatternMode StackExchange.Redis.RedisChannel.PatternMode.Literal = 1 -> StackExchange.Redis.RedisChannel.PatternMode StackExchange.Redis.RedisChannel.PatternMode.Pattern = 2 -> StackExchange.Redis.RedisChannel.PatternMode StackExchange.Redis.RedisChannel.RedisChannel() -> void +StackExchange.Redis.RedisChannel.RedisChannel(byte[]? value, bool isSharded) -> void StackExchange.Redis.RedisChannel.RedisChannel(byte[]? value, StackExchange.Redis.RedisChannel.PatternMode mode) -> void +StackExchange.Redis.RedisChannel.RedisChannel(string! value, bool isSharded) -> void StackExchange.Redis.RedisChannel.RedisChannel(string! value, StackExchange.Redis.RedisChannel.PatternMode mode) -> void StackExchange.Redis.RedisCommandException StackExchange.Redis.RedisCommandException.RedisCommandException(string! message) -> void diff --git a/src/StackExchange.Redis/RawResult.cs b/src/StackExchange.Redis/RawResult.cs index 300503f57..dd3ce9920 100644 --- a/src/StackExchange.Redis/RawResult.cs +++ b/src/StackExchange.Redis/RawResult.cs @@ -161,7 +161,7 @@ public bool MoveNext() } public ReadOnlySequence Current { get; private set; } } - internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.PatternMode mode) + internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.PatternMode mode, bool isSharded) { switch (Resp2TypeBulkString) { @@ -169,12 +169,12 @@ internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.Pattern case ResultType.BulkString: if (channelPrefix == null) { - return new RedisChannel(GetBlob(), mode); + return isSharded ? new RedisChannel(GetBlob(), true) : new RedisChannel(GetBlob(), mode); } if (StartsWith(channelPrefix)) { byte[] copy = Payload.Slice(channelPrefix.Length).ToArray(); - return new RedisChannel(copy, mode); + return isSharded ? new RedisChannel(copy, true) : new RedisChannel(copy, mode); } return default; default: diff --git a/src/StackExchange.Redis/RedisChannel.cs b/src/StackExchange.Redis/RedisChannel.cs index 561cce21f..9c0cadbf4 100644 --- a/src/StackExchange.Redis/RedisChannel.cs +++ b/src/StackExchange.Redis/RedisChannel.cs @@ -10,6 +10,7 @@ namespace StackExchange.Redis { internal readonly byte[]? Value; internal readonly bool _isPatternBased; + internal readonly bool _isSharded; /// /// Indicates whether the channel-name is either null or a zero-length value. @@ -21,6 +22,11 @@ namespace StackExchange.Redis /// public bool IsPattern => _isPatternBased; + /// + /// Indicates whether this channel represents a shard channel (see SSUBSCRIBE) + /// + public bool IsSharded => _isSharded; + internal bool IsNull => Value == null; /// @@ -59,7 +65,7 @@ public static bool UseImplicitAutoPattern /// /// The name of the channel to create. /// The mode for name matching. - public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode)) { } + public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode), false) { } /// /// Create a new redis channel from a string, explicitly controlling the pattern mode. @@ -68,10 +74,25 @@ public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatt /// The mode for name matching. public RedisChannel(string value, PatternMode mode) : this(value == null ? null : Encoding.UTF8.GetBytes(value), mode) { } - private RedisChannel(byte[]? value, bool isPatternBased) + /// + /// Create a new redis channel from a buffer, explicitly controlling the sharding mode. + /// + /// The name of the channel to create. + /// Whether the channel is sharded. + public RedisChannel(byte[]? value, bool isSharded) : this(value, false, isSharded) {} + + /// + /// Create a new redis channel from a string, explicitly controlling the sharding mode. + /// + /// The string name of the channel to create. + /// Whether the channel is sharded. + public RedisChannel(string value, bool isSharded) : this(value == null ? null : Encoding.UTF8.GetBytes(value), isSharded) {} + + private RedisChannel(byte[]? value, bool isPatternBased, bool isSharded) { Value = value; _isPatternBased = isPatternBased; + _isSharded = isSharded; } private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch @@ -123,7 +144,7 @@ private RedisChannel(byte[]? value, bool isPatternBased) /// The first to compare. /// The second to compare. public static bool operator ==(RedisChannel x, RedisChannel y) => - x._isPatternBased == y._isPatternBased && RedisValue.Equals(x.Value, y.Value); + x._isPatternBased == y._isPatternBased && RedisValue.Equals(x.Value, y.Value) && x._isSharded == y._isSharded; /// /// Indicate whether two channel names are equal. @@ -171,10 +192,10 @@ private RedisChannel(byte[]? value, bool isPatternBased) /// Indicate whether two channel names are equal. /// /// The to compare to. - public bool Equals(RedisChannel other) => _isPatternBased == other._isPatternBased && RedisValue.Equals(Value, other.Value); + public bool Equals(RedisChannel other) => _isPatternBased == other._isPatternBased && RedisValue.Equals(Value, other.Value) && _isSharded == other._isSharded; /// - public override int GetHashCode() => RedisValue.GetHashCode(Value) + (_isPatternBased ? 1 : 0); + public override int GetHashCode() => RedisValue.GetHashCode(Value) + (_isPatternBased ? 1 : 0) + (_isSharded ? 2 : 0); /// /// Obtains a string representation of the channel name. @@ -286,4 +307,4 @@ public static implicit operator RedisChannel(byte[]? key) private RedisChannel(byte[]? value) => throw new NotSupportedException(); #endif } -} +} \ No newline at end of file diff --git a/src/StackExchange.Redis/RedisSubscriber.cs b/src/StackExchange.Redis/RedisSubscriber.cs index ee28f4c56..2b2076e03 100644 --- a/src/StackExchange.Redis/RedisSubscriber.cs +++ b/src/StackExchange.Redis/RedisSubscriber.cs @@ -183,13 +183,17 @@ public Subscription(CommandFlags flags) internal Message GetMessage(RedisChannel channel, SubscriptionAction action, CommandFlags flags, bool internalCall) { var isPattern = channel._isPatternBased; + var isSharded = channel._isSharded; var command = action switch { SubscriptionAction.Subscribe when isPattern => RedisCommand.PSUBSCRIBE, SubscriptionAction.Unsubscribe when isPattern => RedisCommand.PUNSUBSCRIBE, - SubscriptionAction.Subscribe when !isPattern => RedisCommand.SUBSCRIBE, - SubscriptionAction.Unsubscribe when !isPattern => RedisCommand.UNSUBSCRIBE, + SubscriptionAction.Subscribe when isSharded => RedisCommand.SSUBSCRIBE, + SubscriptionAction.Unsubscribe when isSharded => RedisCommand.SUNSUBSCRIBE, + + SubscriptionAction.Subscribe when !isPattern && !isSharded => RedisCommand.SUBSCRIBE, + SubscriptionAction.Unsubscribe when !isPattern && !isSharded => RedisCommand.UNSUBSCRIBE, _ => throw new ArgumentOutOfRangeException(nameof(action), "This would be an impressive boolean feat"), }; @@ -370,14 +374,14 @@ private static void ThrowIfNull(in RedisChannel channel) public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { ThrowIfNull(channel); - var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); + var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); return ExecuteSync(msg, ResultProcessor.Int64); } public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { ThrowIfNull(channel); - var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); + var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); return ExecuteAsync(msg, ResultProcessor.Int64); } @@ -515,6 +519,7 @@ private bool UnregisterSubscription(in RedisChannel channel, Action connection.BridgeCouldBeNull?.ServerEndPoint, - _ => null, + RedisCommand.SUBSCRIBE or RedisCommand.SSUBSCRIBE or RedisCommand.PSUBSCRIBE => connection.BridgeCouldBeNull?.ServerEndPoint, + _ => null }; Subscription?.SetCurrentServer(newServer); return true; @@ -1526,7 +1526,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes { case ResultType.Array: var final = result.ToArray( - (in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode), + (in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode, isSharded: false), new ChannelState(connection.ChannelPrefix, mode))!; SetResult(message, final); diff --git a/src/StackExchange.Redis/ServerEndPoint.cs b/src/StackExchange.Redis/ServerEndPoint.cs index 8b099afd2..c9d8414ba 100644 --- a/src/StackExchange.Redis/ServerEndPoint.cs +++ b/src/StackExchange.Redis/ServerEndPoint.cs @@ -260,6 +260,8 @@ public void Dispose() case RedisCommand.UNSUBSCRIBE: case RedisCommand.PSUBSCRIBE: case RedisCommand.PUNSUBSCRIBE: + case RedisCommand.SSUBSCRIBE: + case RedisCommand.SUNSUBSCRIBE: message.SetForSubscriptionBridge(); break; } @@ -278,6 +280,8 @@ public void Dispose() case RedisCommand.UNSUBSCRIBE: case RedisCommand.PSUBSCRIBE: case RedisCommand.PUNSUBSCRIBE: + case RedisCommand.SSUBSCRIBE: + case RedisCommand.SUNSUBSCRIBE: if (!KnowOrAssumeResp3()) { return subscription ?? (create ? subscription = CreateBridge(ConnectionType.Subscription, null) : null); diff --git a/toys/StackExchange.Redis.Server/RedisRequest.cs b/toys/StackExchange.Redis.Server/RedisRequest.cs index 54102815c..283076905 100644 --- a/toys/StackExchange.Redis.Server/RedisRequest.cs +++ b/toys/StackExchange.Redis.Server/RedisRequest.cs @@ -46,7 +46,7 @@ public int GetInt32(int index) public RedisKey GetKey(int index) => _inner[index].AsRedisKey(); public RedisChannel GetChannel(int index, RedisChannel.PatternMode mode) - => _inner[index].AsRedisChannel(null, mode); + => _inner[index].AsRedisChannel(null, mode, false); internal bool TryGetCommandBytes(int i, out CommandBytes command) {