diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index 47974b278..065df8067 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -2286,6 +2286,19 @@ internal static bool TryRead(Sequence pairs, in CommandBytes key, ref } return false; } + internal static bool TryRead(Sequence pairs, in CommandBytes key, ref long? value) + { + var len = pairs.Length / 2; + for (int i = 0; i < len; i++) + { + if (pairs[i * 2].IsEqual(key) && pairs[(i * 2) + 1].TryGetInt64(out var tmp)) + { + value = tmp; + return true; + } + } + return false; + } internal static bool TryRead(Sequence pairs, in CommandBytes key, ref int value) { @@ -2348,7 +2361,8 @@ protected override StreamGroupInfo ParseItem(in RawResult result) var arr = result.GetItems(); string? name = default, lastDeliveredId = default; int consumerCount = default, pendingMessageCount = default; - long entriesRead = default, lag = default; + long entriesRead = default; + long? lag = default; KeyValuePairParser.TryRead(arr, KeyValuePairParser.Name, ref name); KeyValuePairParser.TryRead(arr, KeyValuePairParser.Consumers, ref consumerCount); diff --git a/tests/StackExchange.Redis.Tests/StreamTests.cs b/tests/StackExchange.Redis.Tests/StreamTests.cs index 0ea744848..e4f50db64 100644 --- a/tests/StackExchange.Redis.Tests/StreamTests.cs +++ b/tests/StackExchange.Redis.Tests/StreamTests.cs @@ -2042,4 +2042,43 @@ await db.StreamAddAsync( Assert.Equal(123, (int)obj!.id); Assert.Equal("test", (string)obj.name); } + + [Fact] + public async Task StreamConsumerGroupInfoLagIsNull() + { + await using var conn = Create(require: RedisFeatures.v5_0_0); + + var db = conn.GetDatabase(); + var key = Me(); + const string groupName = "test_group", + consumer = "consumer"; + + await db.StreamCreateConsumerGroupAsync(key, groupName); + await db.StreamReadGroupAsync(key, groupName, consumer, "0-0", 1); + await db.StreamAddAsync(key, "field1", "value1"); + await db.StreamAddAsync(key, "field1", "value1"); + + var streamInfo = await db.StreamInfoAsync(key); + await db.StreamDeleteAsync(key, new[] { streamInfo.LastEntry.Id }); + + Assert.Null((await db.StreamGroupInfoAsync(key))[0].Lag); + } + + [Fact] + public async Task StreamConsumerGroupInfoLagIsTwo() + { + await using var conn = Create(require: RedisFeatures.v5_0_0); + + var db = conn.GetDatabase(); + var key = Me(); + const string groupName = "test_group", + consumer = "consumer"; + + await db.StreamCreateConsumerGroupAsync(key, groupName); + await db.StreamReadGroupAsync(key, groupName, consumer, "0-0", 1); + await db.StreamAddAsync(key, "field1", "value1"); + await db.StreamAddAsync(key, "field1", "value1"); + + Assert.Equal(2, (await db.StreamGroupInfoAsync(key))[0].Lag); + } }