Skip to content

Commit 6a5eb43

Browse files
committed
Improve logging with sending requests and fixed the hookOnSubscribe for the AggregateSubscriber to request unbounded items
1 parent 713ee1a commit 6a5eb43

File tree

2 files changed

+40
-14
lines changed

2 files changed

+40
-14
lines changed

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -422,17 +422,23 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
422422
return Mono.from(this.httpRequestCustomizer.customize(builder, "POST", uri, jsonBody));
423423
}).flatMapMany(requestBuilder -> Flux.<ResponseEvent>create(responseEventSink -> {
424424

425-
// Create the async request with proper body subscriber selection
426-
Mono.fromFuture(this.httpClient
427-
.sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(responseEventSink))
428-
.whenComplete((response, throwable) -> {
429-
if (throwable != null) {
430-
responseEventSink.error(throwable);
431-
}
432-
else {
433-
logger.debug("SSE connection established successfully");
434-
}
435-
})).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe();
425+
// Create the async request with proper error handling and timeout
426+
// The key insight: the response body is consumed by the BodySubscriber
427+
// and flows through responseEventSink
428+
// The CompletableFuture<HttpResponse<Void>> completes when headers are
429+
// received, not when body is consumed
430+
Mono.fromFuture(() -> this.httpClient.sendAsync(requestBuilder.build(),
431+
this.toSendMessageBodySubscriber(responseEventSink)))
432+
.doOnSuccess(response -> {
433+
logger.debug("Success: " + response.statusCode());
434+
})
435+
.doOnError(throwable -> {
436+
logger.error("HTTP request failed with message {}", throwable.getMessage(), throwable);
437+
// Ensure the sink gets the error if it hasn't been completed yet
438+
responseEventSink.error(throwable);
439+
})
440+
.onErrorMap(CompletionException.class, Throwable::getCause)
441+
.subscribe();
436442

437443
})).flatMap(responseEvent -> {
438444
if (transportSession.markInitialized(
@@ -500,6 +506,11 @@ else if (contentType.contains(APPLICATION_JSON)) {
500506
return Mono.empty();
501507
}
502508

509+
if (!Utils.hasText(data)) {
510+
deliveredSink.success();
511+
return Mono.empty();
512+
}
513+
503514
try {
504515
return Mono.just(McpSchema.deserializeJsonRpcMessage(objectMapper, data));
505516
}

mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ else if (line.startsWith(":")) {
187187

188188
@Override
189189
protected void hookOnComplete() {
190-
if (this.eventBuilder.length() > 0) {
190+
if (!this.eventBuilder.isEmpty()) {
191191
String eventData = this.eventBuilder.toString();
192192
SseEvent sseEvent = new SseEvent(currentEventId.get(), currentEventType.get(), eventData.trim());
193193
this.sink.next(new SseResponseEvent(responseInfo, sseEvent));
@@ -233,9 +233,24 @@ public AggregateSubscriber(ResponseInfo responseInfo, FluxSink<ResponseEvent> si
233233

234234
@Override
235235
protected void hookOnSubscribe(Subscription subscription) {
236-
sink.onRequest(subscription::request);
236+
var contentLength = responseInfo.headers().firstValue("Content-Length");
237+
var useUnbounded = false;
238+
if (contentLength.isPresent()) {
239+
useUnbounded = Long.parseLong(contentLength.get()) > 0;
240+
}
241+
if (useUnbounded) {
242+
sink.onRequest(n -> {
243+
// Don't forward downstream requests directly - we manage our own
244+
// requests
245+
});
246+
247+
// Request unbounded items to consume the entire response body
248+
subscription.request(Long.MAX_VALUE);
249+
}
250+
else {
251+
sink.onRequest(subscription::request);
252+
}
237253

238-
// Register disposal callback to cancel subscription when Flux is disposed
239254
sink.onDispose(subscription::cancel);
240255
}
241256

0 commit comments

Comments
 (0)