Skip to content

Commit c46f73f

Browse files
Copilotstephentoub
andcommitted
Address PR feedback on SubscribeToResourceAsync implementation
- Change Uri overload to non-async to avoid unnecessary async state machine - Register handler before subscribing and cleanup on subscribe failure - Use UriTemplateComparer.Instance.Equals for URI comparison - Invert disposal check to early return pattern - Use finally block for handler disposal instead of swallowing exceptions - Add test for multiple handlers on same URI Co-authored-by: stephentoub <2642209+stephentoub@users.noreply.github.com>
1 parent 9a065c5 commit c46f73f

File tree

2 files changed

+95
-16
lines changed

2 files changed

+95
-16
lines changed

src/ModelContextProtocol.Core/Client/McpClient.Methods.cs

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -610,15 +610,15 @@ public Task SubscribeToResourceAsync(
610610
/// Notifications for other resources are filtered out automatically.
611611
/// </para>
612612
/// </remarks>
613-
public async Task<IAsyncDisposable> SubscribeToResourceAsync(
613+
public Task<IAsyncDisposable> SubscribeToResourceAsync(
614614
Uri uri,
615615
Func<ResourceUpdatedNotificationParams, CancellationToken, ValueTask> handler,
616616
RequestOptions? options = null,
617617
CancellationToken cancellationToken = default)
618618
{
619619
Throw.IfNull(uri);
620620

621-
return await SubscribeToResourceAsync(uri.AbsoluteUri, handler, options, cancellationToken).ConfigureAwait(false);
621+
return SubscribeToResourceAsync(uri.AbsoluteUri, handler, options, cancellationToken);
622622
}
623623

624624
/// <summary>
@@ -654,21 +654,30 @@ public async Task<IAsyncDisposable> SubscribeToResourceAsync(
654654
Throw.IfNullOrWhiteSpace(uri);
655655
Throw.IfNull(handler);
656656

657-
// Subscribe to the resource
658-
await SubscribeToResourceAsync(uri, options, cancellationToken).ConfigureAwait(false);
659-
660657
// Register a notification handler that filters for this specific resource
661658
IAsyncDisposable handlerRegistration = RegisterNotificationHandler(
662659
NotificationMethods.ResourceUpdatedNotification,
663660
async (notification, ct) =>
664661
{
665662
if (JsonSerializer.Deserialize(notification.Params, McpJsonUtilities.JsonContext.Default.ResourceUpdatedNotificationParams) is { } resourceUpdate &&
666-
string.Equals(resourceUpdate.Uri, uri, StringComparison.Ordinal))
663+
UriTemplate.UriTemplateComparer.Instance.Equals(resourceUpdate.Uri, uri))
667664
{
668665
await handler(resourceUpdate, ct).ConfigureAwait(false);
669666
}
670667
});
671668

669+
try
670+
{
671+
// Subscribe to the resource
672+
await SubscribeToResourceAsync(uri, options, cancellationToken).ConfigureAwait(false);
673+
}
674+
catch
675+
{
676+
// If subscription fails, unregister the handler before propagating the exception
677+
await handlerRegistration.DisposeAsync().ConfigureAwait(false);
678+
throw;
679+
}
680+
672681
// Return a disposable that unsubscribes and removes the handler
673682
return new ResourceSubscription(this, uri, handlerRegistration, options);
674683
}
@@ -694,18 +703,18 @@ public ResourceSubscription(McpClient client, string uri, IAsyncDisposable handl
694703

695704
public async ValueTask DisposeAsync()
696705
{
697-
if (Interlocked.Exchange(ref _disposed, 1) == 0)
706+
if (Interlocked.Exchange(ref _disposed, 1) != 0)
698707
{
699-
// Unsubscribe from the resource
700-
try
701-
{
702-
await _client.UnsubscribeFromResourceAsync(_uri, _options, CancellationToken.None).ConfigureAwait(false);
703-
}
704-
catch
705-
{
706-
// Swallow exceptions during unsubscribe to ensure handler is still disposed
707-
}
708+
return;
709+
}
708710

711+
try
712+
{
713+
// Unsubscribe from the resource
714+
await _client.UnsubscribeFromResourceAsync(_uri, _options, CancellationToken.None).ConfigureAwait(false);
715+
}
716+
finally
717+
{
709718
// Dispose the notification handler registration
710719
await _handlerRegistration.DisposeAsync().ConfigureAwait(false);
711720
}

tests/ModelContextProtocol.Tests/Client/McpClientResourceSubscriptionTests.cs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,4 +293,74 @@ public async Task SubscribeToResourceAsync_DisposalIsIdempotent()
293293
// Assert - no exception should be thrown
294294
Assert.True(true);
295295
}
296+
297+
[Fact]
298+
public async Task SubscribeToResourceAsync_MultipleHandlersSameUri_BothReceiveNotifications()
299+
{
300+
// Arrange
301+
await using McpClient client = await CreateMcpClientForServer();
302+
const string resourceUri = "test://resource/1";
303+
var handler1Called = new TaskCompletionSource<bool>();
304+
var handler2Called = new TaskCompletionSource<bool>();
305+
var handler1Count = 0;
306+
var handler2Count = 0;
307+
308+
// Act - Create two subscriptions to the same URI
309+
await using var subscription1 = await client.SubscribeToResourceAsync(
310+
resourceUri,
311+
(notification, ct) =>
312+
{
313+
Interlocked.Increment(ref handler1Count);
314+
handler1Called.TrySetResult(true);
315+
return default;
316+
},
317+
cancellationToken: TestContext.Current.CancellationToken);
318+
319+
await using var subscription2 = await client.SubscribeToResourceAsync(
320+
resourceUri,
321+
(notification, ct) =>
322+
{
323+
Interlocked.Increment(ref handler2Count);
324+
handler2Called.TrySetResult(true);
325+
return default;
326+
},
327+
cancellationToken: TestContext.Current.CancellationToken);
328+
329+
// Send a single notification
330+
await Server.SendNotificationAsync(
331+
NotificationMethods.ResourceUpdatedNotification,
332+
new ResourceUpdatedNotificationParams { Uri = resourceUri },
333+
cancellationToken: TestContext.Current.CancellationToken);
334+
335+
// Assert - Both handlers should be invoked
336+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
337+
var combined = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, TestContext.Current.CancellationToken);
338+
await Task.WhenAll(
339+
handler1Called.Task.WaitAsync(combined.Token),
340+
handler2Called.Task.WaitAsync(combined.Token));
341+
342+
Assert.Equal(1, handler1Count);
343+
Assert.Equal(1, handler2Count);
344+
345+
// Dispose one subscription
346+
await subscription1.DisposeAsync();
347+
348+
// Reset the second handler's task completion
349+
var handler2CalledAgain = new TaskCompletionSource<bool>();
350+
351+
// Send another notification
352+
await Server.SendNotificationAsync(
353+
NotificationMethods.ResourceUpdatedNotification,
354+
new ResourceUpdatedNotificationParams { Uri = resourceUri },
355+
cancellationToken: TestContext.Current.CancellationToken);
356+
357+
// Wait a bit to see if handler2 gets called again
358+
await Task.Delay(100, TestContext.Current.CancellationToken);
359+
360+
// Assert - Only the second handler should still receive notifications
361+
// Handler1 should not have been called again (still 1)
362+
Assert.Equal(1, handler1Count);
363+
// Handler2 should have been called again (now 2)
364+
Assert.Equal(2, handler2Count);
365+
}
296366
}

0 commit comments

Comments
 (0)