Skip to content

Commit 23be10c

Browse files
committed
fix: propagate SSE stream errors to messageEndpointSink
When the SSE stream encounters an error before the endpoint event is received, the messageEndpointSink is never signaled. This causes sendMessage() to hang indefinitely since Sinks.One.asMono() has no built-in timeout. This commit propagates errors to messageEndpointSink in two places: 1. onErrorComplete handler: propagates the actual error 2. doFinally handler: emits CancellationException as a safety net for cases where the stream completes without going through onErrorComplete Since Sinks.One only accepts a single emission, duplicate calls to tryEmitError() are safely ignored when the sink has already been completed (either with a value or an error). Closes #323 Signed-off-by: gyeo009 <gyeo009@users.noreply.github.com>
1 parent 8549e36 commit 23be10c

File tree

2 files changed

+30
-0
lines changed

2 files changed

+30
-0
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.net.http.HttpResponse;
1212
import java.time.Duration;
1313
import java.util.List;
14+
import java.util.concurrent.CancellationException;
1415
import java.util.concurrent.CompletableFuture;
1516
import java.util.concurrent.atomic.AtomicReference;
1617
import java.util.function.Consumer;
@@ -399,9 +400,13 @@ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
399400
logger.warn("SSE stream observed an error", t);
400401
sink.error(t);
401402
}
403+
this.messageEndpointSink.tryEmitError(t);
402404
return true;
403405
})
404406
.doFinally(s -> {
407+
this.messageEndpointSink
408+
.tryEmitError(new CancellationException("SSE stream completed without endpoint event"));
409+
405410
Disposable ref = this.sseSubscription.getAndSet(null);
406411
if (ref != null && !ref.isDisposed()) {
407412
ref.dispose();

mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,4 +477,29 @@ void testAsyncRequestCustomizer() {
477477
customizedTransport.closeGracefully().block();
478478
}
479479

480+
@Test
481+
void testSendMessageFailsWhenSseConnectionFails() {
482+
// Create a transport that connects to a non-existent host.
483+
// The SSE connection will fail, and sendMessage() should propagate
484+
// the error instead of hanging indefinitely.
485+
// See: https://github.com/modelcontextprotocol/java-sdk/pull/323
486+
HttpClientSseClientTransport failingTransport = HttpClientSseClientTransport.builder("http://non-existent-host")
487+
.connectTimeout(Duration.ofSeconds(2))
488+
.build();
489+
490+
// Attempt to connect - this will fail due to the non-existent host
491+
failingTransport.connect(Function.identity()).subscribe();
492+
493+
// sendMessage() should fail with an error rather than hanging forever.
494+
// Without the fix, messageEndpointSink never receives a signal and
495+
// asMono() blocks indefinitely, causing this test to time out.
496+
JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id",
497+
Map.of("key", "value"));
498+
499+
StepVerifier.create(failingTransport.sendMessage(testMessage)).expectError().verify(Duration.ofSeconds(10));
500+
501+
// Clean up
502+
failingTransport.closeGracefully().block();
503+
}
504+
480505
}

0 commit comments

Comments
 (0)