Skip to content

Commit c946c6d

Browse files
committed
fix pendingresponse leak
1 parent 082444e commit c946c6d

1 file changed

Lines changed: 17 additions & 11 deletions

File tree

mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,26 @@
44

55
package io.modelcontextprotocol.spec;
66

7-
import java.time.Duration;
8-
import java.util.Map;
9-
import java.util.concurrent.ConcurrentHashMap;
10-
import java.util.concurrent.atomic.AtomicInteger;
11-
import java.util.concurrent.atomic.AtomicLong;
12-
import java.util.concurrent.atomic.AtomicReference;
13-
147
import io.modelcontextprotocol.common.McpTransportContext;
8+
import io.modelcontextprotocol.json.TypeRef;
159
import io.modelcontextprotocol.server.McpAsyncServerExchange;
1610
import io.modelcontextprotocol.server.McpInitRequestHandler;
1711
import io.modelcontextprotocol.server.McpNotificationHandler;
1812
import io.modelcontextprotocol.server.McpRequestHandler;
19-
import io.modelcontextprotocol.json.TypeRef;
2013
import io.modelcontextprotocol.util.Assert;
2114
import org.slf4j.Logger;
2215
import org.slf4j.LoggerFactory;
2316
import reactor.core.publisher.Mono;
2417
import reactor.core.publisher.MonoSink;
2518
import reactor.core.publisher.Sinks;
2619

20+
import java.time.Duration;
21+
import java.util.Map;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.concurrent.atomic.AtomicLong;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
2727
/**
2828
* Represents a Model Context Protocol (MCP) session on the server side. It manages
2929
* bidirectional JSON-RPC communication with the client.
@@ -36,7 +36,9 @@ public class McpServerSession implements McpLoggableSession {
3636

3737
private final String id;
3838

39-
/** Duration to wait for request responses before timing out */
39+
/**
40+
* Duration to wait for request responses before timing out
41+
*/
4042
private final Duration requestTimeout;
4143

4244
private final AtomicLong requestCounter = new AtomicLong(0);
@@ -165,6 +167,8 @@ public <T> Mono<T> sendRequest(String method, Object requestParams, TypeRef<T> t
165167
this.pendingResponses.remove(requestId);
166168
sink.error(error);
167169
});
170+
// remove pending response when sink is disposed(e.g. timeout)
171+
sink.onDispose(() -> this.pendingResponses.remove(requestId));
168172
}).timeout(requestTimeout).handle((jsonRpcResponse, sink) -> {
169173
if (jsonRpcResponse.error() != null) {
170174
sink.error(new McpError(jsonRpcResponse.error()));
@@ -345,13 +349,15 @@ private MethodNotFoundError getMethodNotFoundError(String method) {
345349

346350
@Override
347351
public Mono<Void> closeGracefully() {
348-
// TODO: clear pendingResponses and emit errors?
352+
this.pendingResponses.values().forEach(sink -> sink.error(new RuntimeException("Session closed")));
353+
this.pendingResponses.clear();
349354
return this.transport.closeGracefully();
350355
}
351356

352357
@Override
353358
public void close() {
354-
// TODO: clear pendingResponses and emit errors?
359+
this.pendingResponses.values().forEach(sink -> sink.error(new RuntimeException("Session closed")));
360+
this.pendingResponses.clear();
355361
this.transport.close();
356362
}
357363

0 commit comments

Comments
 (0)