Skip to content

Commit a7abcab

Browse files
committed
fix: propagate SSE stream errors to messageEndpointSink
When the SSE stream encounters an error before the endpoint event is received, the messageEndpointSink is never signaled. This causes sendMessage() to hang indefinitely since Sinks.One.asMono() has no built-in timeout. This commit propagates errors to messageEndpointSink in two places: 1. onErrorComplete handler: propagates the actual error 2. doFinally handler: emits CancellationException as a safety net for cases where the stream completes without going through onErrorComplete Since Sinks.One only accepts a single emission, duplicate calls to tryEmitError() are safely ignored when the sink has already been completed (either with a value or an error). Closes #323 Signed-off-by: gyeo009 <gyeo009@users.noreply.github.com>
1 parent 8549e36 commit a7abcab

File tree

2 files changed

+26
-0
lines changed

2 files changed

+26
-0
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.net.http.HttpResponse;
1212
import java.time.Duration;
1313
import java.util.List;
14+
import java.util.concurrent.CancellationException;
1415
import java.util.concurrent.CompletableFuture;
1516
import java.util.concurrent.atomic.AtomicReference;
1617
import java.util.function.Consumer;
@@ -399,9 +400,13 @@ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
399400
logger.warn("SSE stream observed an error", t);
400401
sink.error(t);
401402
}
403+
this.messageEndpointSink.tryEmitError(t);
402404
return true;
403405
})
404406
.doFinally(s -> {
407+
this.messageEndpointSink
408+
.tryEmitError(new CancellationException("SSE stream ended before receiving the endpoint event"));
409+
405410
Disposable ref = this.sseSubscription.getAndSet(null);
406411
if (ref != null && !ref.isDisposed()) {
407412
ref.dispose();

mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,27 @@ void testRetryBehavior() {
244244
failingTransport.closeGracefully().block();
245245
}
246246

247+
@Test
248+
void testSendMessageFailsWhenSseConnectionFails() {
249+
// Create transport that connects to a non-existent host
250+
HttpClientSseClientTransport failingTransport = HttpClientSseClientTransport.builder("http://non-existent-host")
251+
.connectTimeout(Duration.ofSeconds(2))
252+
.build();
253+
254+
// Attempt to connect (will fail)
255+
failingTransport.connect(Function.identity()).subscribe();
256+
257+
// Create a test message
258+
JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id",
259+
Map.of("key", "value"));
260+
261+
// Verify sendMessage fails with an error rather than hanging forever
262+
StepVerifier.create(failingTransport.sendMessage(testMessage)).expectError().verify(Duration.ofSeconds(10));
263+
264+
// Clean up
265+
failingTransport.closeGracefully().block();
266+
}
267+
247268
@Test
248269
void testMultipleMessageProcessing() {
249270
// Simulate receiving multiple messages in sequence

0 commit comments

Comments
 (0)