Skip to content

Commit 29983fd

Browse files
committed
fix: propagate SSE stream errors to messageEndpointSink
When the SSE stream encounters an error before the endpoint event is received, the messageEndpointSink is never signaled. This causes sendMessage() to hang indefinitely since Sinks.One.asMono() has no built-in timeout. This commit propagates errors to messageEndpointSink in two places: 1. onErrorComplete handler: propagates the actual error 2. doFinally handler: emits CancellationException as a safety net for cases where the stream completes without going through onErrorComplete Since Sinks.One only accepts a single emission, duplicate calls to tryEmitError() are safely ignored when the sink has already been completed (either with a value or an error). Closes #323 Signed-off-by: gyeo009 <gyeo009@users.noreply.github.com>
1 parent 8549e36 commit 29983fd

File tree

2 files changed

+22
-0
lines changed

2 files changed

+22
-0
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.net.http.HttpResponse;
1212
import java.time.Duration;
1313
import java.util.List;
14+
import java.util.concurrent.CancellationException;
1415
import java.util.concurrent.CompletableFuture;
1516
import java.util.concurrent.atomic.AtomicReference;
1617
import java.util.function.Consumer;
@@ -399,9 +400,13 @@ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
399400
logger.warn("SSE stream observed an error", t);
400401
sink.error(t);
401402
}
403+
this.messageEndpointSink.tryEmitError(t);
402404
return true;
403405
})
404406
.doFinally(s -> {
407+
this.messageEndpointSink
408+
.tryEmitError(new CancellationException("SSE stream ended before receiving the endpoint event"));
409+
405410
Disposable ref = this.sseSubscription.getAndSet(null);
406411
if (ref != null && !ref.isDisposed()) {
407412
ref.dispose();

mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,4 +477,21 @@ void testAsyncRequestCustomizer() {
477477
customizedTransport.closeGracefully().block();
478478
}
479479

480+
@Test
481+
void testSendMessageFailsWhenSseConnectionFails() {
482+
HttpClientSseClientTransport failingTransport = HttpClientSseClientTransport.builder("http://non-existent-host")
483+
.connectTimeout(Duration.ofSeconds(2))
484+
.build();
485+
486+
failingTransport.connect(Function.identity()).subscribe();
487+
488+
JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id",
489+
Map.of("key", "value"));
490+
491+
// sendMessage() should fail with an error rather than hanging forever
492+
StepVerifier.create(failingTransport.sendMessage(testMessage)).expectError().verify(Duration.ofSeconds(10));
493+
494+
failingTransport.closeGracefully().block();
495+
}
496+
480497
}

0 commit comments

Comments
 (0)