Skip to content

Commit df75857

Browse files
authored
fix: avoid dropped errors when transport is closed or uninitialized (#995)
Signed-off-by: Dariusz Jędrzejczyk <2554306+chemicL@users.noreply.github.com>
1 parent dbb9bda commit df75857

8 files changed

Lines changed: 109 additions & 47 deletions

File tree

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import io.modelcontextprotocol.spec.McpSchema;
3939
import io.modelcontextprotocol.spec.McpTransportException;
4040
import io.modelcontextprotocol.spec.McpTransportSession;
41+
import io.modelcontextprotocol.spec.McpTransportSessionClosedException;
4142
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
4243
import io.modelcontextprotocol.spec.McpTransportStream;
4344
import io.modelcontextprotocol.spec.ProtocolVersions;
@@ -189,14 +190,6 @@ private McpTransportSession<Disposable> createTransportSession() {
189190
return new DefaultMcpTransportSession(onClose);
190191
}
191192

192-
private McpTransportSession<Disposable> createClosedSession(McpTransportSession<Disposable> existingSession) {
193-
var existingSessionId = Optional.ofNullable(existingSession)
194-
.filter(session -> !(session instanceof ClosedMcpTransportSession<Disposable>))
195-
.flatMap(McpTransportSession::sessionId)
196-
.orElse(null);
197-
return new ClosedMcpTransportSession<>(existingSessionId);
198-
}
199-
200193
private Publisher<Void> createDelete(String sessionId) {
201194

202195
var uri = Utils.resolveUri(this.baseUri, this.endpoint);
@@ -240,7 +233,8 @@ private void handleException(Throwable t) {
240233
public Mono<Void> closeGracefully() {
241234
return Mono.defer(() -> {
242235
logger.debug("Graceful close triggered");
243-
McpTransportSession<Disposable> currentSession = this.activeSession.getAndUpdate(this::createClosedSession);
236+
McpTransportSession<Disposable> currentSession = this.activeSession
237+
.getAndSet(ClosedMcpTransportSession.INSTANCE);
244238
if (currentSession != null) {
245239
return Mono.from(currentSession.closeGracefully());
246240
}
@@ -250,6 +244,19 @@ public Mono<Void> closeGracefully() {
250244

251245
private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
252246
return Mono.deferContextual(ctx -> {
247+
var rh = this.handler.get();
248+
if (rh == null) {
249+
logger.warn("Transport has no request handler registered. Remember to call connect!");
250+
}
251+
252+
final Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> requestHandler = rh != null
253+
? rh : msg -> Mono.error(new IllegalStateException("No request handler"));
254+
255+
final McpTransportSession<Disposable> transportSession = this.activeSession.get();
256+
257+
if (ClosedMcpTransportSession.INSTANCE.equals(transportSession)) {
258+
throw new McpTransportSessionClosedException();
259+
}
253260

254261
if (stream != null) {
255262
logger.debug("Reconnecting stream {} with lastId {}", stream.streamId(), stream.lastId());
@@ -259,7 +266,7 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
259266
}
260267

261268
final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
262-
final McpTransportSession<Disposable> transportSession = this.activeSession.get();
269+
263270
var uri = Utils.resolveUri(this.baseUri, this.endpoint);
264271

265272
Disposable connection = Mono.deferContextual(connectionCtx -> {
@@ -389,18 +396,18 @@ else if (statusCode == BAD_REQUEST) {
389396
"Received unrecognized SSE event type: " + sseResponseEvent.sseEvent().event()));
390397
})
391398
.retryWhen(authorizationErrorRetrySpec())
392-
.flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
399+
.flatMap(jsonrpcMessage -> requestHandler.apply(Mono.just(jsonrpcMessage)))
393400
.onErrorMap(CompletionException.class, t -> t.getCause())
394-
.onErrorComplete(t -> {
395-
this.handleException(t);
396-
return true;
397-
})
398401
.doFinally(s -> {
399402
Disposable ref = disposableRef.getAndSet(null);
400403
if (ref != null) {
401404
transportSession.removeConnection(ref);
402405
}
403406
}))
407+
.onErrorComplete(t -> {
408+
this.handleException(t);
409+
return true;
410+
})
404411
.contextWrite(ctx)
405412
.subscribe();
406413

@@ -467,10 +474,23 @@ public String toString(McpSchema.JSONRPCMessage message) {
467474

468475
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
469476
return Mono.create(deliveredSink -> {
477+
var rh = this.handler.get();
478+
if (rh == null) {
479+
logger.warn("Transport has no request handler registered. Remember to call connect!");
480+
}
481+
482+
final Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> requestHandler = rh != null
483+
? rh : msg -> Mono.error(new IllegalStateException("No request handler"));
484+
485+
var transportSession = this.activeSession.get();
486+
487+
if (ClosedMcpTransportSession.INSTANCE.equals(transportSession)) {
488+
throw new McpTransportSessionClosedException();
489+
}
490+
470491
logger.debug("Sending message {}", sentMessage);
471492

472493
final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
473-
final McpTransportSession<Disposable> transportSession = this.activeSession.get();
474494

475495
var uri = Utils.resolveUri(this.baseUri, this.endpoint);
476496
String jsonBody = this.toString(sentMessage);
@@ -643,22 +663,26 @@ else if (statusCode == BAD_REQUEST) {
643663
new RuntimeException("Failed to send message: " + responseEvent));
644664
})
645665
.retryWhen(authorizationErrorRetrySpec())
646-
.flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
666+
.flatMap(jsonRpcMessage -> requestHandler.apply(Mono.just(jsonRpcMessage)))
647667
.onErrorMap(CompletionException.class, t -> t.getCause())
648-
.onErrorComplete(t -> {
649-
// handle the error first
650-
this.handleException(t);
651-
// inform the caller of sendMessage
652-
deliveredSink.error(t);
653-
return true;
654-
})
655668
.doFinally(s -> {
656669
logger.debug("SendMessage finally: {}", s);
657670
Disposable ref = disposableRef.getAndSet(null);
658671
if (ref != null) {
659672
transportSession.removeConnection(ref);
660673
}
661-
})).contextWrite(deliveredSink.contextView()).subscribe();
674+
})).onErrorComplete(t -> {
675+
// handle the error first
676+
try {
677+
this.handleException(t);
678+
}
679+
catch (Exception e) {
680+
logger.error("Error handling exception {}", t.getMessage(), e);
681+
}
682+
// inform the caller of sendMessage
683+
deliveredSink.error(t);
684+
return true;
685+
}).contextWrite(deliveredSink.contextView()).subscribe();
662686

663687
disposableRef.set(connection);
664688
transportSession.addConnection(connection);

mcp-core/src/main/java/io/modelcontextprotocol/spec/ClosedMcpTransportSession.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,43 +6,41 @@
66
import java.util.Optional;
77

88
import org.reactivestreams.Publisher;
9+
import reactor.core.Disposable;
910
import reactor.core.publisher.Mono;
1011
import reactor.util.annotation.Nullable;
1112

1213
/**
13-
* Represents a closed MCP session, which may not be reused. All calls will throw a
14-
* {@link McpTransportSessionClosedException}.
14+
* Represents a closed MCP session, which may not be reused.
1515
*
16-
* @param <CONNECTION> the resource representing the connection that the transport
17-
* manages.
1816
* @author Daniel Garnier-Moiroux
17+
* @author Dariusz Jędrzejczyk
1918
*/
20-
public class ClosedMcpTransportSession<CONNECTION> implements McpTransportSession<CONNECTION> {
19+
public final class ClosedMcpTransportSession implements McpTransportSession<Disposable> {
2120

22-
private final String sessionId;
21+
public static final ClosedMcpTransportSession INSTANCE = new ClosedMcpTransportSession();
2322

24-
public ClosedMcpTransportSession(@Nullable String sessionId) {
25-
this.sessionId = sessionId;
23+
private ClosedMcpTransportSession() {
2624
}
2725

2826
@Override
2927
public Optional<String> sessionId() {
30-
throw new McpTransportSessionClosedException(sessionId);
28+
return Optional.empty();
3129
}
3230

3331
@Override
3432
public boolean markInitialized(String sessionId) {
35-
throw new McpTransportSessionClosedException(sessionId);
33+
throw new IllegalStateException("MCP Session is already closed");
3634
}
3735

3836
@Override
39-
public void addConnection(CONNECTION connection) {
40-
throw new McpTransportSessionClosedException(sessionId);
37+
public void addConnection(Disposable connection) {
38+
throw new IllegalStateException("MCP Session is already closed");
4139
}
4240

4341
@Override
44-
public void removeConnection(CONNECTION connection) {
45-
throw new McpTransportSessionClosedException(sessionId);
42+
public void removeConnection(Disposable connection) {
43+
throw new IllegalStateException("MCP Session is already closed");
4644
}
4745

4846
@Override

mcp-core/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
119119
this.requestHandlers.putAll(requestHandlers);
120120
this.notificationHandlers.putAll(notificationHandlers);
121121

122-
this.transport.connect(mono -> mono.doOnNext(this::handle)).transform(connectHook).subscribe();
122+
this.transport.connect(mono -> mono.doOnNext(this::handle)).transform(connectHook).subscribe(ignored -> {
123+
}, error -> logger.warn("Client failed during connect", error));
123124
}
124125

125126
private void dismissPendingResponses() {
@@ -160,7 +161,15 @@ else if (message instanceof McpSchema.JSONRPCRequest request) {
160161
var errorResponse = McpSchema.JSONRPCResponse.error(request.id(), jsonRpcError);
161162
return Mono.just(errorResponse);
162163
}).flatMap(this.transport::sendMessage).onErrorComplete(t -> {
163-
logger.warn("Issue sending response to the client, ", t);
164+
if (t instanceof McpTransportSessionClosedException) {
165+
logger.debug("Can't send response to request {} when the transport is closed", request.id());
166+
}
167+
else if (McpTransport.isPeerClosed(t)) {
168+
logger.debug("Can't send response to request {}: connection closed by peer", request.id(), t);
169+
}
170+
else {
171+
logger.warn("Failed to send response to the server", t);
172+
}
164173
return true;
165174
}).subscribe();
166175
}

mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransport.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage;
1010
import io.modelcontextprotocol.json.TypeRef;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
1113
import reactor.core.publisher.Mono;
1214

1315
/**
@@ -39,6 +41,8 @@
3941
*/
4042
public interface McpTransport {
4143

44+
Logger logger = LoggerFactory.getLogger(McpTransport.class);
45+
4246
/**
4347
* Closes the transport connection and releases any associated resources.
4448
*
@@ -48,7 +52,24 @@ public interface McpTransport {
4852
* </p>
4953
*/
5054
default void close() {
51-
this.closeGracefully().subscribe();
55+
this.closeGracefully().subscribe(ignored -> {
56+
}, error -> {
57+
if (isPeerClosed(error)) {
58+
logger.debug("Error during asynchronous close", error);
59+
}
60+
else {
61+
logger.warn("Error during asynchronous close", error);
62+
}
63+
});
64+
}
65+
66+
static boolean isPeerClosed(Throwable t) {
67+
for (Throwable c = t; c != null; c = c.getCause()) {
68+
if (c instanceof java.io.EOFException) {
69+
return true;
70+
}
71+
}
72+
return false;
5273
}
5374

5475
/**

mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransportSessionClosedException.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,14 @@
1313
* @see ClosedMcpTransportSession
1414
* @author Daniel Garnier-Moiroux
1515
*/
16+
1617
public class McpTransportSessionClosedException extends RuntimeException {
1718

19+
public McpTransportSessionClosedException() {
20+
super("Transport has already been closed.");
21+
}
22+
23+
@Deprecated(forRemoval = true)
1824
public McpTransportSessionClosedException(@Nullable String sessionId) {
1925
super(sessionId != null ? "MCP session with ID %s has been closed".formatted(sessionId)
2026
: "MCP session has been closed");

mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ void testRootsListChanged() {
464464
void testInitializeWithRootsListProviders() {
465465
withClient(createMcpTransport(),
466466
builder -> builder.roots(Root.builder("file:///test/path").name("test-root").build()), client -> {
467-
StepVerifier.create(client.initialize().then(client.closeGracefully())).verifyComplete();
467+
StepVerifier.create(client.initialize()).expectNextCount(1).verifyComplete();
468468
});
469469
}
470470

@@ -728,8 +728,6 @@ void testLoggingConsumer() {
728728
builder -> builder.loggingConsumer(notification -> Mono.fromRunnable(() -> logReceived.set(true))),
729729
client -> {
730730
StepVerifier.create(client.initialize()).expectNextMatches(Objects::nonNull).verifyComplete();
731-
StepVerifier.create(client.closeGracefully()).verifyComplete();
732-
733731
});
734732

735733
}

mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransportTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@
88
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
99
import io.modelcontextprotocol.common.McpTransportContext;
1010
import io.modelcontextprotocol.spec.McpSchema;
11+
import io.modelcontextprotocol.spec.McpTransportSessionClosedException;
1112
import io.modelcontextprotocol.spec.ProtocolVersions;
1213
import java.net.URI;
1314
import java.net.URISyntaxException;
1415
import java.util.Map;
1516
import java.util.function.Consumer;
17+
import java.util.function.Function;
18+
1619
import org.junit.jupiter.api.AfterAll;
1720
import org.junit.jupiter.api.BeforeAll;
1821
import org.junit.jupiter.api.Test;
@@ -139,13 +142,14 @@ void testCloseUninitialized() {
139142
var testMessage = new McpSchema.JSONRPCRequest(McpSchema.METHOD_INITIALIZE, "test-id", initializeRequest);
140143

141144
StepVerifier.create(transport.sendMessage(testMessage))
142-
.expectErrorMessage("MCP session has been closed")
145+
.expectErrorMessage("Transport has already been closed.")
143146
.verify();
144147
}
145148

146149
@Test
147150
void testCloseInitialized() {
148151
var transport = HttpClientStreamableHttpTransport.builder(host).build();
152+
transport.connect(Function.identity()).block();
149153

150154
var initializeRequest = McpSchema.InitializeRequest
151155
.builder(ProtocolVersions.MCP_2025_11_25, McpSchema.ClientCapabilities.builder().roots(true).build(),
@@ -157,7 +161,8 @@ void testCloseInitialized() {
157161
StepVerifier.create(transport.closeGracefully()).verifyComplete();
158162

159163
StepVerifier.create(transport.sendMessage(testMessage))
160-
.expectErrorMatches(err -> err.getMessage().matches("MCP session with ID [a-zA-Z0-9-]* has been closed"))
164+
.expectErrorMatches(err -> err instanceof McpTransportSessionClosedException
165+
&& err.getMessage().contains("Transport has already been closed"))
161166
.verify();
162167
}
163168

mcp-test/src/test/java/io/modelcontextprotocol/server/transport/ServerTransportSecurityIntegrationTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ void setUp() {
8181

8282
@AfterEach
8383
void tearDown() {
84+
requestCustomizer.reset();
8485
mcpClient.close();
8586
}
8687

0 commit comments

Comments
 (0)