diff --git a/StackExchange.Redis.sln b/StackExchange.Redis.sln index 6e4416d7d..20b5e2f01 100644 --- a/StackExchange.Redis.sln +++ b/StackExchange.Redis.sln @@ -19,6 +19,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution docs\ReleaseNotes.md = docs\ReleaseNotes.md Shared.ruleset = Shared.ruleset version.json = version.json + tests\RedisConfigs\docker-compose.yml = tests\RedisConfigs\docker-compose.yml EndProjectSection EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "RedisConfigs", "RedisConfigs", "{96E891CD-2ED7-4293-A7AB-4C6F5D8D2B05}" diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 6754ea017..b301cca8b 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -16,6 +16,8 @@ Current package versions: - Package updates ([#2906 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2906)) - Docs: added [guidance on async timeouts](https://stackexchange.github.io/StackExchange.Redis/AsyncTimeouts) ([#2910 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2910)) - Fix handshake error with `CLIENT ID` ([#2909 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2909)) +- Add `XTRIM MINID` support ([#2842 by kijanawoodard](https://github.com/StackExchange/StackExchange.Redis/pull/2842)) +- Add new CE 8.2 stream support - `XDELEX`, `XACKDEL`, `{XADD|XTRIM} [KEEPREF|DELREF|ACKED]` ([#2912 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2912)) ## 2.8.41 diff --git a/src/StackExchange.Redis/Enums/RedisCommand.cs b/src/StackExchange.Redis/Enums/RedisCommand.cs index 3909be4c2..34e1eb296 100644 --- a/src/StackExchange.Redis/Enums/RedisCommand.cs +++ b/src/StackExchange.Redis/Enums/RedisCommand.cs @@ -206,10 +206,12 @@ internal enum RedisCommand WATCH, XACK, + XACKDEL, XADD, XAUTOCLAIM, XCLAIM, XDEL, + XDELEX, XGROUP, XINFO, XLEN, @@ -496,9 +498,11 @@ internal static bool IsPrimaryOnly(this RedisCommand command) case RedisCommand.GEOADD: case RedisCommand.SORT: case RedisCommand.XACK: + case RedisCommand.XACKDEL: case RedisCommand.XADD: case RedisCommand.XCLAIM: case RedisCommand.XDEL: + case RedisCommand.XDELEX: case RedisCommand.XGROUP: case RedisCommand.XREADGROUP: case RedisCommand.XTRIM: diff --git a/src/StackExchange.Redis/Enums/StreamTrimMode.cs b/src/StackExchange.Redis/Enums/StreamTrimMode.cs new file mode 100644 index 000000000..2033e8414 --- /dev/null +++ b/src/StackExchange.Redis/Enums/StreamTrimMode.cs @@ -0,0 +1,24 @@ +namespace StackExchange.Redis; + +/// +/// Determines how stream trimming works. +/// +public enum StreamTrimMode +{ + /// + /// Trims the stream according to the specified policy (MAXLEN or MINID) regardless of whether entries are referenced by any consumer groups, but preserves existing references to these entries in all consumer groups' PEL. + /// + KeepReferences = 0, + + /// + /// Trims the stream according to the specified policy and also removes all references to the trimmed entries from all consumer groups' PEL. + /// + /// Requires server 8.2 or above. + DeleteReferences = 1, + + /// + /// With ACKED: Only trims entries that were read and acknowledged by all consumer groups. + /// + /// Requires server 8.2 or above. + Acknowledged = 2, +} diff --git a/src/StackExchange.Redis/Enums/StreamTrimResult.cs b/src/StackExchange.Redis/Enums/StreamTrimResult.cs new file mode 100644 index 000000000..aa157a8a0 --- /dev/null +++ b/src/StackExchange.Redis/Enums/StreamTrimResult.cs @@ -0,0 +1,23 @@ +namespace StackExchange.Redis; + +/// +/// Determines how stream trimming works. +/// +public enum StreamTrimResult +{ + /// + /// No such id exists in the provided stream key. + /// + NotFound = -1, + + /// + /// Entry was deleted from the stream. + /// + Deleted = 1, + + /// + /// Entry was not deleted, but there are still dangling references. + /// + /// This response relates to the mode. + NotDeleted = 2, +} diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs index 207c03326..c37d3ddb0 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabase.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs @@ -2440,6 +2440,34 @@ IEnumerable SortedSetScan( /// long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); + /// + /// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged. + /// + /// The key of the stream. + /// The name of the consumer group that received the message. + /// The delete mode to use when acknowledging the message. + /// The ID of the message to acknowledge. + /// The flags to use for this operation. + /// The outcome of the delete operation. + /// +#pragma warning disable RS0026 // similar overloads + StreamTrimResult StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None); +#pragma warning restore RS0026 + + /// + /// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged. + /// + /// The key of the stream. + /// The name of the consumer group that received the message. + /// /// The delete mode to use when acknowledging the message. + /// The IDs of the messages to acknowledge. + /// The flags to use for this operation. + /// The outcome of each delete operation. + /// +#pragma warning disable RS0026 // similar overloads + StreamTrimResult[] StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); +#pragma warning restore RS0026 + /// /// Adds an entry using the specified values to the given stream key. /// If key does not exist, a new key holding a stream is created. @@ -2454,7 +2482,7 @@ IEnumerable SortedSetScan( /// The flags to use for this operation. /// The ID of the newly created message. /// - RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags); /// /// Adds an entry using the specified values to the given stream key. @@ -2469,7 +2497,46 @@ IEnumerable SortedSetScan( /// The flags to use for this operation. /// The ID of the newly created message. /// - RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags); + + /// + /// Adds an entry using the specified values to the given stream key. + /// If key does not exist, a new key holding a stream is created. + /// The command returns the ID of the newly created stream entry. + /// + /// The key of the stream. + /// The field name for the stream entry. + /// The value to set in the stream entry. + /// The ID to assign to the stream entry, defaults to an auto-generated ID ("*"). + /// The maximum length of the stream. + /// If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages. + /// Specifies the maximal count of entries that will be evicted. + /// Determines how stream trimming should be performed. + /// The flags to use for this operation. + /// The ID of the newly created message. + /// +#pragma warning disable RS0026 // different shape + RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); +#pragma warning restore RS0026 + + /// + /// Adds an entry using the specified values to the given stream key. + /// If key does not exist, a new key holding a stream is created. + /// The command returns the ID of the newly created stream entry. + /// + /// The key of the stream. + /// The fields and their associated values to set in the stream entry. + /// The ID to assign to the stream entry, defaults to an auto-generated ID ("*"). + /// The maximum length of the stream. + /// If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages. + /// Specifies the maximal count of entries that will be evicted. + /// Determines how stream trimming should be performed. + /// The flags to use for this operation. + /// The ID of the newly created message. + /// +#pragma warning disable RS0026 // different shape + RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); +#pragma warning restore RS0026 /// /// Change ownership of messages consumed, but not yet acknowledged, by a different consumer. @@ -2583,7 +2650,22 @@ IEnumerable SortedSetScan( /// The flags to use for this operation. /// Returns the number of messages successfully deleted from the stream. /// +#pragma warning disable RS0026 // similar overloads long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); +#pragma warning restore RS0026 + + /// + /// Delete messages in the stream. This method does not delete the stream. + /// + /// The key of the stream. + /// The IDs of the messages to delete. + /// Determines how stream trimming should be performed. + /// The flags to use for this operation. + /// Returns the number of messages successfully deleted from the stream. + /// +#pragma warning disable RS0026 // similar overloads + StreamTrimResult[] StreamDelete(RedisKey key, RedisValue[] messageIds, StreamTrimMode mode, CommandFlags flags = CommandFlags.None); +#pragma warning restore RS0026 /// /// Delete a consumer from a consumer group. @@ -2773,7 +2855,33 @@ IEnumerable SortedSetScan( /// The flags to use for this operation. /// The number of messages removed from the stream. /// - long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags); + + /// + /// Trim the stream to a specified maximum length. + /// + /// The key of the stream. + /// The maximum length of the stream. + /// If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages. + /// Specifies the maximal count of entries that will be evicted. + /// Determines how stream trimming should be performed. + /// The flags to use for this operation. + /// The number of messages removed from the stream. + /// + long StreamTrim(RedisKey key, long maxLength, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); + + /// + /// Trim the stream to a specified minimum timestamp. + /// + /// The key of the stream. + /// All entries with an id (timestamp) earlier minId will be removed. + /// If true, the "~" argument is used to allow the stream to exceed minId by a small number. This improves performance when removing messages. + /// The maximum number of entries to remove per call when useApproximateMaxLength = true. If 0, the limiting mechanism is disabled entirely. + /// Determines how stream trimming should be performed. + /// The flags to use for this operation. + /// The number of messages removed from the stream. + /// + long StreamTrimByMinId(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); /// /// If key already exists and is a string, this command appends the value at the end of the string. diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs index 9852c131c..4873c1069 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs @@ -594,11 +594,27 @@ IAsyncEnumerable SortedSetScanAsync( /// Task StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); +#pragma warning disable RS0026 // similar overloads + /// + Task StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None); + + /// + Task StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); +#pragma warning restore RS0026 + /// - Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags); /// - Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags); + +#pragma warning disable RS0026 // similar overloads + /// + Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); + + /// + Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); +#pragma warning restore RS0026 /// Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None); @@ -624,9 +640,14 @@ IAsyncEnumerable SortedSetScanAsync( /// Task StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupName, RedisValue? position = null, bool createStream = true, CommandFlags flags = CommandFlags.None); +#pragma warning disable RS0026 /// Task StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None); + /// + Task StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, StreamTrimMode mode, CommandFlags flags = CommandFlags.None); +#pragma warning restore RS0026 + /// Task StreamDeleteConsumerAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None); @@ -670,7 +691,13 @@ IAsyncEnumerable SortedSetScanAsync( Task StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None); /// - Task StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + Task StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags); + + /// + Task StreamTrimAsync(RedisKey key, long maxLength, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); + + /// + Task StreamTrimByMinIdAsync(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); /// Task StringAppendAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs index f45c29886..331d23ea7 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs @@ -564,12 +564,24 @@ public Task StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, Red public Task StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => Inner.StreamAcknowledgeAsync(ToInner(key), groupName, messageIds, flags); - public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) => + public Task StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) => + Inner.StreamAcknowledgeAndDeleteAsync(ToInner(key), groupName, mode, messageId, flags); + + public Task StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => + Inner.StreamAcknowledgeAndDeleteAsync(ToInner(key), groupName, mode, messageIds, flags); + + public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) => Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags); - public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) => + public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) => Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags); + public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => + Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, limit, mode, flags); + + public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => + Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, limit, mode, flags); + public Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) => Inner.StreamAutoClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags); @@ -606,6 +618,9 @@ public Task StreamLengthAsync(RedisKey key, CommandFlags flags = CommandFl public Task StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => Inner.StreamDeleteAsync(ToInner(key), messageIds, flags); + public Task StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, StreamTrimMode mode, CommandFlags flags = CommandFlags.None) => + Inner.StreamDeleteAsync(ToInner(key), messageIds, mode, flags); + public Task StreamDeleteConsumerAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None) => Inner.StreamDeleteConsumerAsync(ToInner(key), groupName, consumerName, flags); @@ -639,9 +654,15 @@ public Task StreamReadGroupAsync(StreamPosition[] streamPositions public Task StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None) => Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, noAck, flags); - public Task StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) => + public Task StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) => Inner.StreamTrimAsync(ToInner(key), maxLength, useApproximateMaxLength, flags); + public Task StreamTrimAsync(RedisKey key, long maxLength, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => + Inner.StreamTrimAsync(ToInner(key), maxLength, useApproximateMaxLength, limit, mode, flags); + + public Task StreamTrimByMinIdAsync(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => + Inner.StreamTrimByMinIdAsync(ToInner(key), minId, useApproximateMaxLength, limit, mode, flags); + public Task StringAppendAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) => Inner.StringAppendAsync(ToInner(key), value, flags); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs index a19dd0b7a..18406ba9f 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs @@ -546,12 +546,24 @@ public long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue mes public long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => Inner.StreamAcknowledge(ToInner(key), groupName, messageIds, flags); - public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) => + public StreamTrimResult StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) => + Inner.StreamAcknowledgeAndDelete(ToInner(key), groupName, mode, messageId, flags); + + public StreamTrimResult[] StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => + Inner.StreamAcknowledgeAndDelete(ToInner(key), groupName, mode, messageIds, flags); + + public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) => Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags); - public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) => + public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) => Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags); + public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => + Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, limit, mode, flags); + + public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => + Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, limit, mode, flags); + public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) => Inner.StreamAutoClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags); @@ -588,6 +600,9 @@ public long StreamLength(RedisKey key, CommandFlags flags = CommandFlags.None) = public long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) => Inner.StreamDelete(ToInner(key), messageIds, flags); + public StreamTrimResult[] StreamDelete(RedisKey key, RedisValue[] messageIds, StreamTrimMode mode, CommandFlags flags = CommandFlags.None) => + Inner.StreamDelete(ToInner(key), messageIds, mode, flags); + public long StreamDeleteConsumer(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None) => Inner.StreamDeleteConsumer(ToInner(key), groupName, consumerName, flags); @@ -621,9 +636,15 @@ public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValu public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None) => Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, noAck, flags); - public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) => + public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) => Inner.StreamTrim(ToInner(key), maxLength, useApproximateMaxLength, flags); + public long StreamTrim(RedisKey key, long maxLength, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => + Inner.StreamTrim(ToInner(key), maxLength, useApproximateMaxLength, limit, mode, flags); + + public long StreamTrimByMinId(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => + Inner.StreamTrimByMinId(ToInner(key), minId, useApproximateMaxLength, limit, mode, flags); + public long StringAppend(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) => Inner.StringAppend(ToInner(key), value, flags); diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index 00ae49025..43b35ba58 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -717,8 +717,12 @@ StackExchange.Redis.IDatabase.SortedSetUpdate(StackExchange.Redis.RedisKey key, StackExchange.Redis.IDatabase.SortedSetUpdate(StackExchange.Redis.RedisKey key, StackExchange.Redis.SortedSetEntry[]! values, StackExchange.Redis.SortedSetWhen when = StackExchange.Redis.SortedSetWhen.Always, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long StackExchange.Redis.IDatabase.StreamAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long StackExchange.Redis.IDatabase.StreamAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long -StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue -StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue +StackExchange.Redis.IDatabase.StreamAcknowledgeAndDelete(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamTrimMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamTrimResult +StackExchange.Redis.IDatabase.StreamAcknowledgeAndDelete(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamTrimMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamTrimResult[]! +StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue +StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.RedisValue +StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue +StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.RedisValue StackExchange.Redis.IDatabase.StreamAutoClaim(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamAutoClaimResult StackExchange.Redis.IDatabase.StreamAutoClaimIdsOnly(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamAutoClaimIdsOnlyResult StackExchange.Redis.IDatabase.StreamClaim(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]! @@ -728,6 +732,7 @@ StackExchange.Redis.IDatabase.StreamConsumerInfo(StackExchange.Redis.RedisKey ke StackExchange.Redis.IDatabase.StreamCreateConsumerGroup(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue? position = null, bool createStream = true, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> bool StackExchange.Redis.IDatabase.StreamCreateConsumerGroup(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue? position, StackExchange.Redis.CommandFlags flags) -> bool StackExchange.Redis.IDatabase.StreamDelete(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long +StackExchange.Redis.IDatabase.StreamDelete(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.StreamTrimMode mode, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamTrimResult[]! StackExchange.Redis.IDatabase.StreamDeleteConsumer(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long StackExchange.Redis.IDatabase.StreamDeleteConsumerGroup(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> bool StackExchange.Redis.IDatabase.StreamGroupInfo(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamGroupInfo[]! @@ -742,7 +747,9 @@ StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.RedisKey key, StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position, int? count, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.StreamEntry[]! StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream = null, bool noAck = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]! StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.RedisStream[]! -StackExchange.Redis.IDatabase.StreamTrim(StackExchange.Redis.RedisKey key, int maxLength, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long +StackExchange.Redis.IDatabase.StreamTrim(StackExchange.Redis.RedisKey key, int maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> long +StackExchange.Redis.IDatabase.StreamTrim(StackExchange.Redis.RedisKey key, long maxLength, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode mode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long +StackExchange.Redis.IDatabase.StreamTrimByMinId(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue minId, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode mode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long StackExchange.Redis.IDatabase.StringAppend(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue value, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long StackExchange.Redis.IDatabase.StringBitCount(StackExchange.Redis.RedisKey key, long start, long end, StackExchange.Redis.CommandFlags flags) -> long StackExchange.Redis.IDatabase.StringBitCount(StackExchange.Redis.RedisKey key, long start = 0, long end = -1, StackExchange.Redis.StringIndexType indexType = StackExchange.Redis.StringIndexType.Byte, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long @@ -954,8 +961,12 @@ StackExchange.Redis.IDatabaseAsync.SortedSetUpdateAsync(StackExchange.Redis.Redi StackExchange.Redis.IDatabaseAsync.SortedSetUpdateAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.SortedSetEntry[]! values, StackExchange.Redis.SortedSetWhen when = StackExchange.Redis.SortedSetWhen.Always, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! -StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! -StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAndDeleteAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamTrimMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAndDeleteAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamTrimMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamAutoClaimAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamAutoClaimIdsOnlyAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamClaimAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! @@ -965,6 +976,7 @@ StackExchange.Redis.IDatabaseAsync.StreamConsumerInfoAsync(StackExchange.Redis.R StackExchange.Redis.IDatabaseAsync.StreamCreateConsumerGroupAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue? position = null, bool createStream = true, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamCreateConsumerGroupAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue? position, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamDeleteAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamDeleteAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.StreamTrimMode mode, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamDeleteConsumerAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamDeleteConsumerGroupAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamGroupInfoAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! @@ -979,7 +991,9 @@ StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.Redi StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position, int? count, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream = null, bool noAck = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task! -StackExchange.Redis.IDatabaseAsync.StreamTrimAsync(StackExchange.Redis.RedisKey key, int maxLength, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamTrimAsync(StackExchange.Redis.RedisKey key, int maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamTrimAsync(StackExchange.Redis.RedisKey key, long maxLength, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode mode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamTrimByMinIdAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue minId, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode mode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StringAppendAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue value, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StringBitCountAsync(StackExchange.Redis.RedisKey key, long start, long end, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StringBitCountAsync(StackExchange.Redis.RedisKey key, long start = 0, long end = -1, StackExchange.Redis.StringIndexType indexType = StackExchange.Redis.StringIndexType.Byte, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! @@ -1902,4 +1916,12 @@ StackExchange.Redis.ConfigurationOptions.SetUserPfxCertificate(string! userCerti StackExchange.Redis.Bitwise.AndOr = 6 -> StackExchange.Redis.Bitwise StackExchange.Redis.Bitwise.Diff = 4 -> StackExchange.Redis.Bitwise StackExchange.Redis.Bitwise.Diff1 = 5 -> StackExchange.Redis.Bitwise -StackExchange.Redis.Bitwise.One = 7 -> StackExchange.Redis.Bitwise \ No newline at end of file +StackExchange.Redis.Bitwise.One = 7 -> StackExchange.Redis.Bitwise +StackExchange.Redis.StreamTrimMode +StackExchange.Redis.StreamTrimMode.Acknowledged = 2 -> StackExchange.Redis.StreamTrimMode +StackExchange.Redis.StreamTrimMode.DeleteReferences = 1 -> StackExchange.Redis.StreamTrimMode +StackExchange.Redis.StreamTrimMode.KeepReferences = 0 -> StackExchange.Redis.StreamTrimMode +StackExchange.Redis.StreamTrimResult +StackExchange.Redis.StreamTrimResult.Deleted = 1 -> StackExchange.Redis.StreamTrimResult +StackExchange.Redis.StreamTrimResult.NotDeleted = 2 -> StackExchange.Redis.StreamTrimResult +StackExchange.Redis.StreamTrimResult.NotFound = -1 -> StackExchange.Redis.StreamTrimResult \ No newline at end of file diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index 716176662..5493f3ebd 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -1,6 +1,7 @@ using System; using System.Buffers; using System.Collections.Generic; +using System.Diagnostics; using System.Net; using System.Threading.Tasks; using Pipelines.Sockets.Unofficial.Arenas; @@ -2423,7 +2424,34 @@ public Task StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, Red return ExecuteAsync(msg, ResultProcessor.Int64); } - public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + public StreamTrimResult StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAcknowledgeAndDeleteMessage(key, groupName, mode, messageId, flags); + return ExecuteSync(msg, ResultProcessor.StreamTrimResult); + } + + public Task StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAcknowledgeAndDeleteMessage(key, groupName, mode, messageId, flags); + return ExecuteAsync(msg, ResultProcessor.StreamTrimResult); + } + + public StreamTrimResult[] StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAcknowledgeAndDeleteMessage(key, groupName, mode, messageIds, flags); + return ExecuteSync(msg, ResultProcessor.StreamTrimResultArray)!; + } + + public Task StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamAcknowledgeAndDeleteMessage(key, groupName, mode, messageIds, flags); + return ExecuteAsync(msg, ResultProcessor.StreamTrimResultArray)!; + } + + public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) + => StreamAdd(key, streamField, streamValue, messageId, maxLength, useApproximateMaxLength, null, StreamTrimMode.KeepReferences, flags); + + public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) { var msg = GetStreamAddMessage( key, @@ -2431,12 +2459,17 @@ public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue str maxLength, useApproximateMaxLength, new NameValueEntry(streamField, streamValue), + limit, + mode, flags); return ExecuteSync(msg, ResultProcessor.RedisValue); } - public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) + => StreamAddAsync(key, streamField, streamValue, messageId, maxLength, useApproximateMaxLength, null, StreamTrimMode.KeepReferences, flags); + + public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) { var msg = GetStreamAddMessage( key, @@ -2444,12 +2477,17 @@ public Task StreamAddAsync(RedisKey key, RedisValue streamField, Red maxLength, useApproximateMaxLength, new NameValueEntry(streamField, streamValue), + limit, + mode, flags); return ExecuteAsync(msg, ResultProcessor.RedisValue); } - public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) + => StreamAdd(key, streamPairs, messageId, maxLength, useApproximateMaxLength, null, StreamTrimMode.KeepReferences, flags); + + public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) { var msg = GetStreamAddMessage( key, @@ -2457,12 +2495,17 @@ public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisVal maxLength, useApproximateMaxLength, streamPairs, + limit, + mode, flags); return ExecuteSync(msg, ResultProcessor.RedisValue); } - public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) + => StreamAddAsync(key, streamPairs, messageId, maxLength, useApproximateMaxLength, null, StreamTrimMode.KeepReferences, flags); + + public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) { var msg = GetStreamAddMessage( key, @@ -2470,6 +2513,8 @@ public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPair maxLength, useApproximateMaxLength, streamPairs, + limit, + mode, flags); return ExecuteAsync(msg, ResultProcessor.RedisValue); @@ -2703,28 +2748,50 @@ public Task StreamLengthAsync(RedisKey key, CommandFlags flags = CommandFl public long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create( - Database, - flags, - RedisCommand.XDEL, - key, - messageIds); - + var msg = Message.Create(Database, flags, RedisCommand.XDEL, key, messageIds); return ExecuteSync(msg, ResultProcessor.Int64); } - public Task StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + public StreamTrimResult[] StreamDelete(RedisKey key, RedisValue[] messageIds, StreamTrimMode mode, CommandFlags flags) { - var msg = Message.Create( - Database, - flags, - RedisCommand.XDEL, - key, - messageIds); + var msg = GetStreamDeleteExMessage(key, messageIds, mode, flags); + return ExecuteSync(msg, ResultProcessor.StreamTrimResultArray)!; + } + private Message GetStreamDeleteExMessage(RedisKey key, RedisValue[] messageIds, StreamTrimMode mode, CommandFlags flags) + { + if (messageIds == null) throw new ArgumentNullException(nameof(messageIds)); + if (messageIds.Length == 0) throw new ArgumentOutOfRangeException(nameof(messageIds), "messageIds must contain at least one item."); + + // avoid array for single message case + if (messageIds.Length == 1) + { + return Message.Create(Database, flags, RedisCommand.XDELEX, key, StreamConstants.GetMode(mode), StreamConstants.Ids, 1, messageIds[0]); + } + + var values = new RedisValue[messageIds.Length + 3]; + + var offset = 0; + values[offset++] = StreamConstants.GetMode(mode); + values[offset++] = StreamConstants.Ids; + values[offset++] = messageIds.Length; + messageIds.AsSpan().CopyTo(values.AsSpan(offset)); + Debug.Assert(offset + messageIds.Length == values.Length); + return Message.Create(Database, flags, RedisCommand.XDELEX, key, values); + } + + public Task StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) + { + var msg = Message.Create(Database, flags, RedisCommand.XDEL, key, messageIds); return ExecuteAsync(msg, ResultProcessor.Int64); } + public Task StreamDeleteAsync(RedisKey key, RedisValue[] messageIds, StreamTrimMode mode, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamDeleteExMessage(key, messageIds, mode, flags); + return ExecuteAsync(msg, ResultProcessor.StreamTrimResultArray)!; + } + public long StreamDeleteConsumer(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None) { var msg = Message.Create( @@ -2995,15 +3062,33 @@ public Task StreamReadGroupAsync(StreamPosition[] streamPositions return ExecuteAsync(msg, ResultProcessor.MultiStream, defaultValue: Array.Empty()); } - public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) + => StreamTrim(key, maxLength, useApproximateMaxLength, null, StreamTrimMode.KeepReferences, flags); + + public long StreamTrim(RedisKey key, long maxLength, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamTrimMessage(true, key, maxLength, useApproximateMaxLength, limit, mode, flags); + return ExecuteSync(msg, ResultProcessor.Int64); + } + + public Task StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) + => StreamTrimAsync(key, maxLength, useApproximateMaxLength, null, StreamTrimMode.KeepReferences, flags); + + public Task StreamTrimAsync(RedisKey key, long maxLength, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamTrimMessage(true, key, maxLength, useApproximateMaxLength, limit, mode, flags); + return ExecuteAsync(msg, ResultProcessor.Int64); + } + + public long StreamTrimByMinId(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) { - var msg = GetStreamTrimMessage(key, maxLength, useApproximateMaxLength, flags); + var msg = GetStreamTrimMessage(false, key, minId, useApproximateMaxLength, limit, mode, flags); return ExecuteSync(msg, ResultProcessor.Int64); } - public Task StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) + public Task StreamTrimByMinIdAsync(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) { - var msg = GetStreamTrimMessage(key, maxLength, useApproximateMaxLength, flags); + var msg = GetStreamTrimMessage(false, key, minId, useApproximateMaxLength, limit, mode, flags); return ExecuteAsync(msg, ResultProcessor.Int64); } @@ -4109,13 +4194,7 @@ private Message GetSortedSetRemoveRangeByScoreMessage(RedisKey key, double start private Message GetStreamAcknowledgeMessage(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags) { - var values = new RedisValue[] - { - groupName, - messageId, - }; - - return Message.Create(Database, flags, RedisCommand.XACK, key, values); + return Message.Create(Database, flags, RedisCommand.XACK, key, groupName, messageId); } private Message GetStreamAcknowledgeMessage(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags) @@ -4124,27 +4203,45 @@ private Message GetStreamAcknowledgeMessage(RedisKey key, RedisValue groupName, if (messageIds.Length == 0) throw new ArgumentOutOfRangeException(nameof(messageIds), "messageIds must contain at least one item."); var values = new RedisValue[messageIds.Length + 1]; + values[0] = groupName; + messageIds.AsSpan().CopyTo(values.AsSpan(1)); - var offset = 0; + return Message.Create(Database, flags, RedisCommand.XACK, key, values); + } - values[offset++] = groupName; + private Message GetStreamAcknowledgeAndDeleteMessage(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue messageId, CommandFlags flags) + { + return Message.Create(Database, flags, RedisCommand.XACKDEL, key, groupName, StreamConstants.GetMode(mode), StreamConstants.Ids, 1, messageId); + } - for (var i = 0; i < messageIds.Length; i++) - { - values[offset++] = messageIds[i]; - } + private Message GetStreamAcknowledgeAndDeleteMessage(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags) + { + if (messageIds == null) throw new ArgumentNullException(nameof(messageIds)); + if (messageIds.Length == 0) throw new ArgumentOutOfRangeException(nameof(messageIds), "messageIds must contain at least one item."); - return Message.Create(Database, flags, RedisCommand.XACK, key, values); + var values = new RedisValue[messageIds.Length + 4]; + + var offset = 0; + values[offset++] = groupName; + values[offset++] = StreamConstants.GetMode(mode); + values[offset++] = StreamConstants.Ids; + values[offset++] = messageIds.Length; + messageIds.AsSpan().CopyTo(values.AsSpan(offset)); + Debug.Assert(offset + messageIds.Length == values.Length); + + return Message.Create(Database, flags, RedisCommand.XACKDEL, key, values); } - private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, int? maxLength, bool useApproximateMaxLength, NameValueEntry streamPair, CommandFlags flags) + private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, long? maxLength, bool useApproximateMaxLength, NameValueEntry streamPair, long? limit, StreamTrimMode mode, CommandFlags flags) { // Calculate the correct number of arguments: // 3 array elements for Entry ID & NameValueEntry.Name & NameValueEntry.Value. // 2 elements if using MAXLEN (keyword & value), otherwise 0. // 1 element if using Approximate Length (~), otherwise 0. var totalLength = 3 + (maxLength.HasValue ? 2 : 0) - + (maxLength.HasValue && useApproximateMaxLength ? 1 : 0); + + (maxLength.HasValue && useApproximateMaxLength ? 1 : 0) + + (limit.HasValue ? 2 : 0) + + (mode != StreamTrimMode.KeepReferences ? 1 : 0); var values = new RedisValue[totalLength]; var offset = 0; @@ -4156,26 +4253,35 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, int? max if (useApproximateMaxLength) { values[offset++] = StreamConstants.ApproximateMaxLen; - values[offset++] = maxLength.Value; - } - else - { - values[offset++] = maxLength.Value; } + + values[offset++] = maxLength.Value; + } + + if (limit.HasValue) + { + values[offset++] = RedisLiterals.LIMIT; + values[offset++] = limit.Value; + } + + if (mode != StreamTrimMode.KeepReferences) + { + values[offset++] = StreamConstants.GetMode(mode); } values[offset++] = messageId; values[offset++] = streamPair.Name; - values[offset] = streamPair.Value; + values[offset++] = streamPair.Value; + Debug.Assert(offset == totalLength); return Message.Create(Database, flags, RedisCommand.XADD, key, values); } /// /// Gets message for . /// - private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, int? maxLength, bool useApproximateMaxLength, NameValueEntry[] streamPairs, CommandFlags flags) + private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, long? maxLength, bool useApproximateMaxLength, NameValueEntry[] streamPairs, long? limit, StreamTrimMode mode, CommandFlags flags) { if (streamPairs == null) throw new ArgumentNullException(nameof(streamPairs)); if (streamPairs.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamPairs), "streamPairs must contain at least one item."); @@ -4209,6 +4315,17 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, int? maxLe values[offset++] = maxLength.Value; } + if (limit.HasValue) + { + values[offset++] = RedisLiterals.LIMIT; + values[offset++] = limit.Value; + } + + if (mode != StreamTrimMode.KeepReferences) + { + values[offset++] = StreamConstants.GetMode(mode); + } + values[offset++] = entryId; for (var i = 0; i < streamPairs.Length; i++) @@ -4217,6 +4334,7 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, int? maxLe values[offset++] = streamPairs[i].Value; } + Debug.Assert(offset == totalLength); return Message.Create(Database, flags, RedisCommand.XADD, key, values); } @@ -4465,27 +4583,45 @@ protected override void WriteImpl(PhysicalConnection physical) public override int ArgCount => argCount; } - private Message GetStreamTrimMessage(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) + private Message GetStreamTrimMessage(bool maxLen, RedisKey key, RedisValue threshold, bool useApproximateMaxLength, long? limit, StreamTrimMode mode, CommandFlags flags) { - if (maxLength < 0) + if (limit.HasValue && limit.GetValueOrDefault() <= 0) { - throw new ArgumentOutOfRangeException(nameof(maxLength), "maxLength must be equal to or greater than 0."); + throw new ArgumentOutOfRangeException(nameof(limit), "limit must be greater than 0 when specified."); } - var values = new RedisValue[2 + (useApproximateMaxLength ? 1 : 0)]; + if (limit is null && !useApproximateMaxLength && mode == StreamTrimMode.KeepReferences) + { + // avoid array alloc in simple case + return Message.Create(Database, flags, RedisCommand.XTRIM, key, maxLen ? StreamConstants.MaxLen : StreamConstants.MinId, threshold); + } - values[0] = StreamConstants.MaxLen; + var values = new RedisValue[2 + (useApproximateMaxLength ? 1 : 0) + (limit.HasValue ? 2 : 0) + (mode == StreamTrimMode.KeepReferences ? 0 : 1)]; + + var offset = 0; + + values[offset++] = maxLen ? StreamConstants.MaxLen : StreamConstants.MinId; if (useApproximateMaxLength) { - values[1] = StreamConstants.ApproximateMaxLen; - values[2] = maxLength; + values[offset++] = StreamConstants.ApproximateMaxLen; } - else + + values[offset++] = threshold; + + if (limit.HasValue) + { + values[offset++] = RedisLiterals.LIMIT; + values[offset++] = limit.GetValueOrDefault(); + } + + if (mode != StreamTrimMode.KeepReferences) // omit when not needed, for back-compat { - values[1] = maxLength; + values[offset++] = StreamConstants.GetMode(mode); } + Debug.Assert(offset == values.Length); + return Message.Create( Database, flags, diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index 06647212b..58e23b35b 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -6,6 +6,7 @@ using System.IO; using System.Linq; using System.Net; +using System.Runtime.CompilerServices; using System.Text; using System.Text.RegularExpressions; using Microsoft.Extensions.Logging; @@ -1377,6 +1378,77 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes } } + internal static ResultProcessor StreamTrimResult => + Int32EnumProcessor.Instance; + + internal static ResultProcessor StreamTrimResultArray => + Int32EnumArrayProcessor.Instance; + + private class Int32EnumProcessor : ResultProcessor where T : unmanaged, Enum + { + private Int32EnumProcessor() { } + public static readonly Int32EnumProcessor Instance = new(); + + protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result) + { + switch (result.Resp2TypeBulkString) + { + case ResultType.Integer: + case ResultType.SimpleString: + case ResultType.BulkString: + if (result.TryGetInt64(out long i64)) + { + Debug.Assert(Unsafe.SizeOf() == sizeof(int)); + int i32 = (int)i64; + SetResult(message, Unsafe.As(ref i32)); + return true; + } + break; + case ResultType.Array when result.ItemsCount == 1: // pick a single element from a unit vector + if (result.GetItems()[0].TryGetInt64(out i64)) + { + Debug.Assert(Unsafe.SizeOf() == sizeof(int)); + int i32 = (int)i64; + SetResult(message, Unsafe.As(ref i32)); + return true; + } + break; + } + return false; + } + } + + private class Int32EnumArrayProcessor : ResultProcessor where T : unmanaged, Enum + { + private Int32EnumArrayProcessor() { } + public static readonly Int32EnumArrayProcessor Instance = new(); + + protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result) + { + switch (result.Resp2TypeArray) + { + case ResultType.Array: + T[] arr; + if (result.IsNull) + { + arr = null!; + } + else + { + Debug.Assert(Unsafe.SizeOf() == sizeof(int)); + arr = result.ToArray(static (in RawResult x) => + { + int i32 = (int)x.AsRedisValue(); + return Unsafe.As(ref i32); + })!; + } + SetResult(message, arr); + return true; + } + return false; + } + } + private class PubSubNumSubProcessor : Int64Processor { protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result) diff --git a/src/StackExchange.Redis/StreamConstants.cs b/src/StackExchange.Redis/StreamConstants.cs index 74650e010..929398e4b 100644 --- a/src/StackExchange.Redis/StreamConstants.cs +++ b/src/StackExchange.Redis/StreamConstants.cs @@ -1,4 +1,6 @@ -namespace StackExchange.Redis +using System; + +namespace StackExchange.Redis { /// /// Constants representing values used in Redis Stream commands. @@ -59,6 +61,7 @@ internal static class StreamConstants internal static readonly RedisValue SetId = "SETID"; internal static readonly RedisValue MaxLen = "MAXLEN"; + internal static readonly RedisValue MinId = "MINID"; internal static readonly RedisValue MkStream = "MKSTREAM"; @@ -67,5 +70,17 @@ internal static class StreamConstants internal static readonly RedisValue Stream = "STREAM"; internal static readonly RedisValue Streams = "STREAMS"; + + private static readonly RedisValue KeepRef = "KEEPREF", DelRef = "DELREF", Acked = "ACKED"; + + internal static readonly RedisValue Ids = "IDS"; + + internal static RedisValue GetMode(StreamTrimMode mode) => mode switch + { + StreamTrimMode.KeepReferences => KeepRef, + StreamTrimMode.DeleteReferences => DelRef, + StreamTrimMode.Acknowledged => Acked, + _ => throw new ArgumentOutOfRangeException(nameof(mode)), + }; } } diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs index 3c56f7605..612ca182b 100644 --- a/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs +++ b/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs @@ -1202,6 +1202,27 @@ public void StreamTrim() mock.Received().StreamTrim("prefix:key", 1000, true, CommandFlags.None); } + [Fact] + public void StreamTrimByMinId() + { + prefixed.StreamTrimByMinId("key", 1111111111); + mock.Received().StreamTrimByMinId("prefix:key", 1111111111); + } + + [Fact] + public void StreamTrimByMinIdWithApproximate() + { + prefixed.StreamTrimByMinId("key", 1111111111, useApproximateMaxLength: true); + mock.Received().StreamTrimByMinId("prefix:key", 1111111111, useApproximateMaxLength: true); + } + + [Fact] + public void StreamTrimByMinIdWithApproximateAndLimit() + { + prefixed.StreamTrimByMinId("key", 1111111111, useApproximateMaxLength: true, limit: 100); + mock.Received().StreamTrimByMinId("prefix:key", 1111111111, useApproximateMaxLength: true, limit: 100); + } + [Fact] public void StringAppend() { diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs index 70893e510..b8cf9a4b9 100644 --- a/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs +++ b/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs @@ -1118,6 +1118,27 @@ public async Task StreamTrimAsync() await mock.Received().StreamTrimAsync("prefix:key", 1000, true, CommandFlags.None); } + [Fact] + public async Task StreamTrimByMinIdAsync() + { + await prefixed.StreamTrimByMinIdAsync("key", 1111111111); + await mock.Received().StreamTrimByMinIdAsync("prefix:key", 1111111111); + } + + [Fact] + public async Task StreamTrimByMinIdAsyncWithApproximate() + { + await prefixed.StreamTrimByMinIdAsync("key", 1111111111, useApproximateMaxLength: true); + await mock.Received().StreamTrimByMinIdAsync("prefix:key", 1111111111, useApproximateMaxLength: true); + } + + [Fact] + public async Task StreamTrimByMinIdAsyncWithApproximateAndLimit() + { + await prefixed.StreamTrimByMinIdAsync("key", 1111111111, useApproximateMaxLength: true, limit: 100); + await mock.Received().StreamTrimByMinIdAsync("prefix:key", 1111111111, useApproximateMaxLength: true, limit: 100); + } + [Fact] public async Task StringAppendAsync() { diff --git a/tests/StackExchange.Redis.Tests/StreamTests.cs b/tests/StackExchange.Redis.Tests/StreamTests.cs index aef914293..ac7bdcdb0 100644 --- a/tests/StackExchange.Redis.Tests/StreamTests.cs +++ b/tests/StackExchange.Redis.Tests/StreamTests.cs @@ -698,31 +698,85 @@ public async Task StreamConsumerGroupAcknowledgeMessage() var id1 = db.StreamAdd(key, "field1", "value1"); var id2 = db.StreamAdd(key, "field2", "value2"); var id3 = db.StreamAdd(key, "field3", "value3"); + RedisValue notexist = "0-0"; var id4 = db.StreamAdd(key, "field4", "value4"); db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning); // Read all 4 messages, they will be assigned to the consumer var entries = db.StreamReadGroup(key, groupName, consumer, StreamPosition.NewMessages); + Assert.Equal(4, entries.Length); // Send XACK for 3 of the messages // Single message Id overload. var oneAck = db.StreamAcknowledge(key, groupName, id1); + Assert.Equal(1, oneAck); + + var nack = db.StreamAcknowledge(key, groupName, notexist); + Assert.Equal(0, nack); // Multiple message Id overload. - var twoAck = db.StreamAcknowledge(key, groupName, [id3, id4]); + var twoAck = db.StreamAcknowledge(key, groupName, [id3, notexist, id4]); // Read the group again, it should only return the unacknowledged message. var notAcknowledged = db.StreamReadGroup(key, groupName, consumer, "0-0"); - Assert.Equal(4, entries.Length); - Assert.Equal(1, oneAck); Assert.Equal(2, twoAck); Assert.Single(notAcknowledged); Assert.Equal(id2, notAcknowledged[0].Id); } + [Theory] + [InlineData(StreamTrimMode.KeepReferences)] + [InlineData(StreamTrimMode.DeleteReferences)] + [InlineData(StreamTrimMode.Acknowledged)] + public void StreamConsumerGroupAcknowledgeAndDeleteMessage(StreamTrimMode mode) + { + using var conn = Create(require: RedisFeatures.v8_2_0_rc1); + + var db = conn.GetDatabase(); + var key = Me() + ":" + mode; + const string groupName = "test_group", + consumer = "test_consumer"; + + var id1 = db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + RedisValue notexist = "0-0"; + var id4 = db.StreamAdd(key, "field4", "value4"); + + db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning); + + // Read all 4 messages, they will be assigned to the consumer + var entries = db.StreamReadGroup(key, groupName, consumer, StreamPosition.NewMessages); + Assert.Equal(4, entries.Length); + + // Send XACK for 3 of the messages + + // Single message Id overload. + var oneAck = db.StreamAcknowledgeAndDelete(key, groupName, mode, id1); + Assert.Equal(StreamTrimResult.Deleted, oneAck); + + StreamTrimResult nack = db.StreamAcknowledgeAndDelete(key, groupName, mode, notexist); + Assert.Equal(StreamTrimResult.NotFound, nack); + + // Multiple message Id overload. + RedisValue[] ids = new[] { id3, notexist, id4 }; + var twoAck = db.StreamAcknowledgeAndDelete(key, groupName, mode, ids); + + // Read the group again, it should only return the unacknowledged message. + var notAcknowledged = db.StreamReadGroup(key, groupName, consumer, "0-0"); + + Assert.Equal(3, twoAck.Length); + Assert.Equal(StreamTrimResult.Deleted, twoAck[0]); + Assert.Equal(StreamTrimResult.NotFound, twoAck[1]); + Assert.Equal(StreamTrimResult.Deleted, twoAck[2]); + + Assert.Single(notAcknowledged); + Assert.Equal(id2, notAcknowledged[0].Id); + } + [Fact] public async Task StreamConsumerGroupClaimMessages() { @@ -1229,6 +1283,54 @@ public async Task StreamDeleteMessages() Assert.Equal(2, messages.Length); } + [Theory] + [InlineData(StreamTrimMode.KeepReferences)] + [InlineData(StreamTrimMode.DeleteReferences)] + [InlineData(StreamTrimMode.Acknowledged)] + public void StreamDeleteExMessage(StreamTrimMode mode) + { + using var conn = Create(require: RedisFeatures.v8_2_0_rc1); // XDELEX + + var db = conn.GetDatabase(); + var key = Me() + ":" + mode; + + db.StreamAdd(key, "field1", "value1"); + db.StreamAdd(key, "field2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + db.StreamAdd(key, "field4", "value4"); + + var deleted = db.StreamDelete(key, new[] { id3 }, mode: mode); + var messages = db.StreamRange(key); + + Assert.Equal(StreamTrimResult.Deleted, Assert.Single(deleted)); + Assert.Equal(3, messages.Length); + } + + [Theory] + [InlineData(StreamTrimMode.KeepReferences)] + [InlineData(StreamTrimMode.DeleteReferences)] + [InlineData(StreamTrimMode.Acknowledged)] + public void StreamDeleteExMessages(StreamTrimMode mode) + { + using var conn = Create(require: RedisFeatures.v8_2_0_rc1); // XDELEX + + var db = conn.GetDatabase(); + var key = Me() + ":" + mode; + + db.StreamAdd(key, "field1", "value1"); + var id2 = db.StreamAdd(key, "field2", "value2"); + var id3 = db.StreamAdd(key, "field3", "value3"); + db.StreamAdd(key, "field4", "value4"); + + var deleted = db.StreamDelete(key, new[] { id2, id3 }, mode: mode); + var messages = db.StreamRange(key); + + Assert.Equal(2, deleted.Length); + Assert.Equal(StreamTrimResult.Deleted, deleted[0]); + Assert.Equal(StreamTrimResult.Deleted, deleted[1]); + Assert.Equal(2, messages.Length); + } + [Fact] public async Task StreamGroupInfoGet() { @@ -1891,6 +1993,8 @@ public async Task StreamReadWithAfterIdAndCount_2() Assert.Equal(id3, entries[1].Id); } + protected override string GetConfiguration() => "127.0.0.1:6379"; + [Fact] public async Task StreamTrimLength() { @@ -1912,6 +2016,70 @@ public async Task StreamTrimLength() Assert.Equal(1, len); } + private static Version ForMode(StreamTrimMode mode, Version? defaultVersion = null) => mode switch + { + StreamTrimMode.KeepReferences => defaultVersion ?? RedisFeatures.v5_0_0, + StreamTrimMode.Acknowledged => RedisFeatures.v8_2_0_rc1, + StreamTrimMode.DeleteReferences => RedisFeatures.v8_2_0_rc1, + _ => throw new ArgumentOutOfRangeException(nameof(mode)), + }; + + [Theory] + [InlineData(StreamTrimMode.KeepReferences)] + [InlineData(StreamTrimMode.DeleteReferences)] + [InlineData(StreamTrimMode.Acknowledged)] + public void StreamTrimByMinId(StreamTrimMode mode) + { + using var conn = Create(require: ForMode(mode, RedisFeatures.v6_2_0)); + + var db = conn.GetDatabase(); + var key = Me() + ":" + mode; + + // Add a couple items and check length. + db.StreamAdd(key, "field1", "value1", 1111111110); + db.StreamAdd(key, "field2", "value2", 1111111111); + db.StreamAdd(key, "field3", "value3", 1111111112); + + var numRemoved = db.StreamTrimByMinId(key, 1111111111, mode: mode); + var len = db.StreamLength(key); + + Assert.Equal(1, numRemoved); + Assert.Equal(2, len); + } + + [Theory] + [InlineData(StreamTrimMode.KeepReferences)] + [InlineData(StreamTrimMode.DeleteReferences)] + [InlineData(StreamTrimMode.Acknowledged)] + public void StreamTrimByMinIdWithApproximateAndLimit(StreamTrimMode mode) + { + using var conn = Create(require: ForMode(mode, RedisFeatures.v6_2_0)); + + var db = conn.GetDatabase(); + var key = Me() + ":" + mode; + + const int maxLength = 1000; + const int limit = 100; + + for (var i = 0; i < maxLength; i++) + { + db.StreamAdd(key, $"field", $"value", 1111111110 + i); + } + + var numRemoved = db.StreamTrimByMinId(key, 1111111110 + maxLength, useApproximateMaxLength: true, limit: limit, mode: mode); + var expectRemoved = mode switch + { + StreamTrimMode.KeepReferences => limit, + StreamTrimMode.DeleteReferences => 0, + StreamTrimMode.Acknowledged => 0, + _ => throw new ArgumentOutOfRangeException(nameof(mode)), + }; + var len = db.StreamLength(key); + + Assert.Equal(expectRemoved, numRemoved); + Assert.Equal(maxLength - expectRemoved, len); + } + [Fact] public async Task StreamVerifyLength() { @@ -1939,14 +2107,17 @@ public async Task AddWithApproxCountAsync() await db.StreamAddAsync(key, "field", "value", maxLength: 10, useApproximateMaxLength: true, flags: CommandFlags.None).ConfigureAwait(false); } - [Fact] - public async Task AddWithApproxCount() + [Theory] + [InlineData(StreamTrimMode.KeepReferences)] + [InlineData(StreamTrimMode.DeleteReferences)] + [InlineData(StreamTrimMode.Acknowledged)] + public async Task AddWithApproxCount(StreamTrimMode mode) { - await using var conn = Create(require: RedisFeatures.v5_0_0); + await using var conn = Create(require: ForMode(mode)); var db = conn.GetDatabase(); - var key = Me(); - db.StreamAdd(key, "field", "value", maxLength: 10, useApproximateMaxLength: true, flags: CommandFlags.None); + var key = Me() + ":" + mode; + db.StreamAdd(key, "field", "value", maxLength: 10, useApproximateMaxLength: true, trimMode: mode, flags: CommandFlags.None); } [Fact]