Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/Timeouts.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ By default Redis Timeout exception(s) includes useful information, which can hel
|qs | Queue-Awaiting-Response : {int}|There are x operations currently awaiting replies from redis server.|
|aw | Active-Writer: {bool}||
|bw | Backlog-Writer: {enum} | Possible values are Inactive, Started, CheckingForWork, CheckingForTimeout, RecordingTimeout, WritingMessage, Flushing, MarkingInactive, RecordingWriteFailure, RecordingFault, SettingIdle, SpinningDown, Faulted|
|rs | Read-State: {enum}|Possible values are NotStarted, Init, RanToCompletion, Faulted, ReadSync, ReadAsync, UpdateWriteTime, ProcessBuffer, MarkProcessed, TryParseResult, MatchResult, PubSubMessage, PubSubPMessage, Reconfigure, InvokePubSub, DequeueResult, ComputeResult, CompletePendingMessage, NA|
|rs | Read-State: {enum}|Possible values are NotStarted, Init, RanToCompletion, Faulted, ReadSync, ReadAsync, UpdateWriteTime, ProcessBuffer, MarkProcessed, TryParseResult, MatchResult, PubSubMessage, PubSubSMessage, PubSubPMessage, Reconfigure, InvokePubSub, DequeueResult, ComputeResult, CompletePendingMessage, NA|
|ws | Write-State: {enum}| Possible values are Initializing, Idle, Writing, Flushing, Flushed, NA|
|in | Inbound-Bytes : {long}|there are x bytes waiting to be read from the input stream from redis|
|in-pipe | Inbound-Pipe-Bytes: {long}|Bytes waiting to be read|
Expand Down
6 changes: 4 additions & 2 deletions src/StackExchange.Redis/CommandMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public sealed class CommandMap

RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither!

RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE,
RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE,

RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH,

Expand All @@ -57,7 +57,9 @@ public sealed class CommandMap

RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither!

RedisCommand.PSUBSCRIBE, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE,
RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE,

RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH,

RedisCommand.SCRIPT,

Expand Down
6 changes: 6 additions & 0 deletions src/StackExchange.Redis/Enums/RedisCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,16 @@ internal enum RedisCommand
SORT,
SORT_RO,
SPOP,
SPUBLISH,
SRANDMEMBER,
SREM,
STRLEN,
SUBSCRIBE,
SUNION,
SUNIONSTORE,
SSCAN,
SSUBSCRIBE,
SUNSUBSCRIBE,
SWAPDB,
SYNC,

Expand Down Expand Up @@ -447,10 +450,13 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
case RedisCommand.SMEMBERS:
case RedisCommand.SMISMEMBER:
case RedisCommand.SORT_RO:
case RedisCommand.SPUBLISH:
case RedisCommand.SRANDMEMBER:
case RedisCommand.SSUBSCRIBE:
case RedisCommand.STRLEN:
case RedisCommand.SUBSCRIBE:
case RedisCommand.SUNION:
case RedisCommand.SUNSUBSCRIBE:
case RedisCommand.SSCAN:
case RedisCommand.SYNC:
case RedisCommand.TIME:
Expand Down
3 changes: 3 additions & 0 deletions src/StackExchange.Redis/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,9 @@ internal static bool RequiresDatabase(RedisCommand command)
case RedisCommand.SLAVEOF:
case RedisCommand.SLOWLOG:
case RedisCommand.SUBSCRIBE:
case RedisCommand.SPUBLISH:
case RedisCommand.SSUBSCRIBE:
case RedisCommand.SUNSUBSCRIBE:
case RedisCommand.SWAPDB:
case RedisCommand.SYNC:
case RedisCommand.TIME:
Expand Down
25 changes: 16 additions & 9 deletions src/StackExchange.Redis/PhysicalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal sealed partial class PhysicalConnection : IDisposable

private const int DefaultRedisDatabaseCount = 16;

private static readonly CommandBytes message = "message", pmessage = "pmessage";
private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage";

private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(
i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray();
Expand Down Expand Up @@ -1644,9 +1644,9 @@ private void MatchResult(in RawResult result)

// out of band message does not match to a queued message
var items = result.GetItems();
if (items.Length >= 3 && items[0].IsEqual(message))
if (items.Length >= 3 && (items[0].IsEqual(message) || items[0].IsEqual(smessage)))
{
_readStatus = ReadStatus.PubSubMessage;
_readStatus = items[0].IsEqual(message) ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage;

// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)
var configChanged = muxer.ConfigurationChangedChannel;
Expand All @@ -1668,8 +1668,14 @@ private void MatchResult(in RawResult result)
}

// invoke the handlers
var channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
Trace("MESSAGE: " + channel);
RedisChannel channel;
if (items[0].IsEqual(message)) {
Copy link
Collaborator

@mgravell mgravell Aug 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feels like we're testing this a lot; can we hoist this to a bool somewhere? maybe:

if (items.Length >= 3 && IsSimpleMessage(items[0], out bool isSharded))

then just use isSharded in all the places? for example channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded);

or alternatively, using the PatternMode approach, maybe we can out var patternMode ?

channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: false);
Trace("MESSAGE: " + channel);
} else {
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: true);
Trace("SMESSAGE: " + channel);
}
if (!channel.IsNull)
{
if (TryGetPubSubPayload(items[2], out var payload))
Expand All @@ -1690,27 +1696,27 @@ private void MatchResult(in RawResult result)
{
_readStatus = ReadStatus.PubSubPMessage;

var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: false);
Trace("PMESSAGE: " + channel);
if (!channel.IsNull)
{
if (TryGetPubSubPayload(items[3], out var payload))
{
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern, isSharded: false);
_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(sub, channel, payload);
}
else if (TryGetMultiPubSubPayload(items[3], out var payloads))
{
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern, isSharded: false);
_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(sub, channel, payloads);
}
}
return; // AND STOP PROCESSING!
}

// if it didn't look like "[p]message", then we still need to process the pending queue
// if it didn't look like "[p|s]message", then we still need to process the pending queue
}
Trace("Matching result...");

Expand Down Expand Up @@ -2110,6 +2116,7 @@ internal enum ReadStatus
MatchResult,
PubSubMessage,
PubSubPMessage,
PubSubSMessage,
Reconfigure,
InvokePubSub,
ResponseSequenceCheck, // high-integrity mode only
Expand Down
3 changes: 3 additions & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1309,12 +1309,15 @@ StackExchange.Redis.RedisChannel
StackExchange.Redis.RedisChannel.Equals(StackExchange.Redis.RedisChannel other) -> bool
StackExchange.Redis.RedisChannel.IsNullOrEmpty.get -> bool
StackExchange.Redis.RedisChannel.IsPattern.get -> bool
StackExchange.Redis.RedisChannel.IsSharded.get -> bool
StackExchange.Redis.RedisChannel.PatternMode
StackExchange.Redis.RedisChannel.PatternMode.Auto = 0 -> StackExchange.Redis.RedisChannel.PatternMode
StackExchange.Redis.RedisChannel.PatternMode.Literal = 1 -> StackExchange.Redis.RedisChannel.PatternMode
StackExchange.Redis.RedisChannel.PatternMode.Pattern = 2 -> StackExchange.Redis.RedisChannel.PatternMode
StackExchange.Redis.RedisChannel.RedisChannel() -> void
StackExchange.Redis.RedisChannel.RedisChannel(byte[]? value, bool isSharded) -> void
StackExchange.Redis.RedisChannel.RedisChannel(byte[]? value, StackExchange.Redis.RedisChannel.PatternMode mode) -> void
StackExchange.Redis.RedisChannel.RedisChannel(string! value, bool isSharded) -> void
StackExchange.Redis.RedisChannel.RedisChannel(string! value, StackExchange.Redis.RedisChannel.PatternMode mode) -> void
StackExchange.Redis.RedisCommandException
StackExchange.Redis.RedisCommandException.RedisCommandException(string! message) -> void
Expand Down
6 changes: 3 additions & 3 deletions src/StackExchange.Redis/RawResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,20 @@ public bool MoveNext()
}
public ReadOnlySequence<byte> Current { get; private set; }
}
internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.PatternMode mode)
internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.PatternMode mode, bool isSharded)
{
switch (Resp2TypeBulkString)
{
case ResultType.SimpleString:
case ResultType.BulkString:
if (channelPrefix == null)
{
return new RedisChannel(GetBlob(), mode);
return isSharded ? new RedisChannel(GetBlob(), true) : new RedisChannel(GetBlob(), mode);
}
if (StartsWith(channelPrefix))
{
byte[] copy = Payload.Slice(channelPrefix.Length).ToArray();
return new RedisChannel(copy, mode);
return isSharded ? new RedisChannel(copy, true) : new RedisChannel(copy, mode);
}
return default;
default:
Expand Down
33 changes: 27 additions & 6 deletions src/StackExchange.Redis/RedisChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace StackExchange.Redis
{
internal readonly byte[]? Value;
internal readonly bool _isPatternBased;
internal readonly bool _isSharded;

/// <summary>
/// Indicates whether the channel-name is either null or a zero-length value.
Expand All @@ -21,6 +22,11 @@ namespace StackExchange.Redis
/// </summary>
public bool IsPattern => _isPatternBased;

/// <summary>
/// Indicates whether this channel represents a shard channel (see <c>SSUBSCRIBE</c>)
/// </summary>
public bool IsSharded => _isSharded;

internal bool IsNull => Value == null;

/// <summary>
Expand Down Expand Up @@ -59,7 +65,7 @@ public static bool UseImplicitAutoPattern
/// </summary>
/// <param name="value">The name of the channel to create.</param>
/// <param name="mode">The mode for name matching.</param>
public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode)) { }
public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode), false) { }

/// <summary>
/// Create a new redis channel from a string, explicitly controlling the pattern mode.
Expand All @@ -68,10 +74,25 @@ public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatt
/// <param name="mode">The mode for name matching.</param>
public RedisChannel(string value, PatternMode mode) : this(value == null ? null : Encoding.UTF8.GetBytes(value), mode) { }

private RedisChannel(byte[]? value, bool isPatternBased)
/// <summary>
/// Create a new redis channel from a buffer, explicitly controlling the sharding mode.
/// </summary>
/// <param name="value">The name of the channel to create.</param>
/// <param name="isSharded">Whether the channel is sharded.</param>
public RedisChannel(byte[]? value, bool isSharded) : this(value, false, isSharded) {}

/// <summary>
/// Create a new redis channel from a string, explicitly controlling the sharding mode.
/// </summary>
/// <param name="value">The string name of the channel to create.</param>
/// <param name="isSharded">Whether the channel is sharded.</param>
public RedisChannel(string value, bool isSharded) : this(value == null ? null : Encoding.UTF8.GetBytes(value), isSharded) {}

private RedisChannel(byte[]? value, bool isPatternBased, bool isSharded)
{
Value = value;
_isPatternBased = isPatternBased;
_isSharded = isSharded;
}

private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch
Expand Down Expand Up @@ -123,7 +144,7 @@ private RedisChannel(byte[]? value, bool isPatternBased)
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
public static bool operator ==(RedisChannel x, RedisChannel y) =>
x._isPatternBased == y._isPatternBased && RedisValue.Equals(x.Value, y.Value);
x._isPatternBased == y._isPatternBased && RedisValue.Equals(x.Value, y.Value) && x._isSharded == y._isSharded;

/// <summary>
/// Indicate whether two channel names are equal.
Expand Down Expand Up @@ -171,10 +192,10 @@ private RedisChannel(byte[]? value, bool isPatternBased)
/// Indicate whether two channel names are equal.
/// </summary>
/// <param name="other">The <see cref="RedisChannel"/> to compare to.</param>
public bool Equals(RedisChannel other) => _isPatternBased == other._isPatternBased && RedisValue.Equals(Value, other.Value);
public bool Equals(RedisChannel other) => _isPatternBased == other._isPatternBased && RedisValue.Equals(Value, other.Value) && _isSharded == other._isSharded;

/// <inheritdoc/>
public override int GetHashCode() => RedisValue.GetHashCode(Value) + (_isPatternBased ? 1 : 0);
public override int GetHashCode() => RedisValue.GetHashCode(Value) + (_isPatternBased ? 1 : 0) + (_isSharded ? 2 : 0);

/// <summary>
/// Obtains a string representation of the channel name.
Expand Down Expand Up @@ -286,4 +307,4 @@ public static implicit operator RedisChannel(byte[]? key)
private RedisChannel(byte[]? value) => throw new NotSupportedException();
#endif
}
}
}
13 changes: 9 additions & 4 deletions src/StackExchange.Redis/RedisSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,17 @@ public Subscription(CommandFlags flags)
internal Message GetMessage(RedisChannel channel, SubscriptionAction action, CommandFlags flags, bool internalCall)
{
var isPattern = channel._isPatternBased;
var isSharded = channel._isSharded;
var command = action switch
{
SubscriptionAction.Subscribe when isPattern => RedisCommand.PSUBSCRIBE,
SubscriptionAction.Unsubscribe when isPattern => RedisCommand.PUNSUBSCRIBE,

SubscriptionAction.Subscribe when !isPattern => RedisCommand.SUBSCRIBE,
SubscriptionAction.Unsubscribe when !isPattern => RedisCommand.UNSUBSCRIBE,
SubscriptionAction.Subscribe when isSharded => RedisCommand.SSUBSCRIBE,
SubscriptionAction.Unsubscribe when isSharded => RedisCommand.SUNSUBSCRIBE,

SubscriptionAction.Subscribe when !isPattern && !isSharded => RedisCommand.SUBSCRIBE,
SubscriptionAction.Unsubscribe when !isPattern && !isSharded => RedisCommand.UNSUBSCRIBE,
_ => throw new ArgumentOutOfRangeException(nameof(action), "This would be an impressive boolean feat"),
};

Expand Down Expand Up @@ -370,14 +374,14 @@ private static void ThrowIfNull(in RedisChannel channel)
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
ThrowIfNull(channel);
var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
return ExecuteSync(msg, ResultProcessor.Int64);
}

public Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
ThrowIfNull(channel);
var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
return ExecuteAsync(msg, ResultProcessor.Int64);
}

Expand Down Expand Up @@ -515,6 +519,7 @@ private bool UnregisterSubscription(in RedisChannel channel, Action<RedisChannel
return false;
}

// TODO: We need a new api to support SUNSUBSCRIBE all. Calling this now would unsubscribe both sharded and unsharded channels.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we actually need such an API?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently UnsubscribeAll will unsubscribe both unsharded and sharded channels. Alternatively, we need to pass some flag to indicate whether we want to unsubscribe all sharded channels to match the SUNSUBSCRIBE semantics.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that. I just don't know that it is obvious whether that represents a problem. What genuinely likely scenario are we thinking of where this nuance is needed?

public void UnsubscribeAll(CommandFlags flags = CommandFlags.None)
{
// TODO: Unsubscribe variadic commands to reduce round trips
Expand Down
6 changes: 3 additions & 3 deletions src/StackExchange.Redis/ResultProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes

var newServer = message.Command switch
{
RedisCommand.SUBSCRIBE or RedisCommand.PSUBSCRIBE => connection.BridgeCouldBeNull?.ServerEndPoint,
_ => null,
RedisCommand.SUBSCRIBE or RedisCommand.SSUBSCRIBE or RedisCommand.PSUBSCRIBE => connection.BridgeCouldBeNull?.ServerEndPoint,
_ => null
};
Subscription?.SetCurrentServer(newServer);
return true;
Expand Down Expand Up @@ -1526,7 +1526,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{
case ResultType.Array:
var final = result.ToArray(
(in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode),
(in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode, isSharded: false),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to support PUBSUB SHARDCHANNELS (see SubscriptionChannels[Async]) ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto SHARDNUMSUB - see SubscriptionPatternCount / SubscriptionSubscriberCount

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto ssub in CLIENT - see ClientInfo.TryParse

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, plan to add it in a separate PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ssub handled in my extensions to this PR

new ChannelState(connection.ChannelPrefix, mode))!;

SetResult(message, final);
Expand Down
Loading
Loading