Add DeleteStreamsForSessionAsync to ISseEventStreamStore#1288
Add DeleteStreamsForSessionAsync to ISseEventStreamStore#1288mikekistler wants to merge 1 commit intomodelcontextprotocol:mainfrom
Conversation
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
I don't mind adding a method to ISseEventStreamStore that notifies it when a session has ended along the lines of DeleteStreamsForSessionAsync, but we'd have to be careful not to ever call it given a custom ISessionMigrationHandler. Also, we probably should rename it to something like CompleteSessionAsync to be less prescriptive, and have the DistributedCacheStreamStore implementation of it no-op. Even if we didn't have to worry about the race conditions, I think it might be less efficient to proactively query and delete cache items vs let them expire.
|
|
||
| // Act & Assert | ||
| await Assert.ThrowsAsync<ArgumentNullException>("sessionId", | ||
| async () => await store.DeleteStreamsForSessionAsync(null!, CancellationToken)); |
There was a problem hiding this comment.
Outside of these unit tests, I don't see anything that calls DeleteStreamsForSessionAsync. It'd be easy enough to do so in StreamableHttpServerTransport.DisposeAsync, but that's going to run into problems once we add ISessionMigrationHandler from #1270. It also isn't helpful in stateless mode if/when we add support for that.
| } | ||
|
|
||
| var updatedIndexBytes = JsonSerializer.SerializeToUtf8Bytes(index, DistributedCacheEventStreamStoreJsonUtilities.SessionIndexJsonTypeInfo); | ||
| await _cache.SetAsync(indexKey, updatedIndexBytes, new DistributedCacheEntryOptions |
There was a problem hiding this comment.
I don't see how we can easily prevent concurrent calls to UpdateSessionIndexAsync for concurrent streams from clobbering the SessionIndex causing streams to be lost. This might be tractable if we knew a given session could only be owned by this process by using a lock, but with ISessionMigrationHandler we have to consider we might have the same session concurrently handled by different machines.
Closes #1287
Adds a
DeleteStreamsForSessionAsyncmethod toISseEventStreamStorefor proactive cleanup of stored SSE event streams when a session ends, rather than relying solely on cache expiration.Changes:
DeleteStreamsForSessionAsync(string sessionId, CancellationToken)toISseEventStreamStoreDistributedCacheEventStreamStore: maintains a session index in the cache tracking stream IDs and last sequences per session; delete reads the index and removes all event, metadata, and index entriesTestSseEventStreamStoreandBlockingEventStreamStore: implemented for test compatibility