Skip to content

Add DeleteStreamsForSessionAsync to ISseEventStreamStore#1288

Open
mikekistler wants to merge 1 commit intomodelcontextprotocol:mainfrom
mikekistler:mdk/add-delete-streams-for-session
Open

Add DeleteStreamsForSessionAsync to ISseEventStreamStore#1288
mikekistler wants to merge 1 commit intomodelcontextprotocol:mainfrom
mikekistler:mdk/add-delete-streams-for-session

Conversation

@mikekistler
Copy link
Contributor

Closes #1287

Adds a DeleteStreamsForSessionAsync method to ISseEventStreamStore for proactive cleanup of stored SSE event streams when a session ends, rather than relying solely on cache expiration.

Changes:

  • Added DeleteStreamsForSessionAsync(string sessionId, CancellationToken) to ISseEventStreamStore
  • DistributedCacheEventStreamStore: maintains a session index in the cache tracking stream IDs and last sequences per session; delete reads the index and removes all event, metadata, and index entries
  • TestSseEventStreamStore and BlockingEventStreamStore: implemented for test compatibility
  • Added 5 unit tests covering null args, no-op for missing sessions, full deletion, session isolation, and multi-stream sessions

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Copy link
Contributor

@halter73 halter73 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind adding a method to ISseEventStreamStore that notifies it when a session has ended along the lines of DeleteStreamsForSessionAsync, but we'd have to be careful not to ever call it given a custom ISessionMigrationHandler. Also, we probably should rename it to something like CompleteSessionAsync to be less prescriptive, and have the DistributedCacheStreamStore implementation of it no-op. Even if we didn't have to worry about the race conditions, I think it might be less efficient to proactively query and delete cache items vs let them expire.


// Act & Assert
await Assert.ThrowsAsync<ArgumentNullException>("sessionId",
async () => await store.DeleteStreamsForSessionAsync(null!, CancellationToken));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Outside of these unit tests, I don't see anything that calls DeleteStreamsForSessionAsync. It'd be easy enough to do so in StreamableHttpServerTransport.DisposeAsync, but that's going to run into problems once we add ISessionMigrationHandler from #1270. It also isn't helpful in stateless mode if/when we add support for that.

}

var updatedIndexBytes = JsonSerializer.SerializeToUtf8Bytes(index, DistributedCacheEventStreamStoreJsonUtilities.SessionIndexJsonTypeInfo);
await _cache.SetAsync(indexKey, updatedIndexBytes, new DistributedCacheEntryOptions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how we can easily prevent concurrent calls to UpdateSessionIndexAsync for concurrent streams from clobbering the SessionIndex causing streams to be lost. This might be tractable if we knew a given session could only be owned by this process by using a lock, but with ISessionMigrationHandler we have to consider we might have the same session concurrently handled by different machines.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add DeleteStreamsForSessionAsync to ISseEventStreamStore

2 participants

Comments