From 548d4eb35d92489152738dd4747e80d21cb74373 Mon Sep 17 00:00:00 2001 From: Mike Kistler Date: Mon, 16 Feb 2026 07:27:22 -0600 Subject: [PATCH] Add DeleteStreamsForSessionAsync to ISseEventStreamStore Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Server/ISseEventStreamStore.cs | 12 ++ .../DistributedCacheEventStreamStore.cs | 104 ++++++++++++++ ...ributedCacheEventStreamStoreJsonContext.cs | 7 + .../ResumabilityIntegrationTests.cs | 3 + .../Utils/TestSseEventStreamStore.cs | 22 +++ .../DistributedCacheEventStreamStoreTests.cs | 127 ++++++++++++++++++ 6 files changed, 275 insertions(+) diff --git a/src/ModelContextProtocol.Core/Server/ISseEventStreamStore.cs b/src/ModelContextProtocol.Core/Server/ISseEventStreamStore.cs index 3d9d9b948..ae7b03470 100644 --- a/src/ModelContextProtocol.Core/Server/ISseEventStreamStore.cs +++ b/src/ModelContextProtocol.Core/Server/ISseEventStreamStore.cs @@ -20,4 +20,16 @@ public interface ISseEventStreamStore /// A token to cancel the operation. /// A reader for the event stream, or null if no matching stream is found. ValueTask GetStreamReaderAsync(string lastEventId, CancellationToken cancellationToken = default); + + /// + /// Deletes all stored event streams and their associated events for the specified session. + /// + /// The ID of the session whose streams should be deleted. + /// A token to cancel the operation. + /// A task representing the asynchronous operation. + /// + /// This method is a best-effort operation. If the session does not exist or has no stored streams, + /// the method completes without error. + /// + ValueTask DeleteStreamsForSessionAsync(string sessionId, CancellationToken cancellationToken = default); } diff --git a/src/ModelContextProtocol/Server/DistributedCacheEventStreamStore.cs b/src/ModelContextProtocol/Server/DistributedCacheEventStreamStore.cs index 4a71b9448..30ba58cb9 100644 --- a/src/ModelContextProtocol/Server/DistributedCacheEventStreamStore.cs +++ b/src/ModelContextProtocol/Server/DistributedCacheEventStreamStore.cs @@ -51,6 +51,48 @@ public ValueTask CreateStreamAsync(SseEventStreamOptions return new ValueTask(writer); } + /// + public async ValueTask DeleteStreamsForSessionAsync(string sessionId, CancellationToken cancellationToken = default) + { + Throw.IfNull(sessionId); + + // Read the session index to find all streams for this session + var indexKey = CacheKeys.SessionIndex(sessionId); + var indexBytes = await _cache.GetAsync(indexKey, cancellationToken).ConfigureAwait(false); + if (indexBytes is null) + { + LogSessionIndexNotFound(sessionId); + return; + } + + var index = JsonSerializer.Deserialize(indexBytes, DistributedCacheEventStreamStoreJsonUtilities.SessionIndexJsonTypeInfo); + if (index?.Streams is null) + { + LogSessionIndexDeserializationFailed(sessionId); + return; + } + + // Delete all events and metadata for each stream + foreach (var stream in index.Streams) + { + // Delete all event keys for this stream + for (long seq = 1; seq <= stream.LastSequence; seq++) + { + var eventId = DistributedCacheEventIdFormatter.Format(sessionId, stream.StreamId, seq); + var eventKey = CacheKeys.Event(eventId); + await _cache.RemoveAsync(eventKey, cancellationToken).ConfigureAwait(false); + } + + // Delete the stream metadata + var metadataKey = CacheKeys.StreamMetadata(sessionId, stream.StreamId); + await _cache.RemoveAsync(metadataKey, cancellationToken).ConfigureAwait(false); + } + + // Delete the session index itself + await _cache.RemoveAsync(indexKey, cancellationToken).ConfigureAwait(false); + LogStreamsDeletedForSession(sessionId, index.Streams.Count); + } + /// public async ValueTask GetStreamReaderAsync(string lastEventId, CancellationToken cancellationToken = default) { @@ -107,6 +149,12 @@ public static string StreamMetadata(string sessionId, string streamId) return $"{Prefix}meta:{sessionIdBase64}:{streamIdBase64}"; } + public static string SessionIndex(string sessionId) + { + var sessionIdBase64 = Convert.ToBase64String(System.Text.Encoding.UTF8.GetBytes(sessionId)); + return $"{Prefix}idx:{sessionIdBase64}"; + } + public static string Event(string eventId) => $"{Prefix}event:{eventId}"; } @@ -132,6 +180,23 @@ internal sealed class StoredEvent public JsonRpcMessage? Data { get; set; } } + /// + /// Index of all streams belonging to a session, stored in the cache. + /// + internal sealed class SessionIndex + { + public List Streams { get; set; } = []; + } + + /// + /// Entry in the session index representing a single stream. + /// + internal sealed class SessionStreamEntry + { + public string StreamId { get; set; } = string.Empty; + public long LastSequence { get; set; } + } + private sealed partial class DistributedCacheEventStreamWriter : ISseEventStreamWriter { private readonly IDistributedCache _cache; @@ -228,6 +293,36 @@ private async ValueTask UpdateMetadataAsync(bool isCompleted, CancellationToken SlidingExpiration = _options.MetadataSlidingExpiration, AbsoluteExpirationRelativeToNow = _options.MetadataAbsoluteExpiration, }, cancellationToken).ConfigureAwait(false); + + // Update the session index with this stream's latest sequence + await UpdateSessionIndexAsync(metadata.LastSequence, cancellationToken).ConfigureAwait(false); + } + + private async ValueTask UpdateSessionIndexAsync(long lastSequence, CancellationToken cancellationToken) + { + var indexKey = CacheKeys.SessionIndex(_sessionId); + var indexBytes = await _cache.GetAsync(indexKey, cancellationToken).ConfigureAwait(false); + + var index = indexBytes is not null + ? JsonSerializer.Deserialize(indexBytes, DistributedCacheEventStreamStoreJsonUtilities.SessionIndexJsonTypeInfo) ?? new SessionIndex() + : new SessionIndex(); + + var existingEntry = index.Streams.Find(s => s.StreamId == _streamId); + if (existingEntry is not null) + { + existingEntry.LastSequence = lastSequence; + } + else + { + index.Streams.Add(new SessionStreamEntry { StreamId = _streamId, LastSequence = lastSequence }); + } + + var updatedIndexBytes = JsonSerializer.SerializeToUtf8Bytes(index, DistributedCacheEventStreamStoreJsonUtilities.SessionIndexJsonTypeInfo); + await _cache.SetAsync(indexKey, updatedIndexBytes, new DistributedCacheEntryOptions + { + SlidingExpiration = _options.MetadataSlidingExpiration, + AbsoluteExpirationRelativeToNow = _options.MetadataAbsoluteExpiration, + }, cancellationToken).ConfigureAwait(false); } private void ThrowIfDisposed() @@ -398,4 +493,13 @@ public DistributedCacheEventStreamReader( [LoggerMessage(Level = LogLevel.Warning, Message = "Failed to deserialize stream metadata for session '{SessionId}', stream '{StreamId}'.")] private partial void LogStreamMetadataDeserializationFailed(string sessionId, string streamId); + + [LoggerMessage(Level = LogLevel.Debug, Message = "Session index not found for session '{SessionId}'. No streams to delete.")] + private partial void LogSessionIndexNotFound(string sessionId); + + [LoggerMessage(Level = LogLevel.Warning, Message = "Failed to deserialize session index for session '{SessionId}'.")] + private partial void LogSessionIndexDeserializationFailed(string sessionId); + + [LoggerMessage(Level = LogLevel.Information, Message = "Deleted {StreamCount} stream(s) for session '{SessionId}'.")] + private partial void LogStreamsDeletedForSession(string sessionId, int streamCount); } diff --git a/src/ModelContextProtocol/Server/DistributedCacheEventStreamStoreJsonContext.cs b/src/ModelContextProtocol/Server/DistributedCacheEventStreamStoreJsonContext.cs index b86f9e30e..a971e1425 100644 --- a/src/ModelContextProtocol/Server/DistributedCacheEventStreamStoreJsonContext.cs +++ b/src/ModelContextProtocol/Server/DistributedCacheEventStreamStoreJsonContext.cs @@ -37,6 +37,12 @@ internal static partial class DistributedCacheEventStreamStoreJsonUtilities public static JsonTypeInfo StoredEventJsonTypeInfo { get; } = (JsonTypeInfo)DefaultOptions.GetTypeInfo(typeof(DistributedCacheEventStreamStore.StoredEvent)); + /// + /// Gets the for . + /// + public static JsonTypeInfo SessionIndexJsonTypeInfo { get; } = + (JsonTypeInfo)DefaultOptions.GetTypeInfo(typeof(DistributedCacheEventStreamStore.SessionIndex)); + private static JsonSerializerOptions CreateDefaultOptions() { // Copy the configuration from McpJsonUtilities.DefaultOptions. @@ -56,5 +62,6 @@ private static JsonSerializerOptions CreateDefaultOptions() GenerationMode = JsonSourceGenerationMode.Metadata)] [JsonSerializable(typeof(DistributedCacheEventStreamStore.StreamMetadata))] [JsonSerializable(typeof(DistributedCacheEventStreamStore.StoredEvent))] + [JsonSerializable(typeof(DistributedCacheEventStreamStore.SessionIndex))] private sealed partial class JsonContext : JsonSerializerContext; } diff --git a/tests/ModelContextProtocol.AspNetCore.Tests/ResumabilityIntegrationTests.cs b/tests/ModelContextProtocol.AspNetCore.Tests/ResumabilityIntegrationTests.cs index ee479c1de..a1164c9c1 100644 --- a/tests/ModelContextProtocol.AspNetCore.Tests/ResumabilityIntegrationTests.cs +++ b/tests/ModelContextProtocol.AspNetCore.Tests/ResumabilityIntegrationTests.cs @@ -173,6 +173,9 @@ public ValueTask CreateStreamAsync(SseEventStreamOptions public ValueTask GetStreamReaderAsync(string lastEventId, CancellationToken cancellationToken = default) => throw new NotSupportedException("This test store does not support reading streams."); + public ValueTask DeleteStreamsForSessionAsync(string sessionId, CancellationToken cancellationToken = default) + => default; + private sealed class BlockingEventStreamWriter : ISseEventStreamWriter { private readonly BlockingEventStreamStore _store; diff --git a/tests/ModelContextProtocol.AspNetCore.Tests/Utils/TestSseEventStreamStore.cs b/tests/ModelContextProtocol.AspNetCore.Tests/Utils/TestSseEventStreamStore.cs index 1072fbe69..69007745b 100644 --- a/tests/ModelContextProtocol.AspNetCore.Tests/Utils/TestSseEventStreamStore.cs +++ b/tests/ModelContextProtocol.AspNetCore.Tests/Utils/TestSseEventStreamStore.cs @@ -95,6 +95,28 @@ private void TrackEvent(string eventId, StreamState stream, long sequence, TimeS Interlocked.Increment(ref _storeEventCallCount); } + /// + public ValueTask DeleteStreamsForSessionAsync(string sessionId, CancellationToken cancellationToken = default) + { + // Find all streams belonging to this session + var keysToRemove = _streams.Keys.Where(k => k.StartsWith($"{sessionId}:", StringComparison.Ordinal)).ToList(); + + foreach (var key in keysToRemove) + { + if (_streams.TryRemove(key, out var state)) + { + // Remove all events belonging to this stream from the event lookup + var eventKeysToRemove = _eventLookup.Where(kvp => kvp.Value.Stream == state).Select(kvp => kvp.Key).ToList(); + foreach (var eventKey in eventKeysToRemove) + { + _eventLookup.TryRemove(eventKey, out _); + } + } + } + + return default; + } + private static string GetStreamKey(string sessionId, string streamId) => $"{sessionId}:{streamId}"; /// diff --git a/tests/ModelContextProtocol.Tests/Server/DistributedCacheEventStreamStoreTests.cs b/tests/ModelContextProtocol.Tests/Server/DistributedCacheEventStreamStoreTests.cs index 1ec3cac99..418ad7a4c 100644 --- a/tests/ModelContextProtocol.Tests/Server/DistributedCacheEventStreamStoreTests.cs +++ b/tests/ModelContextProtocol.Tests/Server/DistributedCacheEventStreamStoreTests.cs @@ -1668,6 +1668,133 @@ public void EventIdFormatter_TryParse_ReturnsFalse_ForNonNumericSequence() Assert.False(parsed); } + [Fact] + public async Task DeleteStreamsForSessionAsync_ThrowsArgumentNullException_WhenSessionIdIsNull() + { + // Arrange + var cache = CreateMemoryCache(); + var store = new DistributedCacheEventStreamStore(cache); + + // Act & Assert + await Assert.ThrowsAsync("sessionId", + async () => await store.DeleteStreamsForSessionAsync(null!, CancellationToken)); + } + + [Fact] + public async Task DeleteStreamsForSessionAsync_NoOp_WhenSessionDoesNotExist() + { + // Arrange + var cache = CreateMemoryCache(); + var store = new DistributedCacheEventStreamStore(cache); + + // Act - should not throw + await store.DeleteStreamsForSessionAsync("nonexistent-session", CancellationToken); + } + + [Fact] + public async Task DeleteStreamsForSessionAsync_RemovesAllStreamsAndEvents() + { + // Arrange + var cache = CreateMemoryCache(); + var store = new DistributedCacheEventStreamStore(cache); + + var writer = await store.CreateStreamAsync(new SseEventStreamOptions + { + SessionId = "session-1", + StreamId = "stream-1", + Mode = SseEventStreamMode.Streaming + }, CancellationToken); + + var item1 = await writer.WriteEventAsync(new SseItem(null), CancellationToken); + var item2 = await writer.WriteEventAsync(new SseItem(null), CancellationToken); + await writer.DisposeAsync(); + + // Verify events are readable before deletion + var reader = await store.GetStreamReaderAsync(item1.EventId!, CancellationToken); + Assert.NotNull(reader); + + // Act + await store.DeleteStreamsForSessionAsync("session-1", CancellationToken); + + // Assert - events should no longer be readable + var readerAfterDelete = await store.GetStreamReaderAsync(item1.EventId!, CancellationToken); + Assert.Null(readerAfterDelete); + + var readerAfterDelete2 = await store.GetStreamReaderAsync(item2.EventId!, CancellationToken); + Assert.Null(readerAfterDelete2); + } + + [Fact] + public async Task DeleteStreamsForSessionAsync_DoesNotAffectOtherSessions() + { + // Arrange + var cache = CreateMemoryCache(); + var store = new DistributedCacheEventStreamStore(cache); + + // Create streams for two sessions + var writer1 = await store.CreateStreamAsync(new SseEventStreamOptions + { + SessionId = "session-1", + StreamId = "stream-1", + Mode = SseEventStreamMode.Streaming + }, CancellationToken); + var item1 = await writer1.WriteEventAsync(new SseItem(null), CancellationToken); + await writer1.DisposeAsync(); + + var writer2 = await store.CreateStreamAsync(new SseEventStreamOptions + { + SessionId = "session-2", + StreamId = "stream-1", + Mode = SseEventStreamMode.Streaming + }, CancellationToken); + var item2 = await writer2.WriteEventAsync(new SseItem(null), CancellationToken); + await writer2.DisposeAsync(); + + // Act - delete only session-1 + await store.DeleteStreamsForSessionAsync("session-1", CancellationToken); + + // Assert - session-1 events should be gone + var reader1 = await store.GetStreamReaderAsync(item1.EventId!, CancellationToken); + Assert.Null(reader1); + + // Assert - session-2 events should still be readable + var reader2 = await store.GetStreamReaderAsync(item2.EventId!, CancellationToken); + Assert.NotNull(reader2); + } + + [Fact] + public async Task DeleteStreamsForSessionAsync_RemovesMultipleStreamsInSameSession() + { + // Arrange + var cache = CreateMemoryCache(); + var store = new DistributedCacheEventStreamStore(cache); + + var writer1 = await store.CreateStreamAsync(new SseEventStreamOptions + { + SessionId = "session-1", + StreamId = "stream-a", + Mode = SseEventStreamMode.Streaming + }, CancellationToken); + var itemA = await writer1.WriteEventAsync(new SseItem(null), CancellationToken); + await writer1.DisposeAsync(); + + var writer2 = await store.CreateStreamAsync(new SseEventStreamOptions + { + SessionId = "session-1", + StreamId = "stream-b", + Mode = SseEventStreamMode.Streaming + }, CancellationToken); + var itemB = await writer2.WriteEventAsync(new SseItem(null), CancellationToken); + await writer2.DisposeAsync(); + + // Act + await store.DeleteStreamsForSessionAsync("session-1", CancellationToken); + + // Assert - both streams should be gone + Assert.Null(await store.GetStreamReaderAsync(itemA.EventId!, CancellationToken)); + Assert.Null(await store.GetStreamReaderAsync(itemB.EventId!, CancellationToken)); + } + /// /// A distributed cache that tracks all operations for verification in tests. /// Supports tracking Set calls, counting metadata reads, and simulating metadata/event expiration.