From 55876cbcf58f39915ac05b7e1609953b345581ba Mon Sep 17 00:00:00 2001 From: Dale Seo <5466341+DaleSeo@users.noreply.github.com> Date: Mon, 9 Mar 2026 18:10:12 -0400 Subject: [PATCH 1/2] feat: add local feature for !Send tool handler support --- crates/rmcp-macros/Cargo.toml | 2 + crates/rmcp-macros/src/tool.rs | 17 ++- crates/rmcp/Cargo.toml | 1 + crates/rmcp/src/handler/server.rs | 108 ++++++++++-------- crates/rmcp/src/handler/server/prompt.rs | 74 ++++++------ .../rmcp/src/handler/server/router/prompt.rs | 26 ++--- crates/rmcp/src/handler/server/router/tool.rs | 39 +++---- .../handler/server/router/tool/tool_traits.rs | 25 ++-- crates/rmcp/src/handler/server/tool.rs | 70 +++++++----- crates/rmcp/src/service.rs | 99 +++++++++++++--- crates/rmcp/src/service/client.rs | 3 +- crates/rmcp/src/service/server.rs | 3 +- .../rmcp/tests/test_client_initialization.rs | 2 +- crates/rmcp/tests/test_close_connection.rs | 1 + crates/rmcp/tests/test_custom_headers.rs | 1 + crates/rmcp/tests/test_custom_request.rs | 1 + crates/rmcp/tests/test_logging.rs | 1 + crates/rmcp/tests/test_message_protocol.rs | 1 + crates/rmcp/tests/test_notification.rs | 1 + crates/rmcp/tests/test_progress_subscriber.rs | 1 + crates/rmcp/tests/test_prompt_macros.rs | 1 + crates/rmcp/tests/test_prompt_routers.rs | 1 + crates/rmcp/tests/test_sampling.rs | 1 + .../rmcp/tests/test_server_initialization.rs | 2 +- .../tests/test_task_support_validation.rs | 1 + crates/rmcp/tests/test_tool_macros.rs | 1 + crates/rmcp/tests/test_tool_routers.rs | 1 + crates/rmcp/tests/test_with_js.rs | 1 + crates/rmcp/tests/test_with_python.rs | 1 + 29 files changed, 306 insertions(+), 180 deletions(-) diff --git a/crates/rmcp-macros/Cargo.toml b/crates/rmcp-macros/Cargo.toml index b59929265..ef61b2952 100644 --- a/crates/rmcp-macros/Cargo.toml +++ b/crates/rmcp-macros/Cargo.toml @@ -23,4 +23,6 @@ serde_json = "1.0" darling = { version = "0.23" } [features] +local = [] + [dev-dependencies] diff --git a/crates/rmcp-macros/src/tool.rs b/crates/rmcp-macros/src/tool.rs index 56bf65a14..6fe1765a8 100644 --- a/crates/rmcp-macros/src/tool.rs +++ b/crates/rmcp-macros/src/tool.rs @@ -95,6 +95,9 @@ pub struct ToolAttribute { pub icons: Option, /// Optional metadata for the tool pub meta: Option, + /// When true, the generated future will not require `Send`. Useful for `!Send` handlers + /// (e.g. single-threaded database connections). Also enabled globally by the `local` crate feature. + pub local: bool, } #[derive(FromMeta, Debug, Default)] @@ -333,7 +336,9 @@ pub fn tool(attr: TokenStream, input: TokenStream) -> syn::Result { if fn_item.sig.asyncness.is_some() { // 1. remove asyncness from sig // 2. make return type: `std::pin::Pin + Send + '_>>` + // (omit `+ Send` when the `local` crate feature is active or `#[tool(local)]` is used) // 3. make body: { Box::pin(async move { #body }) } + let omit_send = cfg!(feature = "local") || attribute.local; let new_output = syn::parse2::({ let mut lt = quote! { 'static }; if let Some(receiver) = fn_item.sig.receiver() { @@ -347,10 +352,18 @@ pub fn tool(attr: TokenStream, input: TokenStream) -> syn::Result { } match &fn_item.sig.output { syn::ReturnType::Default => { - quote! { -> ::std::pin::Pin + Send + #lt>> } + if omit_send { + quote! { -> ::std::pin::Pin + #lt>> } + } else { + quote! { -> ::std::pin::Pin + Send + #lt>> } + } } syn::ReturnType::Type(_, ty) => { - quote! { -> ::std::pin::Pin + Send + #lt>> } + if omit_send { + quote! { -> ::std::pin::Pin + #lt>> } + } else { + quote! { -> ::std::pin::Pin + Send + #lt>> } + } } } })?; diff --git a/crates/rmcp/Cargo.toml b/crates/rmcp/Cargo.toml index 31e05acab..9a5557c72 100644 --- a/crates/rmcp/Cargo.toml +++ b/crates/rmcp/Cargo.toml @@ -78,6 +78,7 @@ chrono = { version = "0.4.38", default-features = false, features = [ [features] default = ["base64", "macros", "server"] +local = ["rmcp-macros?/local"] client = ["dep:tokio-stream"] server = ["transport-async-rw", "dep:schemars", "dep:pastey"] macros = ["dep:rmcp-macros", "dep:pastey"] diff --git a/crates/rmcp/src/handler/server.rs b/crates/rmcp/src/handler/server.rs index a7ae335b0..7f21f8d63 100644 --- a/crates/rmcp/src/handler/server.rs +++ b/crates/rmcp/src/handler/server.rs @@ -3,7 +3,10 @@ use std::sync::Arc; use crate::{ error::ErrorData as McpError, model::{TaskSupport, *}, - service::{NotificationContext, RequestContext, RoleServer, Service, ServiceRole}, + service::{ + MaybeSend, MaybeSendFuture, NotificationContext, RequestContext, RoleServer, Service, + ServiceRole, + }, }; pub mod common; @@ -159,12 +162,16 @@ impl Service for H { } #[allow(unused_variables)] -pub trait ServerHandler: Sized + Send + Sync + 'static { +#[allow( + private_bounds, + reason = "MaybeSend is a sealed conditional Send + Sync alias" +)] +pub trait ServerHandler: Sized + MaybeSend + 'static { fn enqueue_task( &self, _request: CallToolRequestParams, _context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Err(McpError::internal_error( "Task processing not implemented".to_string(), None, @@ -173,7 +180,7 @@ pub trait ServerHandler: Sized + Send + Sync + 'static { fn ping( &self, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Ok(())) } // handle requests @@ -181,7 +188,7 @@ pub trait ServerHandler: Sized + Send + Sync + 'static { &self, request: InitializeRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { if context.peer.peer_info().is_none() { context.peer.set_peer_info(request); } @@ -191,49 +198,50 @@ pub trait ServerHandler: Sized + Send + Sync + 'static { &self, request: CompleteRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Ok(CompleteResult::default())) } fn set_level( &self, request: SetLevelRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Err(McpError::method_not_found::())) } fn get_prompt( &self, request: GetPromptRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Err(McpError::method_not_found::())) } fn list_prompts( &self, request: Option, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Ok(ListPromptsResult::default())) } fn list_resources( &self, request: Option, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Ok(ListResourcesResult::default())) } fn list_resource_templates( &self, request: Option, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ + { std::future::ready(Ok(ListResourceTemplatesResult::default())) } fn read_resource( &self, request: ReadResourceRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Err( McpError::method_not_found::(), )) @@ -242,28 +250,28 @@ pub trait ServerHandler: Sized + Send + Sync + 'static { &self, request: SubscribeRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Err(McpError::method_not_found::())) } fn unsubscribe( &self, request: UnsubscribeRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Err(McpError::method_not_found::())) } fn call_tool( &self, request: CallToolRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Err(McpError::method_not_found::())) } fn list_tools( &self, request: Option, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Ok(ListToolsResult::default())) } /// Get a tool definition by name. @@ -277,7 +285,7 @@ pub trait ServerHandler: Sized + Send + Sync + 'static { &self, request: CustomRequest, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { let CustomRequest { method, .. } = request; let _ = context; std::future::ready(Err(McpError::new( @@ -291,34 +299,34 @@ pub trait ServerHandler: Sized + Send + Sync + 'static { &self, notification: CancelledNotificationParam, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { std::future::ready(()) } fn on_progress( &self, notification: ProgressNotificationParam, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { std::future::ready(()) } fn on_initialized( &self, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { tracing::info!("client initialized"); std::future::ready(()) } fn on_roots_list_changed( &self, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { std::future::ready(()) } fn on_custom_notification( &self, notification: CustomNotification, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { let _ = (notification, context); std::future::ready(()) } @@ -331,7 +339,7 @@ pub trait ServerHandler: Sized + Send + Sync + 'static { &self, request: Option, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Err(McpError::method_not_found::())) } @@ -339,7 +347,7 @@ pub trait ServerHandler: Sized + Send + Sync + 'static { &self, request: GetTaskInfoParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { let _ = (request, context); std::future::ready(Err(McpError::method_not_found::())) } @@ -348,7 +356,7 @@ pub trait ServerHandler: Sized + Send + Sync + 'static { &self, request: GetTaskResultParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { let _ = (request, context); std::future::ready(Err(McpError::method_not_found::())) } @@ -357,7 +365,7 @@ pub trait ServerHandler: Sized + Send + Sync + 'static { &self, request: CancelTaskParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { let _ = (request, context); std::future::ready(Err(McpError::method_not_found::())) } @@ -370,14 +378,14 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: CallToolRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).enqueue_task(request, context) } fn ping( &self, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).ping(context) } @@ -385,7 +393,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: InitializeRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).initialize(request, context) } @@ -393,7 +401,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: CompleteRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).complete(request, context) } @@ -401,7 +409,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: SetLevelRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).set_level(request, context) } @@ -409,7 +417,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: GetPromptRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).get_prompt(request, context) } @@ -417,7 +425,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: Option, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).list_prompts(request, context) } @@ -425,7 +433,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: Option, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).list_resources(request, context) } @@ -433,7 +441,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: Option, context: RequestContext, - ) -> impl Future> + Send + '_ + ) -> impl Future> + MaybeSendFuture + '_ { (**self).list_resource_templates(request, context) } @@ -442,7 +450,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: ReadResourceRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).read_resource(request, context) } @@ -450,7 +458,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: SubscribeRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).subscribe(request, context) } @@ -458,7 +466,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: UnsubscribeRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).unsubscribe(request, context) } @@ -466,7 +474,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: CallToolRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).call_tool(request, context) } @@ -474,7 +482,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: Option, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).list_tools(request, context) } @@ -486,7 +494,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: CustomRequest, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).on_custom_request(request, context) } @@ -494,7 +502,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, notification: CancelledNotificationParam, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { (**self).on_cancelled(notification, context) } @@ -502,21 +510,21 @@ macro_rules! impl_server_handler_for_wrapper { &self, notification: ProgressNotificationParam, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { (**self).on_progress(notification, context) } fn on_initialized( &self, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { (**self).on_initialized(context) } fn on_roots_list_changed( &self, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { (**self).on_roots_list_changed(context) } @@ -524,7 +532,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, notification: CustomNotification, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { (**self).on_custom_notification(notification, context) } @@ -536,7 +544,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: Option, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).list_tasks(request, context) } @@ -544,7 +552,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: GetTaskInfoParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).get_task_info(request, context) } @@ -552,7 +560,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: GetTaskResultParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).get_task_result(request, context) } @@ -560,7 +568,7 @@ macro_rules! impl_server_handler_for_wrapper { &self, request: CancelTaskParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).cancel_task(request, context) } } diff --git a/crates/rmcp/src/handler/server/prompt.rs b/crates/rmcp/src/handler/server/prompt.rs index 826bee0df..ae98677c0 100644 --- a/crates/rmcp/src/handler/server/prompt.rs +++ b/crates/rmcp/src/handler/server/prompt.rs @@ -6,7 +6,8 @@ use std::{future::Future, marker::PhantomData}; -use futures::future::{BoxFuture, FutureExt}; +use futures::future::BoxFuture; +#[allow(unused_imports)] use serde::de::DeserializeOwned; use super::common::{AsRequestContext, FromContextPart}; @@ -15,7 +16,7 @@ use crate::{ RoleServer, handler::server::wrapper::Parameters, model::{GetPromptResult, PromptMessage}, - service::RequestContext, + service::{MaybeBoxFuture, MaybeSend, MaybeSendFuture, RequestContext}, }; /// Context for prompt retrieval operations @@ -57,14 +58,23 @@ pub trait GetPromptHandler { fn handle( self, context: PromptContext<'_, S>, - ) -> BoxFuture<'_, Result>; + ) -> MaybeBoxFuture<'_, Result>; } /// Type alias for dynamic prompt handlers +#[cfg(not(feature = "local"))] pub type DynGetPromptHandler = dyn for<'a> Fn(PromptContext<'a, S>) -> BoxFuture<'a, Result> + Send + Sync; +#[cfg(feature = "local")] +pub type DynGetPromptHandler = dyn for<'a> Fn( + PromptContext<'a, S>, +) -> futures::future::LocalBoxFuture< + 'a, + Result, +>; + /// Adapter type for async methods that return `Vec` pub struct AsyncMethodAdapter(PhantomData); @@ -191,31 +201,31 @@ macro_rules! impl_prompt_handler_for { impl<$($Tn,)* S, F, R> GetPromptHandler for F where $( - $Tn: for<'a> FromContextPart> + Send, + $Tn: for<'a> FromContextPart> + MaybeSendFuture, )* - F: FnOnce(&S, $($Tn,)*) -> BoxFuture<'_, R> + Send, - R: IntoGetPromptResult + Send + 'static, - S: Send + Sync + 'static, + F: FnOnce(&S, $($Tn,)*) -> MaybeBoxFuture<'_, R> + MaybeSendFuture, + R: IntoGetPromptResult + MaybeSendFuture + 'static, + S: MaybeSend + 'static, { #[allow(unused_variables, non_snake_case, unused_mut)] fn handle( self, mut context: PromptContext<'_, S>, - ) -> BoxFuture<'_, Result> + ) -> MaybeBoxFuture<'_, Result> { $( let result = $Tn::from_context_part(&mut context); let $Tn = match result { Ok(value) => value, - Err(e) => return std::future::ready(Err(e)).boxed(), + Err(e) => return Box::pin(std::future::ready(Err(e))), }; )* let service = context.server; let fut = self(service, $($Tn,)*); - async move { + Box::pin(async move { let result = fut.await; result.into_get_prompt_result() - }.boxed() + }) } } @@ -224,28 +234,28 @@ macro_rules! impl_prompt_handler_for { impl<$($Tn,)* S, F, R> GetPromptHandler> for F where $( - $Tn: for<'a> FromContextPart> + Send, + $Tn: for<'a> FromContextPart> + MaybeSendFuture, )* - F: FnOnce(&S, $($Tn,)*) -> R + Send, - R: IntoGetPromptResult + Send, - S: Send + Sync, + F: FnOnce(&S, $($Tn,)*) -> R + MaybeSendFuture, + R: IntoGetPromptResult + MaybeSendFuture, + S: MaybeSend, { #[allow(unused_variables, non_snake_case, unused_mut)] fn handle( self, mut context: PromptContext<'_, S>, - ) -> BoxFuture<'_, Result> + ) -> MaybeBoxFuture<'_, Result> { $( let result = $Tn::from_context_part(&mut context); let $Tn = match result { Ok(value) => value, - Err(e) => return std::future::ready(Err(e)).boxed(), + Err(e) => return Box::pin(std::future::ready(Err(e))), }; )* let service = context.server; let result = self(service, $($Tn,)*); - std::future::ready(result.into_get_prompt_result()).boxed() + Box::pin(std::future::ready(result.into_get_prompt_result())) } } @@ -254,25 +264,25 @@ macro_rules! impl_prompt_handler_for { impl<$($Tn,)* S, F, Fut, R> GetPromptHandler> for F where $( - $Tn: for<'a> FromContextPart> + Send + 'static, + $Tn: for<'a> FromContextPart> + MaybeSendFuture + 'static, )* - F: FnOnce($($Tn,)*) -> Fut + Send + 'static, - Fut: Future> + Send + 'static, - R: IntoGetPromptResult + Send + 'static, - S: Send + Sync + 'static, + F: FnOnce($($Tn,)*) -> Fut + MaybeSendFuture + 'static, + Fut: Future> + MaybeSendFuture + 'static, + R: IntoGetPromptResult + MaybeSendFuture + 'static, + S: MaybeSend + 'static, { #[allow(unused_variables, non_snake_case, unused_mut)] fn handle( self, mut context: PromptContext<'_, S>, - ) -> BoxFuture<'_, Result> + ) -> MaybeBoxFuture<'_, Result> { // Extract all parameters before moving into the async block $( let result = $Tn::from_context_part(&mut context); let $Tn = match result { Ok(value) => value, - Err(e) => return std::future::ready(Err(e)).boxed(), + Err(e) => return Box::pin(std::future::ready(Err(e))), }; )* @@ -290,27 +300,27 @@ macro_rules! impl_prompt_handler_for { impl<$($Tn,)* S, F, R> GetPromptHandler> for F where $( - $Tn: for<'a> FromContextPart> + Send + 'static, + $Tn: for<'a> FromContextPart> + MaybeSendFuture + 'static, )* - F: FnOnce($($Tn,)*) -> Result + Send + 'static, - R: IntoGetPromptResult + Send + 'static, - S: Send + Sync, + F: FnOnce($($Tn,)*) -> Result + MaybeSendFuture + 'static, + R: IntoGetPromptResult + MaybeSendFuture + 'static, + S: MaybeSend, { #[allow(unused_variables, non_snake_case, unused_mut)] fn handle( self, mut context: PromptContext<'_, S>, - ) -> BoxFuture<'_, Result> + ) -> MaybeBoxFuture<'_, Result> { $( let result = $Tn::from_context_part(&mut context); let $Tn = match result { Ok(value) => value, - Err(e) => return std::future::ready(Err(e)).boxed(), + Err(e) => return Box::pin(std::future::ready(Err(e))), }; )* let result = self($($Tn,)*); - std::future::ready(result.and_then(|r| r.into_get_prompt_result())).boxed() + Box::pin(std::future::ready(result.and_then(|r| r.into_get_prompt_result()))) } } diff --git a/crates/rmcp/src/handler/server/router/prompt.rs b/crates/rmcp/src/handler/server/router/prompt.rs index 6ea925a0e..b5ea4a47f 100644 --- a/crates/rmcp/src/handler/server/router/prompt.rs +++ b/crates/rmcp/src/handler/server/router/prompt.rs @@ -1,10 +1,9 @@ use std::{borrow::Cow, sync::Arc}; -use futures::future::BoxFuture; - use crate::{ handler::server::prompt::{DynGetPromptHandler, GetPromptHandler, PromptContext}, model::{GetPromptResult, Prompt}, + service::{MaybeBoxFuture, MaybeSend}, }; pub struct PromptRoute { @@ -32,10 +31,10 @@ impl Clone for PromptRoute { } } -impl PromptRoute { +impl PromptRoute { pub fn new(attr: impl Into, handler: H) -> Self where - H: GetPromptHandler + Send + Sync + Clone + 'static, + H: GetPromptHandler + MaybeSend + Clone + 'static, { Self { get: Arc::new(move |context: PromptContext| { @@ -50,9 +49,8 @@ impl PromptRoute { where H: for<'a> Fn( PromptContext<'a, S>, - ) -> BoxFuture<'a, Result> - + Send - + Sync + ) -> MaybeBoxFuture<'a, Result> + + MaybeSend + 'static, { Self { @@ -72,9 +70,9 @@ pub trait IntoPromptRoute { impl IntoPromptRoute for (P, H) where - S: Send + Sync + 'static, + S: MaybeSend + 'static, A: 'static, - H: GetPromptHandler + Send + Sync + Clone + 'static, + H: GetPromptHandler + MaybeSend + Clone + 'static, P: Into, { fn into_prompt_route(self) -> PromptRoute { @@ -84,7 +82,7 @@ where impl IntoPromptRoute for PromptRoute where - S: Send + Sync + 'static, + S: MaybeSend + 'static, { fn into_prompt_route(self) -> PromptRoute { self @@ -96,7 +94,7 @@ pub struct PromptAttrGenerateFunctionAdapter; impl IntoPromptRoute for F where - S: Send + Sync + 'static, + S: MaybeSend + 'static, F: Fn() -> PromptRoute, { fn into_prompt_route(self) -> PromptRoute { @@ -137,7 +135,7 @@ impl IntoIterator for PromptRouter { impl PromptRouter where - S: Send + Sync + 'static, + S: MaybeSend + 'static, { pub fn new() -> Self { Self { @@ -195,7 +193,7 @@ where impl std::ops::Add> for PromptRouter where - S: Send + Sync + 'static, + S: MaybeSend + 'static, { type Output = Self; @@ -207,7 +205,7 @@ where impl std::ops::AddAssign> for PromptRouter where - S: Send + Sync + 'static, + S: MaybeSend + 'static, { fn add_assign(&mut self, other: PromptRouter) { self.merge(other); diff --git a/crates/rmcp/src/handler/server/router/tool.rs b/crates/rmcp/src/handler/server/router/tool.rs index 5c1941bd0..42c582c40 100644 --- a/crates/rmcp/src/handler/server/router/tool.rs +++ b/crates/rmcp/src/handler/server/router/tool.rs @@ -124,7 +124,6 @@ mod tool_traits; use std::{borrow::Cow, sync::Arc}; -use futures::{FutureExt, future::BoxFuture}; use schemars::JsonSchema; pub use tool_traits::{AsyncTool, SyncTool, ToolBase}; @@ -134,6 +133,7 @@ use crate::{ tool_name_validation::validate_and_warn_tool_name, }, model::{CallToolResult, Tool, ToolAnnotations}, + service::{MaybeBoxFuture, MaybeSend}, }; pub struct ToolRoute { @@ -161,15 +161,15 @@ impl Clone for ToolRoute { } } -impl ToolRoute { +impl ToolRoute { pub fn new(attr: impl Into, call: C) -> Self where - C: CallToolHandler + Send + Sync + Clone + 'static, + C: CallToolHandler + MaybeSend + Clone + 'static, { Self { call: Arc::new(move |context: ToolCallContext| { let call = call.clone(); - context.invoke(call).boxed() + context.invoke(call) }), attr: attr.into(), } @@ -178,9 +178,8 @@ impl ToolRoute { where C: for<'a> Fn( ToolCallContext<'a, S>, - ) -> BoxFuture<'a, Result> - + Send - + Sync + ) -> MaybeBoxFuture<'a, Result> + + MaybeSend + 'static, { Self { @@ -199,8 +198,8 @@ pub trait IntoToolRoute { impl IntoToolRoute for (T, C) where - S: Send + Sync + 'static, - C: CallToolHandler + Send + Sync + Clone + 'static, + S: MaybeSend + 'static, + C: CallToolHandler + MaybeSend + Clone + 'static, T: Into, { fn into_tool_route(self) -> ToolRoute { @@ -210,7 +209,7 @@ where impl IntoToolRoute for ToolRoute where - S: Send + Sync + 'static, + S: MaybeSend + 'static, { fn into_tool_route(self) -> ToolRoute { self @@ -220,7 +219,7 @@ where pub struct ToolAttrGenerateFunctionAdapter; impl IntoToolRoute for F where - S: Send + Sync + 'static, + S: MaybeSend + 'static, F: Fn() -> ToolRoute, { fn into_tool_route(self) -> ToolRoute { @@ -230,14 +229,14 @@ where pub trait CallToolHandlerExt: Sized where - Self: CallToolHandler + Send + Sync + Clone + 'static, + Self: CallToolHandler + MaybeSend + Clone + 'static, { fn name(self, name: impl Into>) -> WithToolAttr; } impl CallToolHandlerExt for C where - C: CallToolHandler + Send + Sync + Clone + 'static, + C: CallToolHandler + MaybeSend + Clone + 'static, { fn name(self, name: impl Into>) -> WithToolAttr { WithToolAttr { @@ -254,7 +253,7 @@ where pub struct WithToolAttr where - C: CallToolHandler + Send + Sync + Clone + 'static, + C: CallToolHandler + MaybeSend + Clone + 'static, { pub attr: crate::model::Tool, pub call: C, @@ -263,8 +262,8 @@ where impl IntoToolRoute for WithToolAttr where - C: CallToolHandler + Send + Sync + Clone + 'static, - S: Send + Sync + 'static, + C: CallToolHandler + MaybeSend + Clone + 'static, + S: MaybeSend + 'static, { fn into_tool_route(self) -> ToolRoute { ToolRoute::new(self.attr, self.call) @@ -273,7 +272,7 @@ where impl WithToolAttr where - C: CallToolHandler + Send + Sync + Clone + 'static, + C: CallToolHandler + MaybeSend + Clone + 'static, { pub fn description(mut self, description: impl Into>) -> Self { self.attr.description = Some(description.into()); @@ -328,7 +327,7 @@ impl IntoIterator for ToolRouter { impl ToolRouter where - S: Send + Sync + 'static, + S: MaybeSend + 'static, { pub fn new() -> Self { Self { @@ -428,7 +427,7 @@ where impl std::ops::Add> for ToolRouter where - S: Send + Sync + 'static, + S: MaybeSend + 'static, { type Output = Self; @@ -440,7 +439,7 @@ where impl std::ops::AddAssign> for ToolRouter where - S: Send + Sync + 'static, + S: MaybeSend + 'static, { fn add_assign(&mut self, other: ToolRouter) { self.merge(other); diff --git a/crates/rmcp/src/handler/server/router/tool/tool_traits.rs b/crates/rmcp/src/handler/server/router/tool/tool_traits.rs index 60ac9cff0..e4167a08b 100644 --- a/crates/rmcp/src/handler/server/router/tool/tool_traits.rs +++ b/crates/rmcp/src/handler/server/router/tool/tool_traits.rs @@ -1,4 +1,4 @@ -use std::{borrow::Cow, pin::Pin, sync::Arc}; +use std::{borrow::Cow, future::Future, sync::Arc}; use serde::{Deserialize, Serialize}; @@ -11,6 +11,7 @@ use crate::{ }, model::{Icon, JsonObject, Meta, ToolAnnotations, ToolExecution}, schemars::JsonSchema, + service::{MaybeSend, MaybeSendFuture}, }; /// Base trait to define attributes of a tool. @@ -84,7 +85,8 @@ pub trait ToolBase { /// /// Consider using [`AsyncTool`] if your workflow involves asynchronous operations. /// Examples are shown in [the module-level documentation][crate::handler::server::router::tool]. -pub trait SyncTool: ToolBase { +#[allow(private_bounds)] +pub trait SyncTool: ToolBase { fn invoke(service: &S, param: Self::Parameter) -> Result; } @@ -92,11 +94,12 @@ pub trait SyncTool: ToolBase { /// /// Consider using [`SyncTool`] if your workflow does not involve asynchronous operations. /// Examples are shown in [the module-level documentation][crate::handler::server::router::tool]. -pub trait AsyncTool: ToolBase { +#[allow(private_bounds)] +pub trait AsyncTool: ToolBase { fn invoke( service: &S, param: Self::Parameter, - ) -> impl Future> + Send; + ) -> impl Future> + MaybeSendFuture; } pub(crate) fn tool_attribute() -> crate::model::Tool { @@ -113,14 +116,14 @@ pub(crate) fn tool_attribute() -> crate::model::Tool { } } -pub(crate) fn sync_tool_wrapper>( +pub(crate) fn sync_tool_wrapper>( service: &S, Parameters(params): Parameters, ) -> Result, ErrorData> { T::invoke(service, params).map(Json).map_err(Into::into) } -pub(crate) fn sync_tool_wrapper_with_empty_params>( +pub(crate) fn sync_tool_wrapper_with_empty_params>( service: &S, ) -> Result, ErrorData> { T::invoke(service, T::Parameter::default()) @@ -128,11 +131,10 @@ pub(crate) fn sync_tool_wrapper_with_empty_params>( +pub(crate) fn async_tool_wrapper>( service: &S, Parameters(params): Parameters, -) -> Pin, ErrorData>> + Send + '_>> { +) -> crate::service::MaybeBoxFuture<'_, Result, ErrorData>> { Box::pin(async move { T::invoke(service, params) .await @@ -141,10 +143,9 @@ pub(crate) fn async_tool_wrapper>( }) } -#[expect(clippy::type_complexity)] -pub(crate) fn async_tool_wrapper_with_empty_params>( +pub(crate) fn async_tool_wrapper_with_empty_params>( service: &S, -) -> Pin, ErrorData>> + Send + '_>> { +) -> crate::service::MaybeBoxFuture<'_, Result, ErrorData>> { Box::pin(async move { T::invoke(service, T::Parameter::default()) .await diff --git a/crates/rmcp/src/handler/server/tool.rs b/crates/rmcp/src/handler/server/tool.rs index c98aef0d5..5c72461b4 100644 --- a/crates/rmcp/src/handler/server/tool.rs +++ b/crates/rmcp/src/handler/server/tool.rs @@ -4,7 +4,7 @@ use std::{ marker::PhantomData, }; -use futures::future::{BoxFuture, FutureExt}; +use futures::future::BoxFuture; use serde::de::DeserializeOwned; use super::common::{AsRequestContext, FromContextPart}; @@ -16,7 +16,7 @@ use crate::{ RoleServer, handler::server::wrapper::Parameters, model::{CallToolRequestParams, CallToolResult, IntoContents, JsonObject}, - service::RequestContext, + service::{MaybeBoxFuture, MaybeSend, MaybeSendFuture, RequestContext}, }; /// Deserialize a JSON object into a type @@ -146,13 +146,21 @@ pub trait CallToolHandler { fn call( self, context: ToolCallContext<'_, S>, - ) -> BoxFuture<'_, Result>; + ) -> MaybeBoxFuture<'_, Result>; } +#[cfg(not(feature = "local"))] pub type DynCallToolHandler = dyn for<'s> Fn(ToolCallContext<'s, S>) -> BoxFuture<'s, Result> + Send + Sync; +#[cfg(feature = "local")] +pub type DynCallToolHandler = + dyn for<'s> Fn( + ToolCallContext<'s, S>, + ) + -> futures::future::LocalBoxFuture<'s, Result>; + // Tool-specific extractor for tool name pub struct ToolName(pub Cow<'static, str>); @@ -189,7 +197,7 @@ impl FromContextPart> for JsonObject { } impl<'s, S> ToolCallContext<'s, S> { - pub fn invoke(self, h: H) -> BoxFuture<'s, Result> + pub fn invoke(self, h: H) -> MaybeBoxFuture<'s, Result> where H: CallToolHandler, { @@ -221,31 +229,31 @@ macro_rules! impl_for { $( $Tn: for<'a> FromContextPart> , )* - F: FnOnce(&S, $($Tn,)*) -> BoxFuture<'_, R>, + F: FnOnce(&S, $($Tn,)*) -> MaybeBoxFuture<'_, R>, // Need RTN support here(I guess), https://github.com/rust-lang/rust/pull/138424 // Fut: Future + Send + 'a, - R: IntoCallToolResult + Send + 'static, - S: Send + Sync + 'static, + R: IntoCallToolResult + MaybeSendFuture + 'static, + S: MaybeSend + 'static, { #[allow(unused_variables, non_snake_case, unused_mut)] fn call( self, mut context: ToolCallContext<'_, S>, - ) -> BoxFuture<'_, Result>{ + ) -> MaybeBoxFuture<'_, Result>{ $( let result = $Tn::from_context_part(&mut context); let $Tn = match result { Ok(value) => value, - Err(e) => return std::future::ready(Err(e)).boxed(), + Err(e) => return Box::pin(std::future::ready(Err(e))), }; )* let service = context.service; let fut = self(service, $($Tn,)*); - async move { + Box::pin(async move { let result = fut.await; result.into_call_tool_result() - }.boxed() + }) } } @@ -254,28 +262,28 @@ macro_rules! impl_for { $( $Tn: for<'a> FromContextPart> , )* - F: FnOnce($($Tn,)*) -> Fut + Send + , - Fut: Future + Send + 'static, - R: IntoCallToolResult + Send + 'static, - S: Send + Sync, + F: FnOnce($($Tn,)*) -> Fut + MaybeSendFuture, + Fut: Future + MaybeSendFuture + 'static, + R: IntoCallToolResult + MaybeSendFuture + 'static, + S: MaybeSend, { #[allow(unused_variables, non_snake_case, unused_mut)] fn call( self, mut context: ToolCallContext, - ) -> BoxFuture<'static, Result>{ + ) -> MaybeBoxFuture<'static, Result>{ $( let result = $Tn::from_context_part(&mut context); let $Tn = match result { Ok(value) => value, - Err(e) => return std::future::ready(Err(e)).boxed(), + Err(e) => return Box::pin(std::future::ready(Err(e))), }; )* let fut = self($($Tn,)*); - async move { + Box::pin(async move { let result = fut.await; result.into_call_tool_result() - }.boxed() + }) } } @@ -284,23 +292,23 @@ macro_rules! impl_for { $( $Tn: for<'a> FromContextPart> + , )* - F: FnOnce(&S, $($Tn,)*) -> R + Send + , - R: IntoCallToolResult + Send + , - S: Send + Sync, + F: FnOnce(&S, $($Tn,)*) -> R + MaybeSendFuture, + R: IntoCallToolResult + MaybeSendFuture, + S: MaybeSend, { #[allow(unused_variables, non_snake_case, unused_mut)] fn call( self, mut context: ToolCallContext, - ) -> BoxFuture<'static, Result> { + ) -> MaybeBoxFuture<'static, Result> { $( let result = $Tn::from_context_part(&mut context); let $Tn = match result { Ok(value) => value, - Err(e) => return std::future::ready(Err(e)).boxed(), + Err(e) => return Box::pin(std::future::ready(Err(e))), }; )* - std::future::ready(self(context.service, $($Tn,)*).into_call_tool_result()).boxed() + Box::pin(std::future::ready(self(context.service, $($Tn,)*).into_call_tool_result())) } } @@ -309,23 +317,23 @@ macro_rules! impl_for { $( $Tn: for<'a> FromContextPart> + , )* - F: FnOnce($($Tn,)*) -> R + Send + , - R: IntoCallToolResult + Send + , - S: Send + Sync, + F: FnOnce($($Tn,)*) -> R + MaybeSendFuture, + R: IntoCallToolResult + MaybeSendFuture, + S: MaybeSend, { #[allow(unused_variables, non_snake_case, unused_mut)] fn call( self, mut context: ToolCallContext, - ) -> BoxFuture<'static, Result> { + ) -> MaybeBoxFuture<'static, Result> { $( let result = $Tn::from_context_part(&mut context); let $Tn = match result { Ok(value) => value, - Err(e) => return std::future::ready(Err(e)).boxed(), + Err(e) => return Box::pin(std::future::ready(Err(e))), }; )* - std::future::ready(self($($Tn,)*).into_call_tool_result()).boxed() + Box::pin(std::future::ready(self($($Tn,)*).into_call_tool_result())) } } }; diff --git a/crates/rmcp/src/service.rs b/crates/rmcp/src/service.rs index d6613dd3c..0d3a6e5cc 100644 --- a/crates/rmcp/src/service.rs +++ b/crates/rmcp/src/service.rs @@ -1,6 +1,45 @@ +#[cfg(feature = "local")] +use futures::future::LocalBoxFuture; use futures::{FutureExt, future::BoxFuture}; use thiserror::Error; +// --------------------------------------------------------------------------- +// Conditional Send helpers +// +// `MaybeSend` – supertrait alias: `Send + Sync` without `local`, empty with `local` +// `MaybeSendFuture` – future bound alias: `Send` without `local`, empty with `local` +// `MaybeBoxFuture` – boxed future type: `BoxFuture` without `local`, `LocalBoxFuture` with `local` +// --------------------------------------------------------------------------- + +#[cfg(not(feature = "local"))] +#[doc(hidden)] +pub trait MaybeSend: Send + Sync {} +#[cfg(not(feature = "local"))] +impl MaybeSend for T {} + +#[cfg(feature = "local")] +#[doc(hidden)] +pub trait MaybeSend {} +#[cfg(feature = "local")] +impl MaybeSend for T {} + +#[cfg(not(feature = "local"))] +#[doc(hidden)] +pub trait MaybeSendFuture: Send {} +#[cfg(not(feature = "local"))] +impl MaybeSendFuture for T {} + +#[cfg(feature = "local")] +#[doc(hidden)] +pub trait MaybeSendFuture {} +#[cfg(feature = "local")] +impl MaybeSendFuture for T {} + +#[cfg(not(feature = "local"))] +pub(crate) type MaybeBoxFuture<'a, T> = BoxFuture<'a, T>; +#[cfg(feature = "local")] +pub(crate) type MaybeBoxFuture<'a, T> = LocalBoxFuture<'a, T>; + #[cfg(feature = "server")] use crate::model::ServerJsonRpcMessage; use crate::{ @@ -87,17 +126,21 @@ pub type RxJsonRpcMessage = JsonRpcMessage< ::PeerNot, >; -pub trait Service: Send + Sync + 'static { +#[allow( + private_bounds, + reason = "MaybeSend is a sealed conditional Send + Sync alias" +)] +pub trait Service: MaybeSend + 'static { fn handle_request( &self, request: R::PeerReq, context: RequestContext, - ) -> impl Future> + Send + '_; + ) -> impl Future> + MaybeSendFuture + '_; fn handle_notification( &self, notification: R::PeerNot, context: NotificationContext, - ) -> impl Future> + Send + '_; + ) -> impl Future> + MaybeSendFuture + '_; fn get_info(&self) -> R::Info; } @@ -111,7 +154,7 @@ pub trait ServiceExt: Service + Sized { fn serve( self, transport: T, - ) -> impl Future, R::InitializeError>> + Send + ) -> impl Future, R::InitializeError>> + MaybeSendFuture where T: IntoTransport, E: std::error::Error + Send + Sync + 'static, @@ -123,7 +166,7 @@ pub trait ServiceExt: Service + Sized { self, transport: T, ct: CancellationToken, - ) -> impl Future, R::InitializeError>> + Send + ) -> impl Future, R::InitializeError>> + MaybeSendFuture where T: IntoTransport, E: std::error::Error + Send + Sync + 'static, @@ -135,7 +178,7 @@ impl Service for Box> { &self, request: R::PeerReq, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { DynService::handle_request(self.as_ref(), request, context) } @@ -143,7 +186,7 @@ impl Service for Box> { &self, notification: R::PeerNot, context: NotificationContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { DynService::handle_notification(self.as_ref(), notification, context) } @@ -152,17 +195,21 @@ impl Service for Box> { } } -pub trait DynService: Send + Sync { +#[allow( + private_bounds, + reason = "MaybeSend is a sealed conditional Send + Sync alias" +)] +pub trait DynService: MaybeSend { fn handle_request( &self, request: R::PeerReq, context: RequestContext, - ) -> BoxFuture<'_, Result>; + ) -> MaybeBoxFuture<'_, Result>; fn handle_notification( &self, notification: R::PeerNot, context: NotificationContext, - ) -> BoxFuture<'_, Result<(), McpError>>; + ) -> MaybeBoxFuture<'_, Result<(), McpError>>; fn get_info(&self) -> R::Info; } @@ -171,14 +218,14 @@ impl> DynService for S { &self, request: R::PeerReq, context: RequestContext, - ) -> BoxFuture<'_, Result> { + ) -> MaybeBoxFuture<'_, Result> { Box::pin(self.handle_request(request, context)) } fn handle_notification( &self, notification: R::PeerNot, context: NotificationContext, - ) -> BoxFuture<'_, Result<(), McpError>> { + ) -> MaybeBoxFuture<'_, Result<(), McpError>> { Box::pin(self.handle_notification(notification, context)) } fn get_info(&self) -> R::Info { @@ -639,6 +686,28 @@ where serve_inner(service, transport.into_transport(), peer, peer_rx, ct) } +/// Spawn a task that may hold `!Send` state when the `local` feature is active. +/// +/// Without the `local` feature this is `tokio::spawn` (requires `Future: Send + 'static`). +/// With `local` it uses `tokio::task::spawn_local` (requires only `Future: 'static`). +#[cfg(not(feature = "local"))] +fn spawn_service_task(future: F) -> tokio::task::JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + tokio::spawn(future) +} + +#[cfg(feature = "local")] +fn spawn_service_task(future: F) -> tokio::task::JoinHandle +where + F: Future + 'static, + F::Output: 'static, +{ + tokio::task::spawn_local(future) +} + #[instrument(skip_all)] fn serve_inner( service: S, @@ -674,7 +743,7 @@ where let serve_loop_ct = ct.child_token(); let peer_return: Peer = peer.clone(); let current_span = tracing::Span::current(); - let handle = tokio::spawn(async move { + let handle = spawn_service_task(async move { let mut transport = transport.into_transport(); let mut batch_messages = VecDeque::>::new(); let mut send_task_set = tokio::task::JoinSet::::new(); @@ -860,7 +929,7 @@ where extensions, }; let current_span = tracing::Span::current(); - tokio::spawn(async move { + spawn_service_task(async move { let result = service .handle_request(request, context) .await; @@ -907,7 +976,7 @@ where extensions, }; let current_span = tracing::Span::current(); - tokio::spawn(async move { + spawn_service_task(async move { let result = service.handle_notification(notification, context).await; if let Err(error) = result { tracing::warn!(%error, "Error sending notification"); diff --git a/crates/rmcp/src/service/client.rs b/crates/rmcp/src/service/client.rs index 6528e4144..8b49606e4 100644 --- a/crates/rmcp/src/service/client.rs +++ b/crates/rmcp/src/service/client.rs @@ -162,7 +162,8 @@ impl> ServiceExt for S { self, transport: T, ct: CancellationToken, - ) -> impl Future, ClientInitializeError>> + Send + ) -> impl Future, ClientInitializeError>> + + MaybeSendFuture where T: IntoTransport, E: std::error::Error + Send + Sync + 'static, diff --git a/crates/rmcp/src/service/server.rs b/crates/rmcp/src/service/server.rs index d011c225a..bd4f25bf5 100644 --- a/crates/rmcp/src/service/server.rs +++ b/crates/rmcp/src/service/server.rs @@ -95,7 +95,8 @@ impl> ServiceExt for S { self, transport: T, ct: CancellationToken, - ) -> impl Future, ServerInitializeError>> + Send + ) -> impl Future, ServerInitializeError>> + + MaybeSendFuture where T: IntoTransport, E: std::error::Error + Send + Sync + 'static, diff --git a/crates/rmcp/tests/test_client_initialization.rs b/crates/rmcp/tests/test_client_initialization.rs index c9b8f94a2..4a91f3ac3 100644 --- a/crates/rmcp/tests/test_client_initialization.rs +++ b/crates/rmcp/tests/test_client_initialization.rs @@ -1,5 +1,5 @@ // cargo test --features "server client" --package rmcp test_client_initialization -#![cfg(feature = "client")] +#![cfg(all(feature = "client", not(feature = "local")))] mod common; diff --git a/crates/rmcp/tests/test_close_connection.rs b/crates/rmcp/tests/test_close_connection.rs index b3bb5b638..50479a237 100644 --- a/crates/rmcp/tests/test_close_connection.rs +++ b/crates/rmcp/tests/test_close_connection.rs @@ -1,3 +1,4 @@ +#![cfg(not(feature = "local"))] //cargo test --test test_close_connection --features "client server" mod common; diff --git a/crates/rmcp/tests/test_custom_headers.rs b/crates/rmcp/tests/test_custom_headers.rs index b83c85772..7d4316d3e 100644 --- a/crates/rmcp/tests/test_custom_headers.rs +++ b/crates/rmcp/tests/test_custom_headers.rs @@ -1,3 +1,4 @@ +#![cfg(not(feature = "local"))] use std::collections::HashMap; use http::{HeaderName, HeaderValue}; diff --git a/crates/rmcp/tests/test_custom_request.rs b/crates/rmcp/tests/test_custom_request.rs index 83a8d347f..66ee1ff99 100644 --- a/crates/rmcp/tests/test_custom_request.rs +++ b/crates/rmcp/tests/test_custom_request.rs @@ -1,3 +1,4 @@ +#![cfg(not(feature = "local"))] use std::sync::Arc; use rmcp::{ diff --git a/crates/rmcp/tests/test_logging.rs b/crates/rmcp/tests/test_logging.rs index 11efd84c9..c27cafbc5 100644 --- a/crates/rmcp/tests/test_logging.rs +++ b/crates/rmcp/tests/test_logging.rs @@ -1,4 +1,5 @@ // cargo test --features "server client" --package rmcp test_logging +#![cfg(not(feature = "local"))] mod common; use std::sync::{Arc, Mutex}; diff --git a/crates/rmcp/tests/test_message_protocol.rs b/crates/rmcp/tests/test_message_protocol.rs index 073486ff9..898040dbd 100644 --- a/crates/rmcp/tests/test_message_protocol.rs +++ b/crates/rmcp/tests/test_message_protocol.rs @@ -1,4 +1,5 @@ //cargo test --test test_message_protocol --features "client server" +#![cfg(not(feature = "local"))] mod common; use common::handlers::{TestClientHandler, TestServer}; diff --git a/crates/rmcp/tests/test_notification.rs b/crates/rmcp/tests/test_notification.rs index 7d930678e..662c4bd58 100644 --- a/crates/rmcp/tests/test_notification.rs +++ b/crates/rmcp/tests/test_notification.rs @@ -1,3 +1,4 @@ +#![cfg(not(feature = "local"))] use std::sync::Arc; use rmcp::{ diff --git a/crates/rmcp/tests/test_progress_subscriber.rs b/crates/rmcp/tests/test_progress_subscriber.rs index 092f35747..7bed457f4 100644 --- a/crates/rmcp/tests/test_progress_subscriber.rs +++ b/crates/rmcp/tests/test_progress_subscriber.rs @@ -1,3 +1,4 @@ +#![cfg(not(feature = "local"))] use futures::StreamExt; use rmcp::{ ClientHandler, Peer, RoleServer, ServerHandler, ServiceExt, diff --git a/crates/rmcp/tests/test_prompt_macros.rs b/crates/rmcp/tests/test_prompt_macros.rs index a41d2e7e5..b7c0c442f 100644 --- a/crates/rmcp/tests/test_prompt_macros.rs +++ b/crates/rmcp/tests/test_prompt_macros.rs @@ -1,3 +1,4 @@ +#![cfg(not(feature = "local"))] //cargo test --test test_prompt_macros --features "client server" #![allow(dead_code)] use std::sync::Arc; diff --git a/crates/rmcp/tests/test_prompt_routers.rs b/crates/rmcp/tests/test_prompt_routers.rs index 53b13b131..23674bd96 100644 --- a/crates/rmcp/tests/test_prompt_routers.rs +++ b/crates/rmcp/tests/test_prompt_routers.rs @@ -1,3 +1,4 @@ +#![cfg(not(feature = "local"))] use std::collections::HashMap; use futures::future::BoxFuture; diff --git a/crates/rmcp/tests/test_sampling.rs b/crates/rmcp/tests/test_sampling.rs index 02da06cf7..62bba1123 100644 --- a/crates/rmcp/tests/test_sampling.rs +++ b/crates/rmcp/tests/test_sampling.rs @@ -1,3 +1,4 @@ +#![cfg(not(feature = "local"))] mod common; use anyhow::Result; diff --git a/crates/rmcp/tests/test_server_initialization.rs b/crates/rmcp/tests/test_server_initialization.rs index c07501f0b..505c41aad 100644 --- a/crates/rmcp/tests/test_server_initialization.rs +++ b/crates/rmcp/tests/test_server_initialization.rs @@ -1,5 +1,5 @@ // cargo test --features "client" --package rmcp -- server_init -#![cfg(feature = "client")] +#![cfg(all(feature = "client", not(feature = "local")))] mod common; use common::handlers::TestServer; diff --git a/crates/rmcp/tests/test_task_support_validation.rs b/crates/rmcp/tests/test_task_support_validation.rs index 88d2ed519..c0a65a9e0 100644 --- a/crates/rmcp/tests/test_task_support_validation.rs +++ b/crates/rmcp/tests/test_task_support_validation.rs @@ -1,3 +1,4 @@ +#![cfg(not(feature = "local"))] //! Tests for task support validation in tool calls. //! //! Verifies that the server correctly validates `execution.taskSupport` settings diff --git a/crates/rmcp/tests/test_tool_macros.rs b/crates/rmcp/tests/test_tool_macros.rs index bd06ca6ea..450a00033 100644 --- a/crates/rmcp/tests/test_tool_macros.rs +++ b/crates/rmcp/tests/test_tool_macros.rs @@ -1,3 +1,4 @@ +#![cfg(not(feature = "local"))] //! Test tool macros, including documentation for generated fns. //cargo test --test test_tool_macros --features "client server" diff --git a/crates/rmcp/tests/test_tool_routers.rs b/crates/rmcp/tests/test_tool_routers.rs index 987d1a0b1..c10665064 100644 --- a/crates/rmcp/tests/test_tool_routers.rs +++ b/crates/rmcp/tests/test_tool_routers.rs @@ -1,3 +1,4 @@ +#![cfg(not(feature = "local"))] use std::collections::HashMap; use futures::future::BoxFuture; diff --git a/crates/rmcp/tests/test_with_js.rs b/crates/rmcp/tests/test_with_js.rs index c1e5d81a6..685ea1430 100644 --- a/crates/rmcp/tests/test_with_js.rs +++ b/crates/rmcp/tests/test_with_js.rs @@ -1,3 +1,4 @@ +#![cfg(not(feature = "local"))] use rmcp::{ ServiceExt, service::QuitReason, diff --git a/crates/rmcp/tests/test_with_python.rs b/crates/rmcp/tests/test_with_python.rs index 3f883c96f..c905e1b5b 100644 --- a/crates/rmcp/tests/test_with_python.rs +++ b/crates/rmcp/tests/test_with_python.rs @@ -1,3 +1,4 @@ +#![cfg(not(feature = "local"))] use std::process::Stdio; use rmcp::{ From 8ef36fd2d2aa048c2db93e5b64b08075fb262dd0 Mon Sep 17 00:00:00 2001 From: Dale Seo <5466341+DaleSeo@users.noreply.github.com> Date: Mon, 9 Mar 2026 18:24:13 -0400 Subject: [PATCH 2/2] fix: gate streamable HTTP transport on not(local) feature --- crates/rmcp/README.md | 2 +- crates/rmcp/src/handler/client.rs | 59 ++++++++++--------- crates/rmcp/src/handler/server/prompt.rs | 2 +- crates/rmcp/src/handler/server/tool.rs | 1 + crates/rmcp/src/service.rs | 4 +- crates/rmcp/src/service/tower.rs | 4 +- crates/rmcp/src/transport.rs | 4 +- .../src/transport/streamable_http_server.rs | 4 +- .../streamable_http_server/session.rs | 2 +- .../transport/streamable_http_server/tower.rs | 2 +- crates/rmcp/tests/common/handlers.rs | 10 +++- .../rmcp/tests/test_sse_concurrent_streams.rs | 1 + .../test_streamable_http_json_response.rs | 1 + .../tests/test_streamable_http_priming.rs | 1 + .../test_streamable_http_stale_session.rs | 3 +- 15 files changed, 57 insertions(+), 43 deletions(-) diff --git a/crates/rmcp/README.md b/crates/rmcp/README.md index 24deade15..c133e40e8 100644 --- a/crates/rmcp/README.md +++ b/crates/rmcp/README.md @@ -52,7 +52,7 @@ The transport layer is pluggable. Two built-in pairs cover the most common cases | | Client | Server | |:-:|:-:|:-:| | **stdio** | [`TokioChildProcess`](crate::transport::TokioChildProcess) | [`stdio`](crate::transport::stdio) | -| **Streamable HTTP** | [`StreamableHttpClientTransport`](crate::transport::StreamableHttpClientTransport) | [`StreamableHttpService`](crate::transport::StreamableHttpService) | +| **Streamable HTTP** | [`StreamableHttpClientTransport`](crate::transport::StreamableHttpClientTransport) | `StreamableHttpService` | Any type that implements the [`Transport`](crate::transport::Transport) trait can be used. The [`IntoTransport`](crate::transport::IntoTransport) helper trait provides automatic conversions from: diff --git a/crates/rmcp/src/handler/client.rs b/crates/rmcp/src/handler/client.rs index eeb79309e..1b9c1e38e 100644 --- a/crates/rmcp/src/handler/client.rs +++ b/crates/rmcp/src/handler/client.rs @@ -4,7 +4,9 @@ use std::sync::Arc; use crate::{ error::ErrorData as McpError, model::*, - service::{NotificationContext, RequestContext, RoleClient, Service, ServiceRole}, + service::{ + MaybeSendFuture, NotificationContext, RequestContext, RoleClient, Service, ServiceRole, + }, }; impl Service for H { @@ -83,7 +85,7 @@ pub trait ClientHandler: Sized + Send + Sync + 'static { fn ping( &self, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Ok(())) } @@ -91,7 +93,7 @@ pub trait ClientHandler: Sized + Send + Sync + 'static { &self, params: CreateMessageRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Err( McpError::method_not_found::(), )) @@ -100,7 +102,7 @@ pub trait ClientHandler: Sized + Send + Sync + 'static { fn list_roots( &self, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Ok(ListRootsResult::default())) } @@ -162,7 +164,8 @@ pub trait ClientHandler: Sized + Send + Sync + 'static { &self, request: CreateElicitationRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ + { // Default implementation declines all requests - real clients should override this let _ = (request, context); std::future::ready(Ok(CreateElicitationResult { @@ -175,7 +178,7 @@ pub trait ClientHandler: Sized + Send + Sync + 'static { &self, request: CustomRequest, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { let CustomRequest { method, .. } = request; let _ = context; std::future::ready(Err(McpError::new( @@ -189,46 +192,46 @@ pub trait ClientHandler: Sized + Send + Sync + 'static { &self, params: CancelledNotificationParam, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { std::future::ready(()) } fn on_progress( &self, params: ProgressNotificationParam, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { std::future::ready(()) } fn on_logging_message( &self, params: LoggingMessageNotificationParam, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { std::future::ready(()) } fn on_resource_updated( &self, params: ResourceUpdatedNotificationParam, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { std::future::ready(()) } fn on_resource_list_changed( &self, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { std::future::ready(()) } fn on_tool_list_changed( &self, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { std::future::ready(()) } fn on_prompt_list_changed( &self, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { std::future::ready(()) } @@ -236,14 +239,14 @@ pub trait ClientHandler: Sized + Send + Sync + 'static { &self, params: ElicitationResponseNotificationParam, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { std::future::ready(()) } fn on_custom_notification( &self, notification: CustomNotification, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { let _ = (notification, context); std::future::ready(()) } @@ -269,7 +272,7 @@ macro_rules! impl_client_handler_for_wrapper { fn ping( &self, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).ping(context) } @@ -277,14 +280,14 @@ macro_rules! impl_client_handler_for_wrapper { &self, params: CreateMessageRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).create_message(params, context) } fn list_roots( &self, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).list_roots(context) } @@ -292,7 +295,7 @@ macro_rules! impl_client_handler_for_wrapper { &self, request: CreateElicitationRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).create_elicitation(request, context) } @@ -300,7 +303,7 @@ macro_rules! impl_client_handler_for_wrapper { &self, request: CustomRequest, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { (**self).on_custom_request(request, context) } @@ -308,7 +311,7 @@ macro_rules! impl_client_handler_for_wrapper { &self, params: CancelledNotificationParam, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { (**self).on_cancelled(params, context) } @@ -316,7 +319,7 @@ macro_rules! impl_client_handler_for_wrapper { &self, params: ProgressNotificationParam, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { (**self).on_progress(params, context) } @@ -324,7 +327,7 @@ macro_rules! impl_client_handler_for_wrapper { &self, params: LoggingMessageNotificationParam, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { (**self).on_logging_message(params, context) } @@ -332,28 +335,28 @@ macro_rules! impl_client_handler_for_wrapper { &self, params: ResourceUpdatedNotificationParam, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { (**self).on_resource_updated(params, context) } fn on_resource_list_changed( &self, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { (**self).on_resource_list_changed(context) } fn on_tool_list_changed( &self, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { (**self).on_tool_list_changed(context) } fn on_prompt_list_changed( &self, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { (**self).on_prompt_list_changed(context) } @@ -361,7 +364,7 @@ macro_rules! impl_client_handler_for_wrapper { &self, notification: CustomNotification, context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { (**self).on_custom_notification(notification, context) } diff --git a/crates/rmcp/src/handler/server/prompt.rs b/crates/rmcp/src/handler/server/prompt.rs index ae98677c0..27a03e835 100644 --- a/crates/rmcp/src/handler/server/prompt.rs +++ b/crates/rmcp/src/handler/server/prompt.rs @@ -6,8 +6,8 @@ use std::{future::Future, marker::PhantomData}; +#[cfg(not(feature = "local"))] use futures::future::BoxFuture; -#[allow(unused_imports)] use serde::de::DeserializeOwned; use super::common::{AsRequestContext, FromContextPart}; diff --git a/crates/rmcp/src/handler/server/tool.rs b/crates/rmcp/src/handler/server/tool.rs index 5c72461b4..0ad8ce61a 100644 --- a/crates/rmcp/src/handler/server/tool.rs +++ b/crates/rmcp/src/handler/server/tool.rs @@ -4,6 +4,7 @@ use std::{ marker::PhantomData, }; +#[cfg(not(feature = "local"))] use futures::future::BoxFuture; use serde::de::DeserializeOwned; diff --git a/crates/rmcp/src/service.rs b/crates/rmcp/src/service.rs index 0d3a6e5cc..be9b461ab 100644 --- a/crates/rmcp/src/service.rs +++ b/crates/rmcp/src/service.rs @@ -1,6 +1,8 @@ +use futures::FutureExt; +#[cfg(not(feature = "local"))] +use futures::future::BoxFuture; #[cfg(feature = "local")] use futures::future::LocalBoxFuture; -use futures::{FutureExt, future::BoxFuture}; use thiserror::Error; // --------------------------------------------------------------------------- diff --git a/crates/rmcp/src/service/tower.rs b/crates/rmcp/src/service/tower.rs index ac4a66f00..867d3e7fa 100644 --- a/crates/rmcp/src/service/tower.rs +++ b/crates/rmcp/src/service/tower.rs @@ -3,7 +3,7 @@ use std::{future::poll_fn, marker::PhantomData}; use tower_service::Service as TowerService; use super::NotificationContext; -use crate::service::{RequestContext, Service, ServiceRole}; +use crate::service::{MaybeSendFuture, RequestContext, Service, ServiceRole}; pub struct TowerHandler { pub service: S, @@ -44,7 +44,7 @@ where &self, _notification: R::PeerNot, _context: NotificationContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { std::future::ready(Ok(())) } diff --git a/crates/rmcp/src/transport.rs b/crates/rmcp/src/transport.rs index 683f6880f..8a90542d8 100644 --- a/crates/rmcp/src/transport.rs +++ b/crates/rmcp/src/transport.rs @@ -7,7 +7,7 @@ //! | transport | client | server | //! |:-: |:-: |:-: | //! | std IO | [`child_process::TokioChildProcess`] | [`io::stdio`] | -//! | streamable http | [`streamable_http_client::StreamableHttpClientTransport`] | [`streamable_http_server::StreamableHttpService`] | +//! | streamable http | [`streamable_http_client::StreamableHttpClientTransport`] | `streamable_http_server::StreamableHttpService` | //! //!## Helper Transport Types //! Thers are several helper transport types that can help you to create transport quickly. @@ -107,7 +107,7 @@ pub use auth::{ // pub mod ws; #[cfg(feature = "transport-streamable-http-server-session")] pub mod streamable_http_server; -#[cfg(feature = "transport-streamable-http-server")] +#[cfg(all(feature = "transport-streamable-http-server", not(feature = "local")))] pub use streamable_http_server::tower::{StreamableHttpServerConfig, StreamableHttpService}; #[cfg(feature = "transport-streamable-http-client")] diff --git a/crates/rmcp/src/transport/streamable_http_server.rs b/crates/rmcp/src/transport/streamable_http_server.rs index b991ff2e2..9cbb63cc0 100644 --- a/crates/rmcp/src/transport/streamable_http_server.rs +++ b/crates/rmcp/src/transport/streamable_http_server.rs @@ -1,6 +1,6 @@ pub mod session; -#[cfg(feature = "transport-streamable-http-server")] +#[cfg(all(feature = "transport-streamable-http-server", not(feature = "local")))] pub mod tower; pub use session::{SessionId, SessionManager}; -#[cfg(feature = "transport-streamable-http-server")] +#[cfg(all(feature = "transport-streamable-http-server", not(feature = "local")))] pub use tower::{StreamableHttpServerConfig, StreamableHttpService}; diff --git a/crates/rmcp/src/transport/streamable_http_server/session.rs b/crates/rmcp/src/transport/streamable_http_server/session.rs index 9cf4d0dbc..dcdb25c86 100644 --- a/crates/rmcp/src/transport/streamable_http_server/session.rs +++ b/crates/rmcp/src/transport/streamable_http_server/session.rs @@ -33,7 +33,7 @@ pub mod never; /// Controls how MCP sessions are created, validated, and closed. /// -/// The [`StreamableHttpService`](super::StreamableHttpService) calls into this +/// The `StreamableHttpService` calls into this /// trait for every HTTP request that carries (or should carry) a session ID. /// /// See the [module-level docs](self) for background on sessions. diff --git a/crates/rmcp/src/transport/streamable_http_server/tower.rs b/crates/rmcp/src/transport/streamable_http_server/tower.rs index 74b1fd79e..0e6f0789e 100644 --- a/crates/rmcp/src/transport/streamable_http_server/tower.rs +++ b/crates/rmcp/src/transport/streamable_http_server/tower.rs @@ -204,7 +204,7 @@ impl Clone for StreamableHttpService { impl tower_service::Service> for StreamableHttpService where RequestBody: Body + Send + 'static, - S: crate::Service, + S: crate::Service + Send + 'static, M: SessionManager, RequestBody::Error: Display, RequestBody::Data: Send + 'static, diff --git a/crates/rmcp/tests/common/handlers.rs b/crates/rmcp/tests/common/handlers.rs index 811bd824b..866cbdeff 100644 --- a/crates/rmcp/tests/common/handlers.rs +++ b/crates/rmcp/tests/common/handlers.rs @@ -7,7 +7,11 @@ use std::{ use rmcp::service::NotificationContext; #[cfg(feature = "client")] use rmcp::{ClientHandler, RoleClient}; -use rmcp::{ErrorData as McpError, RoleServer, ServerHandler, model::*, service::RequestContext}; +use rmcp::{ + ErrorData as McpError, RoleServer, ServerHandler, + model::*, + service::{MaybeSendFuture, RequestContext}, +}; #[cfg(feature = "client")] use serde_json::json; use tokio::sync::Notify; @@ -85,7 +89,7 @@ impl ClientHandler for TestClientHandler { &self, params: LoggingMessageNotificationParam, _context: NotificationContext, - ) -> impl Future + Send + '_ { + ) -> impl Future + MaybeSendFuture + '_ { let receive_signal = self.receive_signal.clone(); let received_messages = self.received_messages.clone(); @@ -116,7 +120,7 @@ impl ServerHandler for TestServer { &self, request: SetLevelRequestParams, context: RequestContext, - ) -> impl Future> + Send + '_ { + ) -> impl Future> + MaybeSendFuture + '_ { let peer = context.peer; async move { let (data, logger) = match request.level { diff --git a/crates/rmcp/tests/test_sse_concurrent_streams.rs b/crates/rmcp/tests/test_sse_concurrent_streams.rs index 33625a741..a7821fe9d 100644 --- a/crates/rmcp/tests/test_sse_concurrent_streams.rs +++ b/crates/rmcp/tests/test_sse_concurrent_streams.rs @@ -1,3 +1,4 @@ +#![cfg(not(feature = "local"))] /// Tests for concurrent SSE stream handling (shadow channels) /// /// These tests verify that multiple GET SSE streams on the same session diff --git a/crates/rmcp/tests/test_streamable_http_json_response.rs b/crates/rmcp/tests/test_streamable_http_json_response.rs index e5b3323a9..b023acd06 100644 --- a/crates/rmcp/tests/test_streamable_http_json_response.rs +++ b/crates/rmcp/tests/test_streamable_http_json_response.rs @@ -1,3 +1,4 @@ +#![cfg(not(feature = "local"))] use rmcp::transport::streamable_http_server::{ StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager, }; diff --git a/crates/rmcp/tests/test_streamable_http_priming.rs b/crates/rmcp/tests/test_streamable_http_priming.rs index 778dfedff..5e771024c 100644 --- a/crates/rmcp/tests/test_streamable_http_priming.rs +++ b/crates/rmcp/tests/test_streamable_http_priming.rs @@ -1,3 +1,4 @@ +#![cfg(not(feature = "local"))] use std::time::Duration; use rmcp::transport::streamable_http_server::{ diff --git a/crates/rmcp/tests/test_streamable_http_stale_session.rs b/crates/rmcp/tests/test_streamable_http_stale_session.rs index a37a0895f..260c4fa7a 100644 --- a/crates/rmcp/tests/test_streamable_http_stale_session.rs +++ b/crates/rmcp/tests/test_streamable_http_stale_session.rs @@ -1,7 +1,8 @@ #![cfg(all( feature = "transport-streamable-http-client", feature = "transport-streamable-http-client-reqwest", - feature = "transport-streamable-http-server" + feature = "transport-streamable-http-server", + not(feature = "local") ))] use std::{collections::HashMap, sync::Arc};