From 86da87da0a717703b7ba8b0b8fd59620b26a78be Mon Sep 17 00:00:00 2001 From: Robert Hopland Date: Fri, 27 Jun 2025 11:58:05 +0200 Subject: [PATCH 1/3] TryRead for nullable long, tests on null value for StreamConsumerGroupInfo.Lag --- src/StackExchange.Redis/ResultProcessor.cs | 16 +++++++- .../StackExchange.Redis.Tests/StreamTests.cs | 39 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index 47974b278..065df8067 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -2286,6 +2286,19 @@ internal static bool TryRead(Sequence pairs, in CommandBytes key, ref } return false; } + internal static bool TryRead(Sequence pairs, in CommandBytes key, ref long? value) + { + var len = pairs.Length / 2; + for (int i = 0; i < len; i++) + { + if (pairs[i * 2].IsEqual(key) && pairs[(i * 2) + 1].TryGetInt64(out var tmp)) + { + value = tmp; + return true; + } + } + return false; + } internal static bool TryRead(Sequence pairs, in CommandBytes key, ref int value) { @@ -2348,7 +2361,8 @@ protected override StreamGroupInfo ParseItem(in RawResult result) var arr = result.GetItems(); string? name = default, lastDeliveredId = default; int consumerCount = default, pendingMessageCount = default; - long entriesRead = default, lag = default; + long entriesRead = default; + long? lag = default; KeyValuePairParser.TryRead(arr, KeyValuePairParser.Name, ref name); KeyValuePairParser.TryRead(arr, KeyValuePairParser.Consumers, ref consumerCount); diff --git a/tests/StackExchange.Redis.Tests/StreamTests.cs b/tests/StackExchange.Redis.Tests/StreamTests.cs index 0ea744848..4baae7507 100644 --- a/tests/StackExchange.Redis.Tests/StreamTests.cs +++ b/tests/StackExchange.Redis.Tests/StreamTests.cs @@ -2042,4 +2042,43 @@ await db.StreamAddAsync( Assert.Equal(123, (int)obj!.id); Assert.Equal("test", (string)obj.name); } + + [Fact] + public void StreamConsumerGroupInfoLagIsNull() + { + using var conn = Create(require: RedisFeatures.v5_0_0); + + var db = conn.GetDatabase(); + var key = Me(); + const string groupName = "test_group", + consumer = "consumer"; + + db.StreamCreateConsumerGroup(key, groupName); + db.StreamReadGroup(key, groupName, consumer, "0-0", 1); + db.StreamAdd(key, "field1", "value1"); + db.StreamAdd(key, "field1", "value1"); + + var streamInfo = db.StreamInfo(key); + db.StreamDelete(key, new[] { streamInfo.LastEntry.Id }); + + Assert.Null(db.StreamGroupInfo(key)[0].Lag); + } + + [Fact] + public void StreamConsumerGroupInfoLagIsTwo() + { + using var conn = Create(require: RedisFeatures.v5_0_0); + + var db = conn.GetDatabase(); + var key = Me(); + const string groupName = "test_group", + consumer = "consumer"; + + db.StreamCreateConsumerGroup(key, groupName); + db.StreamReadGroup(key, groupName, consumer, "0-0", 1); + db.StreamAdd(key, "field1", "value1"); + db.StreamAdd(key, "field1", "value1"); + + Assert.Equal(2, db.StreamGroupInfo(key)[0].Lag); + } } From 6935115ffbc800841f62713174e667f1b1be522c Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Mon, 21 Jul 2025 09:34:40 +0100 Subject: [PATCH 2/3] Update StreamTests.cs make tests async --- .../StackExchange.Redis.Tests/StreamTests.cs | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/tests/StackExchange.Redis.Tests/StreamTests.cs b/tests/StackExchange.Redis.Tests/StreamTests.cs index 4baae7507..8bd666998 100644 --- a/tests/StackExchange.Redis.Tests/StreamTests.cs +++ b/tests/StackExchange.Redis.Tests/StreamTests.cs @@ -2044,41 +2044,41 @@ await db.StreamAddAsync( } [Fact] - public void StreamConsumerGroupInfoLagIsNull() + public async Task StreamConsumerGroupInfoLagIsNull() { - using var conn = Create(require: RedisFeatures.v5_0_0); + await using var conn = Create(require: RedisFeatures.v5_0_0); var db = conn.GetDatabase(); var key = Me(); const string groupName = "test_group", consumer = "consumer"; - db.StreamCreateConsumerGroup(key, groupName); - db.StreamReadGroup(key, groupName, consumer, "0-0", 1); - db.StreamAdd(key, "field1", "value1"); - db.StreamAdd(key, "field1", "value1"); + await db.StreamCreateConsumerGroupAsync(key, groupName); + await db.StreamReadGroupAsync(key, groupName, consumer, "0-0", 1); + await db.StreamAddAsync(key, "field1", "value1"); + await db.StreamAddAsync(key, "field1", "value1"); - var streamInfo = db.StreamInfo(key); - db.StreamDelete(key, new[] { streamInfo.LastEntry.Id }); + var streamInfo = await db.StreamInfoAsync(key); + await db.StreamDeleteAsync(key, new[] { streamInfo.LastEntry.Id }); - Assert.Null(db.StreamGroupInfo(key)[0].Lag); + Assert.Null((await db.StreamGroupInfoAsync(key)[0]).Lag); } [Fact] - public void StreamConsumerGroupInfoLagIsTwo() + public async Task StreamConsumerGroupInfoLagIsTwo() { - using var conn = Create(require: RedisFeatures.v5_0_0); + await using var conn = Create(require: RedisFeatures.v5_0_0); var db = conn.GetDatabase(); var key = Me(); const string groupName = "test_group", consumer = "consumer"; - db.StreamCreateConsumerGroup(key, groupName); - db.StreamReadGroup(key, groupName, consumer, "0-0", 1); - db.StreamAdd(key, "field1", "value1"); - db.StreamAdd(key, "field1", "value1"); + await db.StreamCreateConsumerGroupAsync(key, groupName); + await db.StreamReadGroupAsync(key, groupName, consumer, "0-0", 1); + await db.StreamAddAsync(key, "field1", "value1"); + await db.StreamAddAsync(key, "field1", "value1"); - Assert.Equal(2, db.StreamGroupInfo(key)[0].Lag); + Assert.Equal(2, (await db.StreamGroupInfoAsync(key)[0]).Lag); } } From 269375b0594e956afbb8cba284448644945ee2df Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Mon, 21 Jul 2025 09:41:57 +0100 Subject: [PATCH 3/3] Update StreamTests.cs tyop --- tests/StackExchange.Redis.Tests/StreamTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/StackExchange.Redis.Tests/StreamTests.cs b/tests/StackExchange.Redis.Tests/StreamTests.cs index 8bd666998..e4f50db64 100644 --- a/tests/StackExchange.Redis.Tests/StreamTests.cs +++ b/tests/StackExchange.Redis.Tests/StreamTests.cs @@ -2061,7 +2061,7 @@ public async Task StreamConsumerGroupInfoLagIsNull() var streamInfo = await db.StreamInfoAsync(key); await db.StreamDeleteAsync(key, new[] { streamInfo.LastEntry.Id }); - Assert.Null((await db.StreamGroupInfoAsync(key)[0]).Lag); + Assert.Null((await db.StreamGroupInfoAsync(key))[0].Lag); } [Fact] @@ -2079,6 +2079,6 @@ public async Task StreamConsumerGroupInfoLagIsTwo() await db.StreamAddAsync(key, "field1", "value1"); await db.StreamAddAsync(key, "field1", "value1"); - Assert.Equal(2, (await db.StreamGroupInfoAsync(key)[0]).Lag); + Assert.Equal(2, (await db.StreamGroupInfoAsync(key))[0].Lag); } }