From 75fa0523da78e722c5dc6b656982be4472d39a4a Mon Sep 17 00:00:00 2001 From: Dale Seo <5466341+DaleSeo@users.noreply.github.com> Date: Fri, 8 May 2026 13:03:17 -0400 Subject: [PATCH 1/2] feat: add task-based stdio examples --- README.md | 23 ++++ examples/clients/Cargo.toml | 4 + examples/clients/README.md | 12 +++ examples/clients/src/task_stdio.rs | 127 +++++++++++++++++++++++ examples/servers/Cargo.toml | 4 + examples/servers/README.md | 10 ++ examples/servers/src/common/mod.rs | 1 + examples/servers/src/common/task_demo.rs | 93 +++++++++++++++++ examples/servers/src/task_stdio.rs | 27 +++++ 9 files changed, 301 insertions(+) create mode 100644 examples/clients/src/task_stdio.rs create mode 100644 examples/servers/src/common/task_demo.rs create mode 100644 examples/servers/src/task_stdio.rs diff --git a/README.md b/README.md index f5f1bd3a1..d3edc3383 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,7 @@ For the full MCP specification, see [modelcontextprotocol.io](https://modelconte - [Completions](#completions) - [Notifications](#notifications) - [Subscriptions](#subscriptions) +- [Tasks](#tasks-long-running-tool-invocations) - [Examples](#examples) - [OAuth Support](#oauth-support) - [Related Resources](#related-resources) @@ -954,6 +955,28 @@ impl ClientHandler for MyClient { --- +## Tasks (long-running tool invocations) + +`rmcp` supports the [task-based tool invocation](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/tasks) +flow defined in SEP-1319. Annotate a tool with `execution(task_support = "required" | "optional")` +and add `#[task_handler]` to your `ServerHandler` impl — `enqueue_task`, `tasks/list`, `tasks/get`, +`tasks/result`, and `tasks/cancel` are generated for you on top of an `OperationProcessor`. + +```rust, ignore +#[tool( + description = "Sum two numbers after a 2-second delay", + execution(task_support = "required") +)] +async fn slow_sum(/* ... */) -> Result { /* ... */ } + +#[tool_handler] +#[task_handler] +impl ServerHandler for TaskDemo {} +``` + +See [`servers_task_stdio`](examples/servers/src/task_stdio.rs) and the matching +[`clients_task_stdio`](examples/clients/src/task_stdio.rs) for a runnable end-to-end example. + ## Examples See [examples](examples/README.md). diff --git a/examples/clients/Cargo.toml b/examples/clients/Cargo.toml index f2ac8dd7d..416057ac8 100644 --- a/examples/clients/Cargo.toml +++ b/examples/clients/Cargo.toml @@ -61,3 +61,7 @@ path = "src/progress_client.rs" [[example]] name = "clients_client_credentials" path = "src/auth/client_credentials.rs" + +[[example]] +name = "clients_task_stdio" +path = "src/task_stdio.rs" diff --git a/examples/clients/README.md b/examples/clients/README.md index e066cd77d..4361d681d 100644 --- a/examples/clients/README.md +++ b/examples/clients/README.md @@ -59,6 +59,15 @@ A client demonstrating how to use the sampling tool. - Retrieves server information and list of available tools - Calls the `ask_llm` tool +### Task Standard I/O Client (`task_stdio.rs`) + +A client that exercises the task lifecycle against `servers_task_stdio` +(per [SEP-1319](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/tasks)). + +- Spawns `servers_task_stdio` as a child process over stdio +- Calls `quick_echo` synchronously +- Calls `slow_sum` as a task via `CallToolRequestParams::with_task(...)`, polls `tasks/get` until completion, then fetches the result via `tasks/result` + ### Progress Test Client (`progress_client.rs`) A client that communicates with an MCP server using progress notifications. @@ -91,6 +100,9 @@ cargo run -p mcp-client-examples --example clients_oauth_client # Run the sampling standard I/O client example cargo run -p mcp-client-examples --example clients_sampling_stdio + +# Run the task-based invocation client (drives servers_task_stdio) +cargo run -p mcp-client-examples --example clients_task_stdio ``` ## Dependencies diff --git a/examples/clients/src/task_stdio.rs b/examples/clients/src/task_stdio.rs new file mode 100644 index 000000000..472a9c370 --- /dev/null +++ b/examples/clients/src/task_stdio.rs @@ -0,0 +1,127 @@ +//! Client for the task-demo server in `examples/servers/src/task_stdio.rs`. +//! +//! Walks through the task lifecycle (SEP-1319): +//! 1. Call a regular tool (`quick_echo`) — synchronous response. +//! 2. Call a task-required tool (`slow_sum`) by attaching `task: {}` to +//! the `tools/call` request. The server returns a `Task` with a `task_id`. +//! 3. Poll `tasks/get` until status becomes `Completed`. +//! 4. Fetch the underlying `CallToolResult` via `tasks/result`. + +use anyhow::{Result, anyhow}; +use rmcp::{ + ServiceExt, + model::{ + CallToolRequestParams, CallToolResult, ClientRequest, GetTaskInfoParams, + GetTaskResultParams, JsonObject, Request, ServerResult, TaskStatus, + }, + object, + transport::{ConfigureCommandExt, TokioChildProcess}, +}; +use tokio::process::Command; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| format!("info,{}=debug", env!("CARGO_CRATE_NAME")).into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + + // Spawn the task-demo server as a child process over stdio. + let client = () + .serve(TokioChildProcess::new(Command::new("cargo").configure( + |cmd| { + cmd.arg("run") + .arg("-q") + .arg("-p") + .arg("mcp-server-examples") + .arg("--example") + .arg("servers_task_stdio"); + }, + ))?) + .await?; + + // 1) Synchronous call. `quick_echo` has the default task_support = forbidden. + let echo = client + .call_tool( + CallToolRequestParams::new("quick_echo") + .with_arguments(object!({ "message": "hi from rmcp" })), + ) + .await?; + tracing::info!("quick_echo -> {echo:#?}"); + + // 2) Task call. `slow_sum` is task_support = required, so we MUST attach a + // `task` object. An empty object is fine — clients can stash arbitrary + // metadata here that the server-side `OperationDescriptor` will keep. + let create = client + .send_request(ClientRequest::CallToolRequest(Request::new( + CallToolRequestParams::new("slow_sum") + .with_arguments(object!({ "a": 40, "b": 2 })) + .with_task(JsonObject::new()), + ))) + .await?; + let ServerResult::CreateTaskResult(create) = create else { + return Err(anyhow!("expected CreateTaskResult, got {create:?}")); + }; + let task_id = create.task.task_id.clone(); + tracing::info!( + "slow_sum enqueued as task {task_id} (status = {:?})", + create.task.status + ); + + // 3) Poll `tasks/get` until the server reports a terminal status. + let final_status = loop { + tokio::time::sleep(std::time::Duration::from_millis(250)).await; + + let info = client + .send_request(ClientRequest::GetTaskInfoRequest(Request::new( + GetTaskInfoParams { + meta: None, + task_id: task_id.clone(), + }, + ))) + .await?; + let ServerResult::GetTaskResult(info) = info else { + return Err(anyhow!("expected GetTaskResult, got {info:?}")); + }; + tracing::info!("status = {:?}", info.task.status); + + match info.task.status { + TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled => { + break info.task.status; + } + _ => {} + } + }; + + if final_status != TaskStatus::Completed { + return Err(anyhow!("task ended in {final_status:?}")); + } + + // 4) Fetch the payload. The server-side handler returns a serialized + // `CallToolResult`. On the wire the response is just a JSON value, and + // `ServerResult` is `#[serde(untagged)]`, so the client decodes it as + // whichever variant the JSON shape matches first — a `CallToolResult` + // here. (For a non-tool task the same value would surface as + // `ServerResult::CustomResult` and need manual `serde_json::from_value`.) + let payload = client + .send_request(ClientRequest::GetTaskResultRequest(Request::new( + GetTaskResultParams { + meta: None, + task_id: task_id.clone(), + }, + ))) + .await?; + let call_result: CallToolResult = match payload { + ServerResult::CallToolResult(r) => r, + ServerResult::CustomResult(c) => serde_json::from_value(c.0)?, + other => return Err(anyhow!("unexpected task result: {other:?}")), + }; + tracing::info!("slow_sum result -> {call_result:#?}"); + + client.cancel().await?; + Ok(()) +} diff --git a/examples/servers/Cargo.toml b/examples/servers/Cargo.toml index cea7ba6f1..7dbd2fb4d 100644 --- a/examples/servers/Cargo.toml +++ b/examples/servers/Cargo.toml @@ -109,3 +109,7 @@ path = "src/calculator_stdio.rs" [[example]] name = "elicitation_enum_select" path = "src/elicitation_enum_inference.rs" + +[[example]] +name = "servers_task_stdio" +path = "src/task_stdio.rs" diff --git a/examples/servers/README.md b/examples/servers/README.md index 946a2433f..69126519e 100644 --- a/examples/servers/README.md +++ b/examples/servers/README.md @@ -62,6 +62,16 @@ A server demonstrating the prompt framework capabilities. - Uses standard I/O transport - Good example of prompt implementation patterns +### Task Demo Server (`task_stdio.rs`) + +A minimal stdio server demonstrating task-based tool invocation per +[SEP-1319](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/tasks). + +- `slow_sum` is declared with `execution(task_support = "required")`, so clients MUST invoke it as a task +- `quick_echo` is a regular synchronous tool for contrast +- Wires up `enqueue_task` / `tasks/get` / `tasks/result` / `tasks/cancel` via `#[task_handler]` +- Pair with `examples/clients/src/task_stdio.rs` to see the full lifecycle (create → poll → fetch result) + ### Progress Demo Server (`progress_demo.rs`) A server that demonstrates progress notifications during long-running operations. diff --git a/examples/servers/src/common/mod.rs b/examples/servers/src/common/mod.rs index 674a8b51a..32df83c78 100644 --- a/examples/servers/src/common/mod.rs +++ b/examples/servers/src/common/mod.rs @@ -2,3 +2,4 @@ pub mod calculator; pub mod counter; pub mod generic_service; pub mod progress_demo; +pub mod task_demo; diff --git a/examples/servers/src/common/task_demo.rs b/examples/servers/src/common/task_demo.rs new file mode 100644 index 000000000..27bff2195 --- /dev/null +++ b/examples/servers/src/common/task_demo.rs @@ -0,0 +1,93 @@ +//! Minimal example of a tool that supports task-based invocation (SEP-1319). +//! +//! - `slow_sum` is marked `task_support = "required"`, so the client MUST invoke +//! it as a task. The server enqueues the call into an `OperationProcessor`, +//! returns a task id immediately, and the client polls `tasks/get` and +//! fetches the payload via `tasks/result`. +//! - `quick_echo` is a regular synchronous tool for contrast (the default, +//! `task_support = "forbidden"`). +//! +//! See `examples/clients/src/task_stdio.rs` for the matching client. + +#![allow(dead_code)] + +use std::sync::Arc; + +use rmcp::{ + ErrorData as McpError, ServerHandler, + handler::server::{router::tool::ToolRouter, wrapper::Parameters}, + model::{CallToolResult, Content}, + schemars, task_handler, + task_manager::OperationProcessor, + tool, tool_handler, tool_router, +}; +use tokio::sync::Mutex; + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct SumArgs { + pub a: i32, + pub b: i32, +} + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct EchoArgs { + pub message: String, +} + +/// Server state. The `processor` field is required by `#[task_handler]`: +/// the macro generates `enqueue_task` / `tasks/*` handlers that submit and +/// poll operations through it. +#[derive(Clone)] +pub struct TaskDemo { + tool_router: ToolRouter, + processor: Arc>, +} + +impl Default for TaskDemo { + fn default() -> Self { + Self::new() + } +} + +#[tool_router] +impl TaskDemo { + pub fn new() -> Self { + Self { + tool_router: Self::tool_router(), + processor: Arc::new(Mutex::new(OperationProcessor::new())), + } + } + + /// Long-running tool. The `execution(task_support = "required")` attribute + /// tells clients they MUST call this tool as a task; the server returns + /// `-32601` if they don't. + #[tool( + description = "Sum two numbers after a 2-second delay", + execution(task_support = "required") + )] + async fn slow_sum( + &self, + Parameters(SumArgs { a, b }): Parameters, + ) -> Result { + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + Ok(CallToolResult::success(vec![Content::text( + (a + b).to_string(), + )])) + } + + /// Synchronous tool with the default `task_support = "forbidden"`. + #[tool(description = "Echo a message back immediately")] + async fn quick_echo( + &self, + Parameters(EchoArgs { message }): Parameters, + ) -> Result { + Ok(CallToolResult::success(vec![Content::text(message)])) + } +} + +/// `#[task_handler]` reads `self.processor` (configurable via the macro's +/// `processor = ...` argument) and synthesizes `enqueue_task`, `list_tasks`, +/// `get_task_info`, `get_task_result`, and `cancel_task` for us. +#[tool_handler] +#[task_handler] +impl ServerHandler for TaskDemo {} diff --git a/examples/servers/src/task_stdio.rs b/examples/servers/src/task_stdio.rs new file mode 100644 index 000000000..82e293ed5 --- /dev/null +++ b/examples/servers/src/task_stdio.rs @@ -0,0 +1,27 @@ +use anyhow::Result; +use common::task_demo::TaskDemo; +use rmcp::{ServiceExt, transport::stdio}; +use tracing_subscriber::{self, EnvFilter}; +mod common; + +/// Stdio server demonstrating task-based tool invocation. +/// +/// Run a matching client with: +/// cargo run -p mcp-client-examples --example clients_task_stdio +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into())) + .with_writer(std::io::stderr) + .with_ansi(false) + .init(); + + tracing::info!("Starting task-demo MCP server"); + + let service = TaskDemo::new().serve(stdio()).await.inspect_err(|e| { + tracing::error!("serving error: {e:?}"); + })?; + + service.waiting().await?; + Ok(()) +} From 0e230f29eaf169d842cc1a4b2b92984ee73bb1ce Mon Sep 17 00:00:00 2001 From: Dale Seo <5466341+DaleSeo@users.noreply.github.com> Date: Fri, 8 May 2026 13:08:33 -0400 Subject: [PATCH 2/2] docs: add Tasks section to Chinese README --- docs/readme/README.zh-cn.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/docs/readme/README.zh-cn.md b/docs/readme/README.zh-cn.md index 70f0e5278..8c3d60671 100644 --- a/docs/readme/README.zh-cn.md +++ b/docs/readme/README.zh-cn.md @@ -31,6 +31,7 @@ - [补全](#补全) - [通知](#通知) - [订阅](#订阅) +- [任务](#任务长时间运行的工具调用) - [示例](#示例) - [OAuth 支持](#oauth-支持) - [相关资源](#相关资源) @@ -954,6 +955,26 @@ impl ClientHandler for MyClient { --- +## 任务(长时间运行的工具调用) + +`rmcp` 支持 SEP-1319 中定义的[基于任务的工具调用](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/tasks)流程。为工具添加 `execution(task_support = "required" | "optional")` 注解,并在 `ServerHandler` 实现上添加 `#[task_handler]` —— `enqueue_task`、`tasks/list`、`tasks/get`、`tasks/result` 和 `tasks/cancel` 将在 `OperationProcessor` 之上自动生成。 + +```rust, ignore +#[tool( + description = "Sum two numbers after a 2-second delay", + execution(task_support = "required") +)] +async fn slow_sum(/* ... */) -> Result { /* ... */ } + +#[tool_handler] +#[task_handler] +impl ServerHandler for TaskDemo {} +``` + +完整的端到端示例请参阅 [`servers_task_stdio`](../../examples/servers/src/task_stdio.rs) 及对应的 [`clients_task_stdio`](../../examples/clients/src/task_stdio.rs)。 + +--- + ## 示例 查看 [examples](../../examples/README.md)。