-
Notifications
You must be signed in to change notification settings - Fork 627
Add DeleteStreamsForSessionAsync to ISseEventStreamStore #1288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1668,6 +1668,133 @@ public void EventIdFormatter_TryParse_ReturnsFalse_ForNonNumericSequence() | |
| Assert.False(parsed); | ||
| } | ||
|
|
||
| [Fact] | ||
| public async Task DeleteStreamsForSessionAsync_ThrowsArgumentNullException_WhenSessionIdIsNull() | ||
| { | ||
| // Arrange | ||
| var cache = CreateMemoryCache(); | ||
| var store = new DistributedCacheEventStreamStore(cache); | ||
|
|
||
| // Act & Assert | ||
| await Assert.ThrowsAsync<ArgumentNullException>("sessionId", | ||
| async () => await store.DeleteStreamsForSessionAsync(null!, CancellationToken)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Outside of these unit tests, I don't see anything that calls |
||
| } | ||
|
|
||
| [Fact] | ||
| public async Task DeleteStreamsForSessionAsync_NoOp_WhenSessionDoesNotExist() | ||
| { | ||
| // Arrange | ||
| var cache = CreateMemoryCache(); | ||
| var store = new DistributedCacheEventStreamStore(cache); | ||
|
|
||
| // Act - should not throw | ||
| await store.DeleteStreamsForSessionAsync("nonexistent-session", CancellationToken); | ||
| } | ||
|
|
||
| [Fact] | ||
| public async Task DeleteStreamsForSessionAsync_RemovesAllStreamsAndEvents() | ||
| { | ||
| // Arrange | ||
| var cache = CreateMemoryCache(); | ||
| var store = new DistributedCacheEventStreamStore(cache); | ||
|
|
||
| var writer = await store.CreateStreamAsync(new SseEventStreamOptions | ||
| { | ||
| SessionId = "session-1", | ||
| StreamId = "stream-1", | ||
| Mode = SseEventStreamMode.Streaming | ||
| }, CancellationToken); | ||
|
|
||
| var item1 = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken); | ||
| var item2 = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken); | ||
| await writer.DisposeAsync(); | ||
|
|
||
| // Verify events are readable before deletion | ||
| var reader = await store.GetStreamReaderAsync(item1.EventId!, CancellationToken); | ||
| Assert.NotNull(reader); | ||
|
|
||
| // Act | ||
| await store.DeleteStreamsForSessionAsync("session-1", CancellationToken); | ||
|
|
||
| // Assert - events should no longer be readable | ||
| var readerAfterDelete = await store.GetStreamReaderAsync(item1.EventId!, CancellationToken); | ||
| Assert.Null(readerAfterDelete); | ||
|
|
||
| var readerAfterDelete2 = await store.GetStreamReaderAsync(item2.EventId!, CancellationToken); | ||
| Assert.Null(readerAfterDelete2); | ||
| } | ||
|
|
||
| [Fact] | ||
| public async Task DeleteStreamsForSessionAsync_DoesNotAffectOtherSessions() | ||
| { | ||
| // Arrange | ||
| var cache = CreateMemoryCache(); | ||
| var store = new DistributedCacheEventStreamStore(cache); | ||
|
|
||
| // Create streams for two sessions | ||
| var writer1 = await store.CreateStreamAsync(new SseEventStreamOptions | ||
| { | ||
| SessionId = "session-1", | ||
| StreamId = "stream-1", | ||
| Mode = SseEventStreamMode.Streaming | ||
| }, CancellationToken); | ||
| var item1 = await writer1.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken); | ||
| await writer1.DisposeAsync(); | ||
|
|
||
| var writer2 = await store.CreateStreamAsync(new SseEventStreamOptions | ||
| { | ||
| SessionId = "session-2", | ||
| StreamId = "stream-1", | ||
| Mode = SseEventStreamMode.Streaming | ||
| }, CancellationToken); | ||
| var item2 = await writer2.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken); | ||
| await writer2.DisposeAsync(); | ||
|
|
||
| // Act - delete only session-1 | ||
| await store.DeleteStreamsForSessionAsync("session-1", CancellationToken); | ||
|
|
||
| // Assert - session-1 events should be gone | ||
| var reader1 = await store.GetStreamReaderAsync(item1.EventId!, CancellationToken); | ||
| Assert.Null(reader1); | ||
|
|
||
| // Assert - session-2 events should still be readable | ||
| var reader2 = await store.GetStreamReaderAsync(item2.EventId!, CancellationToken); | ||
| Assert.NotNull(reader2); | ||
| } | ||
|
|
||
| [Fact] | ||
| public async Task DeleteStreamsForSessionAsync_RemovesMultipleStreamsInSameSession() | ||
| { | ||
| // Arrange | ||
| var cache = CreateMemoryCache(); | ||
| var store = new DistributedCacheEventStreamStore(cache); | ||
|
|
||
| var writer1 = await store.CreateStreamAsync(new SseEventStreamOptions | ||
| { | ||
| SessionId = "session-1", | ||
| StreamId = "stream-a", | ||
| Mode = SseEventStreamMode.Streaming | ||
| }, CancellationToken); | ||
| var itemA = await writer1.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken); | ||
| await writer1.DisposeAsync(); | ||
|
|
||
| var writer2 = await store.CreateStreamAsync(new SseEventStreamOptions | ||
| { | ||
| SessionId = "session-1", | ||
| StreamId = "stream-b", | ||
| Mode = SseEventStreamMode.Streaming | ||
| }, CancellationToken); | ||
| var itemB = await writer2.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken); | ||
| await writer2.DisposeAsync(); | ||
|
|
||
| // Act | ||
| await store.DeleteStreamsForSessionAsync("session-1", CancellationToken); | ||
|
|
||
| // Assert - both streams should be gone | ||
| Assert.Null(await store.GetStreamReaderAsync(itemA.EventId!, CancellationToken)); | ||
| Assert.Null(await store.GetStreamReaderAsync(itemB.EventId!, CancellationToken)); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// A distributed cache that tracks all operations for verification in tests. | ||
| /// Supports tracking Set calls, counting metadata reads, and simulating metadata/event expiration. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see how we can easily prevent concurrent calls to
UpdateSessionIndexAsyncfor concurrent streams from clobbering theSessionIndexcausing streams to be lost. This might be tractable if we knew a given session could only be owned by this process by using a lock, but withISessionMigrationHandlerwe have to consider we might have the same session concurrently handled by different machines.