Skip to content

Commit bdf3811

Browse files
committed
fix: add related-task metadata to task-related messages
1 parent efc683e commit bdf3811

File tree

3 files changed

+321
-6
lines changed

3 files changed

+321
-6
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -662,7 +662,7 @@ private RequestHandler<McpSchema.Result> samplingCreateMessageHandler() {
662662
}
663663

664664
// Non-task-augmented request - execute directly
665-
return this.samplingHandler.apply(request).map(result -> (McpSchema.Result) result);
665+
return this.samplingHandler.apply(request).map(result -> processClientResult(request.meta(), result));
666666
};
667667
}
668668

@@ -682,10 +682,59 @@ private RequestHandler<McpSchema.Result> elicitationCreateHandler() {
682682
}
683683

684684
// Non-task-augmented request - execute directly
685-
return this.elicitationHandler.apply(request).map(result -> (McpSchema.Result) result);
685+
return this.elicitationHandler.apply(request).map(result -> processClientResult(request.meta(), result));
686686
};
687687
}
688688

689+
/**
690+
* Processes a client result before returning it to the server. Echoes related-task
691+
* metadata from the request to the response, which is necessary for the server to
692+
* associate elicitation/sampling responses with their originating task during
693+
* side-channeling.
694+
* @param requestMeta the request's _meta field
695+
* @param result the handler's result
696+
* @return the processed result with related-task metadata echoed (if present in
697+
* request)
698+
*/
699+
private McpSchema.Result processClientResult(Map<String, Object> requestMeta, McpSchema.Result result) {
700+
if (requestMeta == null || !requestMeta.containsKey(McpSchema.RELATED_TASK_META_KEY)) {
701+
return result;
702+
}
703+
704+
Object relatedTask = requestMeta.get(McpSchema.RELATED_TASK_META_KEY);
705+
Map<String, Object> newMeta = mergeRelatedTaskMetadata(relatedTask, result.meta());
706+
707+
// Client-side task payloads are ElicitResult or CreateMessageResult
708+
// (per ClientTaskPayloadResult sealed interface)
709+
if (result instanceof McpSchema.ElicitResult elicitResult) {
710+
return new McpSchema.ElicitResult(elicitResult.action(), elicitResult.content(), newMeta);
711+
}
712+
else if (result instanceof McpSchema.CreateMessageResult messageResult) {
713+
return new McpSchema.CreateMessageResult(messageResult.role(), messageResult.content(),
714+
messageResult.model(), messageResult.stopReason(), newMeta);
715+
}
716+
717+
// For other result types, return as-is (shouldn't happen for client-side task
718+
// payloads)
719+
return result;
720+
}
721+
722+
/**
723+
* Merges related-task metadata with existing metadata.
724+
* @param relatedTask the related-task object to include
725+
* @param existingMeta the existing metadata (may be null)
726+
* @return a new map containing both the related-task metadata and any existing
727+
* metadata
728+
*/
729+
private Map<String, Object> mergeRelatedTaskMetadata(Object relatedTask, Map<String, Object> existingMeta) {
730+
Map<String, Object> newMeta = new HashMap<>();
731+
newMeta.put(McpSchema.RELATED_TASK_META_KEY, relatedTask);
732+
if (existingMeta != null) {
733+
newMeta.putAll(existingMeta);
734+
}
735+
return newMeta;
736+
}
737+
689738
// --------------------------
690739
// Client-Side Task Hosting
691740
// --------------------------

mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1638,7 +1638,7 @@ private Mono<Void> processMessage(McpAsyncServerExchange exchange, QueuedMessage
16381638

16391639
// Handle Notification messages (no response expected)
16401640
if (msg instanceof QueuedMessage.Notification notif) {
1641-
return sendNotificationToClient(exchange, notif);
1641+
return sendNotificationToClient(exchange, notif, taskId);
16421642
}
16431643

16441644
// Response messages should never be returned by dequeue() - but handle gracefully
@@ -1685,8 +1685,58 @@ private TypeRef<? extends McpSchema.Result> getResultTypeRef(String method) {
16851685
/**
16861686
* Sends a notification to the client without waiting for a response.
16871687
*/
1688-
private Mono<Void> sendNotificationToClient(McpAsyncServerExchange exchange, QueuedMessage.Notification notif) {
1689-
return exchange.getSession().sendNotification(notif.method(), notif.notification());
1688+
private Mono<Void> sendNotificationToClient(McpAsyncServerExchange exchange, QueuedMessage.Notification notif,
1689+
String taskId) {
1690+
McpSchema.Notification notification = addRelatedTaskMetadataToNotification(taskId, notif.notification());
1691+
return exchange.getSession().sendNotification(notif.method(), notification);
1692+
}
1693+
1694+
/**
1695+
* Adds related-task metadata to a notification. Task status notifications are
1696+
* excluded as they already contain the taskId in their params.
1697+
* @param taskId the task ID to include in the metadata
1698+
* @param notification the notification to augment
1699+
* @return the notification with related-task metadata added
1700+
*/
1701+
private McpSchema.Notification addRelatedTaskMetadataToNotification(String taskId,
1702+
McpSchema.Notification notification) {
1703+
// Handle all Notification subtypes (sealed interface guarantees exhaustiveness)
1704+
if (notification instanceof McpSchema.TaskStatusNotification tsn) {
1705+
// Already has taskId in params - spec says SHOULD NOT include metadata
1706+
return tsn;
1707+
}
1708+
else if (notification instanceof McpSchema.ProgressNotification pn) {
1709+
Map<String, Object> newMeta = mergeRelatedTaskMetadata(taskId, pn.meta());
1710+
return new McpSchema.ProgressNotification(pn.progressToken(), pn.progress(), pn.total(), pn.message(),
1711+
newMeta);
1712+
}
1713+
else if (notification instanceof McpSchema.LoggingMessageNotification ln) {
1714+
Map<String, Object> newMeta = mergeRelatedTaskMetadata(taskId, ln.meta());
1715+
return new McpSchema.LoggingMessageNotification(ln.level(), ln.logger(), ln.data(), newMeta);
1716+
}
1717+
else if (notification instanceof McpSchema.ResourcesUpdatedNotification rn) {
1718+
Map<String, Object> newMeta = mergeRelatedTaskMetadata(taskId, rn.meta());
1719+
return new McpSchema.ResourcesUpdatedNotification(rn.uri(), newMeta);
1720+
}
1721+
1722+
// This should never happen due to sealed interface, but satisfies compiler
1723+
throw new IllegalStateException("Unexpected notification type: " + notification.getClass().getName());
1724+
}
1725+
1726+
/**
1727+
* Merges related-task metadata with existing metadata.
1728+
* @param taskId the task ID to include
1729+
* @param existingMeta the existing metadata (may be null)
1730+
* @return a new map containing both the related-task metadata and any existing
1731+
* metadata
1732+
*/
1733+
private Map<String, Object> mergeRelatedTaskMetadata(String taskId, Map<String, Object> existingMeta) {
1734+
Map<String, Object> newMeta = new HashMap<>();
1735+
newMeta.put(McpSchema.RELATED_TASK_META_KEY, Map.of("taskId", taskId));
1736+
if (existingMeta != null) {
1737+
newMeta.putAll(existingMeta);
1738+
}
1739+
return newMeta;
16901740
}
16911741

16921742
/**
@@ -1724,13 +1774,33 @@ private Mono<McpSchema.Result> pollAndProcessUntilTerminal(McpAsyncServerExchang
17241774
@SuppressWarnings("unchecked")
17251775
private Mono<McpSchema.Result> fetchTaskResult(String taskId, String sessionId) {
17261776
return this.taskStore.getTaskResult(taskId, sessionId)
1727-
.map(result -> (McpSchema.Result) result)
1777+
.map(result -> addRelatedTaskMetadata(taskId, (McpSchema.Result) result))
17281778
.switchIfEmpty(Mono.error(McpError.builder(ErrorCodes.INVALID_PARAMS)
17291779
.message("Task result not available")
17301780
.data("Task ID: " + taskId)
17311781
.build()));
17321782
}
17331783

1784+
/**
1785+
* Adds the related-task metadata to a server task result. The tasks/result operation
1786+
* MUST include this metadata in its response, as the result structure itself does not
1787+
* contain the task ID.
1788+
* @param taskId the task ID to include in the metadata
1789+
* @param result the result to add metadata to
1790+
* @return the result with related-task metadata added
1791+
*/
1792+
private McpSchema.Result addRelatedTaskMetadata(String taskId, McpSchema.Result result) {
1793+
// Server-side tasks only produce CallToolResult (per ServerTaskPayloadResult
1794+
// sealed interface)
1795+
if (result instanceof McpSchema.CallToolResult ctr) {
1796+
Map<String, Object> newMeta = mergeRelatedTaskMetadata(taskId, ctr.meta());
1797+
return new McpSchema.CallToolResult(ctr.content(), ctr.isError(), ctr.structuredContent(), newMeta);
1798+
}
1799+
1800+
// For non-task results (e.g., direct tool calls), return as-is
1801+
return result;
1802+
}
1803+
17341804
/**
17351805
* Returns the effective automatic polling timeout, using the configured value or the
17361806
* default if not configured.

mcp-test/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,202 @@ void testElicitationCreateRequestHandlingWithNullHandler() {
535535
.hasMessage("Elicitation handler must not be null when client capabilities include elicitation");
536536
}
537537

538+
@Test
539+
void testElicitationResponseIncludesRelatedTaskMetadata() {
540+
MockMcpClientTransport transport = initializationEnabledTransport();
541+
542+
// Create a test elicitation handler that returns a simple response
543+
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler = request -> Mono
544+
.just(new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT, Map.of("value", "42"), null));
545+
546+
// Create client with elicitation capability and handler
547+
McpAsyncClient asyncMcpClient = McpClient.async(transport)
548+
.capabilities(ClientCapabilities.builder().elicitation().build())
549+
.elicitation(elicitationHandler)
550+
.build();
551+
552+
assertThat(asyncMcpClient.initialize().block()).isNotNull();
553+
554+
// Create elicitation request WITH related-task metadata (simulating
555+
// side-channeling)
556+
String taskId = "test-task-123";
557+
Map<String, Object> relatedTaskMeta = Map.of("taskId", taskId);
558+
Map<String, Object> requestMeta = Map.of(McpSchema.RELATED_TASK_META_KEY, relatedTaskMeta);
559+
560+
var elicitRequest = new McpSchema.ElicitRequest("What is your favorite number?",
561+
Map.of("type", "object", "properties", Map.of("value", Map.of("type", "string"))), null, requestMeta);
562+
563+
// Simulate incoming request
564+
McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
565+
McpSchema.METHOD_ELICITATION_CREATE, "test-id", elicitRequest);
566+
transport.simulateIncomingMessage(request);
567+
568+
// Verify response
569+
McpSchema.JSONRPCMessage sentMessage = transport.getLastSentMessage();
570+
assertThat(sentMessage).isInstanceOf(McpSchema.JSONRPCResponse.class);
571+
572+
McpSchema.JSONRPCResponse response = (McpSchema.JSONRPCResponse) sentMessage;
573+
assertThat(response.id()).isEqualTo("test-id");
574+
assertThat(response.error()).isNull();
575+
576+
McpSchema.ElicitResult result = transport.unmarshalFrom(response.result(), new TypeRef<>() {
577+
});
578+
assertThat(result).isNotNull();
579+
assertThat(result.action()).isEqualTo(McpSchema.ElicitResult.Action.ACCEPT);
580+
assertThat(result.content()).isEqualTo(Map.of("value", "42"));
581+
582+
// Verify related-task metadata was echoed back
583+
assertThat(result.meta()).isNotNull();
584+
assertThat(result.meta()).containsKey(McpSchema.RELATED_TASK_META_KEY);
585+
@SuppressWarnings("unchecked")
586+
Map<String, Object> echoedRelatedTask = (Map<String, Object>) result.meta()
587+
.get(McpSchema.RELATED_TASK_META_KEY);
588+
assertThat(echoedRelatedTask.get("taskId")).isEqualTo(taskId);
589+
590+
asyncMcpClient.closeGracefully();
591+
}
592+
593+
@Test
594+
void testElicitationResponseWithoutRelatedTaskMetadata() {
595+
MockMcpClientTransport transport = initializationEnabledTransport();
596+
597+
// Create a test elicitation handler that returns a response with custom meta
598+
Map<String, Object> handlerMeta = Map.of("custom-key", "custom-value");
599+
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler = request -> Mono
600+
.just(new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT, Map.of("value", "42"), handlerMeta));
601+
602+
// Create client with elicitation capability and handler
603+
McpAsyncClient asyncMcpClient = McpClient.async(transport)
604+
.capabilities(ClientCapabilities.builder().elicitation().build())
605+
.elicitation(elicitationHandler)
606+
.build();
607+
608+
assertThat(asyncMcpClient.initialize().block()).isNotNull();
609+
610+
// Create elicitation request WITHOUT related-task metadata
611+
var elicitRequest = new McpSchema.ElicitRequest("What is your favorite number?",
612+
Map.of("type", "object", "properties", Map.of("value", Map.of("type", "string"))));
613+
614+
// Simulate incoming request
615+
McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
616+
McpSchema.METHOD_ELICITATION_CREATE, "test-id", elicitRequest);
617+
transport.simulateIncomingMessage(request);
618+
619+
// Verify response
620+
McpSchema.JSONRPCMessage sentMessage = transport.getLastSentMessage();
621+
McpSchema.JSONRPCResponse response = (McpSchema.JSONRPCResponse) sentMessage;
622+
623+
McpSchema.ElicitResult result = transport.unmarshalFrom(response.result(), new TypeRef<>() {
624+
});
625+
assertThat(result).isNotNull();
626+
627+
// Verify handler's meta is preserved and no related-task was added
628+
assertThat(result.meta()).isEqualTo(handlerMeta);
629+
assertThat(result.meta()).doesNotContainKey(McpSchema.RELATED_TASK_META_KEY);
630+
631+
asyncMcpClient.closeGracefully();
632+
}
633+
634+
@Test
635+
void testSamplingResponseIncludesRelatedTaskMetadata() {
636+
MockMcpClientTransport transport = initializationEnabledTransport();
637+
638+
// Create a test sampling handler that returns a simple response
639+
Function<McpSchema.CreateMessageRequest, Mono<McpSchema.CreateMessageResult>> samplingHandler = request -> Mono
640+
.just(new McpSchema.CreateMessageResult(McpSchema.Role.ASSISTANT, new McpSchema.TextContent("Response"),
641+
"test-model", McpSchema.CreateMessageResult.StopReason.END_TURN, null));
642+
643+
// Create client with sampling capability and handler
644+
McpAsyncClient asyncMcpClient = McpClient.async(transport)
645+
.capabilities(ClientCapabilities.builder().sampling().build())
646+
.sampling(samplingHandler)
647+
.build();
648+
649+
assertThat(asyncMcpClient.initialize().block()).isNotNull();
650+
651+
// Create sampling request WITH related-task metadata (simulating side-channeling)
652+
String taskId = "test-task-456";
653+
Map<String, Object> relatedTaskMeta = Map.of("taskId", taskId);
654+
Map<String, Object> requestMeta = Map.of(McpSchema.RELATED_TASK_META_KEY, relatedTaskMeta);
655+
656+
var messageRequest = new McpSchema.CreateMessageRequest(
657+
List.of(new McpSchema.SamplingMessage(McpSchema.Role.USER, new McpSchema.TextContent("Test message"))),
658+
null, "Test system prompt", McpSchema.CreateMessageRequest.ContextInclusionStrategy.NONE, 0.7, 100,
659+
null, null, null, requestMeta);
660+
661+
// Simulate incoming request
662+
McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
663+
McpSchema.METHOD_SAMPLING_CREATE_MESSAGE, "test-id", messageRequest);
664+
transport.simulateIncomingMessage(request);
665+
666+
// Verify response
667+
McpSchema.JSONRPCMessage sentMessage = transport.getLastSentMessage();
668+
assertThat(sentMessage).isInstanceOf(McpSchema.JSONRPCResponse.class);
669+
670+
McpSchema.JSONRPCResponse response = (McpSchema.JSONRPCResponse) sentMessage;
671+
assertThat(response.id()).isEqualTo("test-id");
672+
assertThat(response.error()).isNull();
673+
674+
McpSchema.CreateMessageResult result = transport.unmarshalFrom(response.result(), new TypeRef<>() {
675+
});
676+
assertThat(result).isNotNull();
677+
assertThat(result.role()).isEqualTo(McpSchema.Role.ASSISTANT);
678+
679+
// Verify related-task metadata was echoed back
680+
assertThat(result.meta()).isNotNull();
681+
assertThat(result.meta()).containsKey(McpSchema.RELATED_TASK_META_KEY);
682+
@SuppressWarnings("unchecked")
683+
Map<String, Object> echoedRelatedTask = (Map<String, Object>) result.meta()
684+
.get(McpSchema.RELATED_TASK_META_KEY);
685+
assertThat(echoedRelatedTask.get("taskId")).isEqualTo(taskId);
686+
687+
asyncMcpClient.closeGracefully();
688+
}
689+
690+
@Test
691+
void testElicitationResponseMergesHandlerMetaWithRelatedTask() {
692+
MockMcpClientTransport transport = initializationEnabledTransport();
693+
694+
// Create a test elicitation handler that returns a response with custom meta
695+
Map<String, Object> handlerMeta = Map.of("custom-key", "custom-value");
696+
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler = request -> Mono
697+
.just(new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT, Map.of("value", "42"), handlerMeta));
698+
699+
// Create client with elicitation capability and handler
700+
McpAsyncClient asyncMcpClient = McpClient.async(transport)
701+
.capabilities(ClientCapabilities.builder().elicitation().build())
702+
.elicitation(elicitationHandler)
703+
.build();
704+
705+
assertThat(asyncMcpClient.initialize().block()).isNotNull();
706+
707+
// Create elicitation request WITH related-task metadata
708+
String taskId = "test-task-789";
709+
Map<String, Object> relatedTaskMeta = Map.of("taskId", taskId);
710+
Map<String, Object> requestMeta = Map.of(McpSchema.RELATED_TASK_META_KEY, relatedTaskMeta);
711+
712+
var elicitRequest = new McpSchema.ElicitRequest("Test?",
713+
Map.of("type", "object", "properties", Map.of("value", Map.of("type", "string"))), null, requestMeta);
714+
715+
// Simulate incoming request
716+
McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
717+
McpSchema.METHOD_ELICITATION_CREATE, "test-id", elicitRequest);
718+
transport.simulateIncomingMessage(request);
719+
720+
// Verify response
721+
McpSchema.JSONRPCResponse response = (McpSchema.JSONRPCResponse) transport.getLastSentMessage();
722+
McpSchema.ElicitResult result = transport.unmarshalFrom(response.result(), new TypeRef<>() {
723+
});
724+
725+
// Verify both handler's meta AND related-task are present (merged)
726+
assertThat(result.meta()).isNotNull();
727+
assertThat(result.meta()).containsKey("custom-key");
728+
assertThat(result.meta().get("custom-key")).isEqualTo("custom-value");
729+
assertThat(result.meta()).containsKey(McpSchema.RELATED_TASK_META_KEY);
730+
731+
asyncMcpClient.closeGracefully();
732+
}
733+
538734
@Test
539735
void testPingMessageRequestHandling() {
540736
MockMcpClientTransport transport = initializationEnabledTransport();

0 commit comments

Comments
 (0)