Skip to content

Commit 37aac60

Browse files
author
Yash Gupta
committed
Support cancellation
1 parent 159eb96 commit 37aac60

23 files changed

+1323
-48
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ private static class DefaultInitialization implements Initialization {
177177
*/
178178
private final AtomicReference<McpClientSession> mcpClientSession;
179179

180+
private final AtomicReference<Object> initializeRequestId = new AtomicReference<>();
181+
180182
private DefaultInitialization() {
181183
this.initSink = Sinks.one();
182184
this.result = new AtomicReference<>();
@@ -235,6 +237,16 @@ public boolean isInitialized() {
235237
return this.currentInitializationResult() != null;
236238
}
237239

240+
/**
241+
* Returns the request ID of the initialize request, if one has been issued. Used to
242+
* prevent cancellation of the initialize request per spec.
243+
* @return the initialize request ID, or null
244+
*/
245+
public Object getInitializeRequestId() {
246+
DefaultInitialization current = this.initializationRef.get();
247+
return current != null ? current.initializeRequestId.get() : null;
248+
}
249+
238250
public McpSchema.InitializeResult currentInitializationResult() {
239251
DefaultInitialization current = this.initializationRef.get();
240252
McpSchema.InitializeResult initializeResult = current != null ? current.result.get() : null;
@@ -305,8 +317,11 @@ private Mono<McpSchema.InitializeResult> doInitialize(DefaultInitialization init
305317
McpSchema.InitializeRequest initializeRequest = new McpSchema.InitializeRequest(latestVersion,
306318
this.clientCapabilities, this.clientInfo);
307319

308-
Mono<McpSchema.InitializeResult> result = mcpClientSession.sendRequest(McpSchema.METHOD_INITIALIZE,
309-
initializeRequest, McpAsyncClient.INITIALIZE_RESULT_TYPE_REF);
320+
McpClientSession.RequestMono<McpSchema.InitializeResult> requestMono = mcpClientSession.sendRequestWithId(
321+
McpSchema.METHOD_INITIALIZE, initializeRequest, McpAsyncClient.INITIALIZE_RESULT_TYPE_REF);
322+
initialization.initializeRequestId.set(requestMono.requestId());
323+
324+
Mono<McpSchema.InitializeResult> result = requestMono.response();
310325

311326
return result.flatMap(initializeResult -> {
312327
logger.info("Server response with Protocol: {}, Capabilities: {}, Info: {} and Instructions {}",

mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.List;
1414
import java.util.Map;
1515
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.atomic.AtomicReference;
1617
import java.util.function.Function;
1718

1819
import io.modelcontextprotocol.client.LifecycleInitializer.Initialization;
@@ -22,6 +23,7 @@
2223
import io.modelcontextprotocol.spec.McpClientSession.NotificationHandler;
2324
import io.modelcontextprotocol.spec.McpClientSession.RequestHandler;
2425
import io.modelcontextprotocol.spec.McpClientTransport;
26+
import io.modelcontextprotocol.spec.McpRequestHandle;
2527
import io.modelcontextprotocol.spec.McpSchema;
2628
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
2729
import io.modelcontextprotocol.spec.McpSchema.CreateMessageRequest;
@@ -449,6 +451,61 @@ public Mono<Object> ping() {
449451
init -> init.mcpSession().sendRequest(McpSchema.METHOD_PING, null, OBJECT_TYPE_REF));
450452
}
451453

454+
// --------------------------
455+
// Cancellation
456+
// --------------------------
457+
458+
/**
459+
* Cancels a previously issued request by its ID. Sends a
460+
* {@code notifications/cancelled} notification to the server and errors the pending
461+
* response locally.
462+
* @param requestId The ID of the request to cancel
463+
* @param reason An optional human-readable reason for the cancellation
464+
* @return A Mono that completes when the cancellation notification is sent
465+
*/
466+
public Mono<Void> cancelRequest(Object requestId, String reason) {
467+
if (!this.isInitialized()) {
468+
return Mono.error(new IllegalStateException("Cannot cancel request before initialization"));
469+
}
470+
Object initId = this.initializer.getInitializeRequestId();
471+
if (initId != null && initId.equals(requestId)) {
472+
return Mono.error(new IllegalArgumentException("The initialize request MUST NOT be cancelled"));
473+
}
474+
return this.initializer.withInitialization("cancelling request",
475+
init -> init.mcpSession().sendCancellation(requestId, reason));
476+
}
477+
478+
/**
479+
* Calls a tool and returns a handle that can be used to cancel the request.
480+
* @param callToolRequest The request containing the tool name and input parameters
481+
* @return A McpRequestHandle containing the request ID, response Mono, and a cancel
482+
* function
483+
*/
484+
public McpRequestHandle<McpSchema.CallToolResult> callToolWithHandle(McpSchema.CallToolRequest callToolRequest) {
485+
AtomicReference<String> requestIdRef = new AtomicReference<>();
486+
487+
Mono<McpSchema.CallToolResult> responseMono = this.initializer.withInitialization("calling tool with handle",
488+
init -> {
489+
if (init.initializeResult().capabilities().tools() == null) {
490+
return Mono.error(new IllegalStateException("Server does not provide tools capability"));
491+
}
492+
McpClientSession.RequestMono<McpSchema.CallToolResult> rm = init.mcpSession()
493+
.sendRequestWithId(McpSchema.METHOD_TOOLS_CALL, callToolRequest, CALL_TOOL_RESULT_TYPE_REF);
494+
requestIdRef.set(rm.requestId());
495+
return rm.response()
496+
.flatMap(result -> Mono.just(validateToolResult(callToolRequest.name(), result)));
497+
});
498+
499+
return McpRequestHandle.lazy(requestIdRef, responseMono, reason -> {
500+
String id = requestIdRef.get();
501+
if (id == null) {
502+
return Mono.error(new IllegalStateException(
503+
"Cannot cancel: request has not been issued yet. Subscribe to the response Mono first."));
504+
}
505+
return this.cancelRequest(id, reason);
506+
});
507+
}
508+
452509
// --------------------------
453510
// Roots
454511
// --------------------------

mcp-core/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.slf4j.LoggerFactory;
1212

1313
import io.modelcontextprotocol.common.McpTransportContext;
14+
import io.modelcontextprotocol.spec.McpRequestHandle;
1415
import io.modelcontextprotocol.spec.McpSchema;
1516
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
1617
import io.modelcontextprotocol.spec.McpSchema.GetPromptRequest;
@@ -218,6 +219,20 @@ public Object ping() {
218219
return withProvidedContext(this.delegate.ping()).block();
219220
}
220221

222+
// --------------------------
223+
// Cancellation
224+
// --------------------------
225+
226+
/**
227+
* Cancels a previously issued request. IMPORTANT: This method MUST be called from a
228+
* different thread than the one blocked waiting on the request result.
229+
* @param requestId The ID of the request to cancel
230+
* @param reason An optional human-readable reason for the cancellation
231+
*/
232+
public void cancelRequest(Object requestId, String reason) {
233+
withProvidedContext(this.delegate.cancelRequest(requestId, reason)).block();
234+
}
235+
221236
// --------------------------
222237
// Tools
223238
// --------------------------
@@ -234,7 +249,16 @@ public Object ping() {
234249
*/
235250
public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest) {
236251
return withProvidedContext(this.delegate.callTool(callToolRequest)).block();
252+
}
237253

254+
/**
255+
* Calls a tool with a specific timeout.
256+
* @param callToolRequest The request containing the tool name and input parameters
257+
* @param timeout The maximum duration to wait for the result
258+
* @return The tool execution result
259+
*/
260+
public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest, Duration timeout) {
261+
return withProvidedContext(this.delegate.callTool(callToolRequest)).timeout(timeout).block();
238262
}
239263

240264
/**

mcp-core/src/main/java/io/modelcontextprotocol/server/DefaultMcpStatelessServerHandler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.slf4j.LoggerFactory;
1212
import reactor.core.publisher.Mono;
1313

14+
import java.util.HashMap;
1415
import java.util.Map;
1516

1617
class DefaultMcpStatelessServerHandler implements McpStatelessServerHandler {
@@ -24,7 +25,11 @@ class DefaultMcpStatelessServerHandler implements McpStatelessServerHandler {
2425
public DefaultMcpStatelessServerHandler(Map<String, McpStatelessRequestHandler<?>> requestHandlers,
2526
Map<String, McpStatelessNotificationHandler> notificationHandlers) {
2627
this.requestHandlers = requestHandlers;
27-
this.notificationHandlers = notificationHandlers;
28+
this.notificationHandlers = new HashMap<>(notificationHandlers);
29+
this.notificationHandlers.putIfAbsent(McpSchema.METHOD_NOTIFICATION_CANCELLED, (ctx, params) -> {
30+
logger.debug("Ignoring cancellation in stateless mode");
31+
return Mono.empty();
32+
});
2833
}
2934

3035
@Override

mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,16 @@ private Map<String, McpNotificationHandler> prepareNotificationHandlers(McpServe
183183

184184
notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_INITIALIZED, (exchange, params) -> Mono.empty());
185185

186+
notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_CANCELLED, (exchange, params) -> {
187+
if (features.cancellationConsumer() != null) {
188+
McpSchema.CancelledNotification cancelled = jsonMapper.convertValue(params,
189+
new TypeRef<McpSchema.CancelledNotification>() {
190+
});
191+
return features.cancellationConsumer().apply(exchange, cancelled);
192+
}
193+
return Mono.empty();
194+
});
195+
186196
List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeConsumers = features
187197
.rootsChangeConsumers();
188198

mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,17 @@ public Mono<Object> ping() {
247247
return this.session.sendRequest(McpSchema.METHOD_PING, null, OBJECT_TYPE_REF);
248248
}
249249

250+
/**
251+
* Cancels a previously issued request to the client (server-to-client direction).
252+
* Sends a {@code notifications/cancelled} notification.
253+
* @param requestId The ID of the request to cancel
254+
* @param reason An optional human-readable reason for the cancellation
255+
* @return A Mono that completes when the cancellation notification is sent
256+
*/
257+
public Mono<Void> cancelRequest(Object requestId, String reason) {
258+
return this.session.sendCancellation(requestId, reason);
259+
}
260+
250261
/**
251262
* Set the minimum logging level for the client. Messages below this level will be
252263
* filtered out.

mcp-core/src/main/java/io/modelcontextprotocol/server/McpServer.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ private SingleSessionAsyncSpecification(McpServerTransportProvider transportProv
237237
public McpAsyncServer build() {
238238
var features = new McpServerFeatures.Async(this.serverInfo, this.serverCapabilities, this.tools,
239239
this.resources, this.resourceTemplates, this.prompts, this.completions, this.rootsChangeHandlers,
240-
this.instructions);
240+
this.instructions, this.cancellationConsumer);
241241

242242
var jsonSchemaValidator = (this.jsonSchemaValidator != null) ? this.jsonSchemaValidator
243243
: McpJsonDefaults.getSchemaValidator();
@@ -265,7 +265,7 @@ public StreamableServerAsyncSpecification(McpStreamableServerTransportProvider t
265265
public McpAsyncServer build() {
266266
var features = new McpServerFeatures.Async(this.serverInfo, this.serverCapabilities, this.tools,
267267
this.resources, this.resourceTemplates, this.prompts, this.completions, this.rootsChangeHandlers,
268-
this.instructions);
268+
this.instructions, this.cancellationConsumer);
269269
var jsonSchemaValidator = this.jsonSchemaValidator != null ? this.jsonSchemaValidator
270270
: McpJsonDefaults.getSchemaValidator();
271271
return new McpAsyncServer(transportProvider, jsonMapper == null ? McpJsonDefaults.getMapper() : jsonMapper,
@@ -333,6 +333,8 @@ abstract class AsyncSpecification<S extends AsyncSpecification<S>> {
333333

334334
final List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeHandlers = new ArrayList<>();
335335

336+
BiFunction<McpAsyncServerExchange, McpSchema.CancelledNotification, Mono<Void>> cancellationConsumer;
337+
336338
Duration requestTimeout = Duration.ofHours(10); // Default timeout
337339

338340
public abstract McpAsyncServer build();
@@ -805,6 +807,20 @@ public AsyncSpecification<S> rootsChangeHandlers(
805807
return this.rootsChangeHandlers(Arrays.asList(handlers));
806808
}
807809

810+
/**
811+
* Registers an optional consumer that is invoked when a
812+
* {@code notifications/cancelled} notification is received from the client. The
813+
* session layer already handles the core cancellation logic; this consumer is for
814+
* application-level side-effects (e.g. logging, metrics, UI updates).
815+
* @param consumer The cancellation consumer
816+
* @return This builder instance for method chaining
817+
*/
818+
public AsyncSpecification<S> cancellationConsumer(
819+
BiFunction<McpAsyncServerExchange, McpSchema.CancelledNotification, Mono<Void>> consumer) {
820+
this.cancellationConsumer = consumer;
821+
return this;
822+
}
823+
808824
/**
809825
* Sets the JsonMapper to use for serializing and deserializing JSON messages.
810826
* @param jsonMapper the mapper to use. Must not be null.

mcp-core/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,19 +45,11 @@ record Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities s
4545
Map<String, McpServerFeatures.AsyncPromptSpecification> prompts,
4646
Map<McpSchema.CompleteReference, McpServerFeatures.AsyncCompletionSpecification> completions,
4747
List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeConsumers,
48-
String instructions) {
48+
String instructions,
49+
BiFunction<McpAsyncServerExchange, McpSchema.CancelledNotification, Mono<Void>> cancellationConsumer) {
4950

5051
/**
51-
* Create an instance and validate the arguments.
52-
* @param serverInfo The server implementation details
53-
* @param serverCapabilities The server capabilities
54-
* @param tools The list of tool specifications
55-
* @param resources The map of resource specifications
56-
* @param resourceTemplates The map of resource templates
57-
* @param prompts The map of prompt specifications
58-
* @param rootsChangeConsumers The list of consumers that will be notified when
59-
* the roots list changes
60-
* @param instructions The server instructions text
52+
* Backwards-compatible constructor without cancellationConsumer.
6153
*/
6254
Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
6355
List<McpServerFeatures.AsyncToolSpecification> tools, Map<String, AsyncResourceSpecification> resources,
@@ -66,6 +58,21 @@ record Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities s
6658
Map<McpSchema.CompleteReference, McpServerFeatures.AsyncCompletionSpecification> completions,
6759
List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeConsumers,
6860
String instructions) {
61+
this(serverInfo, serverCapabilities, tools, resources, resourceTemplates, prompts, completions,
62+
rootsChangeConsumers, instructions, null);
63+
}
64+
65+
/**
66+
* Create an instance and validate the arguments.
67+
*/
68+
Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
69+
List<McpServerFeatures.AsyncToolSpecification> tools, Map<String, AsyncResourceSpecification> resources,
70+
Map<String, McpServerFeatures.AsyncResourceTemplateSpecification> resourceTemplates,
71+
Map<String, McpServerFeatures.AsyncPromptSpecification> prompts,
72+
Map<McpSchema.CompleteReference, McpServerFeatures.AsyncCompletionSpecification> completions,
73+
List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeConsumers,
74+
String instructions,
75+
BiFunction<McpAsyncServerExchange, McpSchema.CancelledNotification, Mono<Void>> cancellationConsumer) {
6976

7077
Assert.notNull(serverInfo, "Server info must not be null");
7178

@@ -89,6 +96,7 @@ record Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities s
8996
this.completions = (completions != null) ? completions : Map.of();
9097
this.rootsChangeConsumers = (rootsChangeConsumers != null) ? rootsChangeConsumers : List.of();
9198
this.instructions = instructions;
99+
this.cancellationConsumer = cancellationConsumer;
92100
}
93101

94102
/**

mcp-core/src/main/java/io/modelcontextprotocol/server/McpSyncServerExchange.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,4 +143,13 @@ public Object ping() {
143143
return this.exchange.ping().block();
144144
}
145145

146+
/**
147+
* Cancels a previously issued request to the client (server-to-client direction).
148+
* @param requestId The ID of the request to cancel
149+
* @param reason An optional human-readable reason for the cancellation
150+
*/
151+
public void cancelRequest(Object requestId, String reason) {
152+
this.exchange.cancelRequest(requestId, reason).block();
153+
}
154+
146155
}

0 commit comments

Comments
 (0)