-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Support sharded pubsub commands #2498
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6c36478
248c182
90e2493
ac44789
fbc8fd5
2daf168
f81d025
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -183,13 +183,17 @@ public Subscription(CommandFlags flags) | |
| internal Message GetMessage(RedisChannel channel, SubscriptionAction action, CommandFlags flags, bool internalCall) | ||
| { | ||
| var isPattern = channel._isPatternBased; | ||
| var isSharded = channel._isSharded; | ||
| var command = action switch | ||
| { | ||
| SubscriptionAction.Subscribe when isPattern => RedisCommand.PSUBSCRIBE, | ||
| SubscriptionAction.Unsubscribe when isPattern => RedisCommand.PUNSUBSCRIBE, | ||
|
|
||
| SubscriptionAction.Subscribe when !isPattern => RedisCommand.SUBSCRIBE, | ||
| SubscriptionAction.Unsubscribe when !isPattern => RedisCommand.UNSUBSCRIBE, | ||
| SubscriptionAction.Subscribe when isSharded => RedisCommand.SSUBSCRIBE, | ||
| SubscriptionAction.Unsubscribe when isSharded => RedisCommand.SUNSUBSCRIBE, | ||
|
|
||
| SubscriptionAction.Subscribe when !isPattern && !isSharded => RedisCommand.SUBSCRIBE, | ||
| SubscriptionAction.Unsubscribe when !isPattern && !isSharded => RedisCommand.UNSUBSCRIBE, | ||
| _ => throw new ArgumentOutOfRangeException(nameof(action), "This would be an impressive boolean feat"), | ||
| }; | ||
|
|
||
|
|
@@ -370,14 +374,14 @@ private static void ThrowIfNull(in RedisChannel channel) | |
| public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) | ||
| { | ||
| ThrowIfNull(channel); | ||
| var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); | ||
| var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); | ||
| return ExecuteSync(msg, ResultProcessor.Int64); | ||
| } | ||
|
|
||
| public Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) | ||
| { | ||
| ThrowIfNull(channel); | ||
| var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); | ||
| var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); | ||
| return ExecuteAsync(msg, ResultProcessor.Int64); | ||
| } | ||
|
|
||
|
|
@@ -515,6 +519,7 @@ private bool UnregisterSubscription(in RedisChannel channel, Action<RedisChannel | |
| return false; | ||
| } | ||
|
|
||
| // TODO: We need a new api to support SUNSUBSCRIBE all. Calling this now would unsubscribe both sharded and unsharded channels. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we actually need such an API?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. currently UnsubscribeAll will unsubscribe both unsharded and sharded channels. Alternatively, we need to pass some flag to indicate whether we want to unsubscribe all sharded channels to match the SUNSUBSCRIBE semantics.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand that. I just don't know that it is obvious whether that represents a problem. What genuinely likely scenario are we thinking of where this nuance is needed? |
||
| public void UnsubscribeAll(CommandFlags flags = CommandFlags.None) | ||
| { | ||
| // TODO: Unsubscribe variadic commands to reduce round trips | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -469,8 +469,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes | |
|
|
||
| var newServer = message.Command switch | ||
| { | ||
| RedisCommand.SUBSCRIBE or RedisCommand.PSUBSCRIBE => connection.BridgeCouldBeNull?.ServerEndPoint, | ||
| _ => null, | ||
| RedisCommand.SUBSCRIBE or RedisCommand.SSUBSCRIBE or RedisCommand.PSUBSCRIBE => connection.BridgeCouldBeNull?.ServerEndPoint, | ||
| _ => null | ||
| }; | ||
| Subscription?.SetCurrentServer(newServer); | ||
| return true; | ||
|
|
@@ -1526,7 +1526,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes | |
| { | ||
| case ResultType.Array: | ||
| var final = result.ToArray( | ||
| (in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode), | ||
| (in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode, isSharded: false), | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need to support
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, plan to add it in a separate PR.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ssub handled in my extensions to this PR |
||
| new ChannelState(connection.ChannelPrefix, mode))!; | ||
|
|
||
| SetResult(message, final); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
feels like we're testing this a lot; can we hoist this to a
boolsomewhere? maybe:then just use
isShardedin all the places? for examplechannel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded);or alternatively, using the
PatternModeapproach, maybe we canout var patternMode?