Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ For the full MCP specification, see [modelcontextprotocol.io](https://modelconte
- [Completions](#completions)
- [Notifications](#notifications)
- [Subscriptions](#subscriptions)
- [Tasks](#tasks-long-running-tool-invocations)
- [Examples](#examples)
- [OAuth Support](#oauth-support)
- [Related Resources](#related-resources)
Expand Down Expand Up @@ -954,6 +955,28 @@ impl ClientHandler for MyClient {

---

## Tasks (long-running tool invocations)

`rmcp` supports the [task-based tool invocation](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/tasks)
flow defined in SEP-1319. Annotate a tool with `execution(task_support = "required" | "optional")`
and add `#[task_handler]` to your `ServerHandler` impl — `enqueue_task`, `tasks/list`, `tasks/get`,
`tasks/result`, and `tasks/cancel` are generated for you on top of an `OperationProcessor`.

```rust, ignore
#[tool(
description = "Sum two numbers after a 2-second delay",
execution(task_support = "required")
)]
async fn slow_sum(/* ... */) -> Result<CallToolResult, McpError> { /* ... */ }

#[tool_handler]
#[task_handler]
impl ServerHandler for TaskDemo {}
```

See [`servers_task_stdio`](examples/servers/src/task_stdio.rs) and the matching
[`clients_task_stdio`](examples/clients/src/task_stdio.rs) for a runnable end-to-end example.

## Examples

See [examples](examples/README.md).
Expand Down
21 changes: 21 additions & 0 deletions docs/readme/README.zh-cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
- [补全](#补全)
- [通知](#通知)
- [订阅](#订阅)
- [任务](#任务长时间运行的工具调用)
- [示例](#示例)
- [OAuth 支持](#oauth-支持)
- [相关资源](#相关资源)
Expand Down Expand Up @@ -954,6 +955,26 @@ impl ClientHandler for MyClient {

---

## 任务(长时间运行的工具调用)

`rmcp` 支持 SEP-1319 中定义的[基于任务的工具调用](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/tasks)流程。为工具添加 `execution(task_support = "required" | "optional")` 注解,并在 `ServerHandler` 实现上添加 `#[task_handler]` —— `enqueue_task`、`tasks/list`、`tasks/get`、`tasks/result` 和 `tasks/cancel` 将在 `OperationProcessor` 之上自动生成。

```rust, ignore
#[tool(
description = "Sum two numbers after a 2-second delay",
execution(task_support = "required")
)]
async fn slow_sum(/* ... */) -> Result<CallToolResult, McpError> { /* ... */ }

#[tool_handler]
#[task_handler]
impl ServerHandler for TaskDemo {}
```

完整的端到端示例请参阅 [`servers_task_stdio`](../../examples/servers/src/task_stdio.rs) 及对应的 [`clients_task_stdio`](../../examples/clients/src/task_stdio.rs)。

---

## 示例

查看 [examples](../../examples/README.md)。
Expand Down
4 changes: 4 additions & 0 deletions examples/clients/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,7 @@ path = "src/progress_client.rs"
[[example]]
name = "clients_client_credentials"
path = "src/auth/client_credentials.rs"

[[example]]
name = "clients_task_stdio"
path = "src/task_stdio.rs"
12 changes: 12 additions & 0 deletions examples/clients/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ A client demonstrating how to use the sampling tool.
- Retrieves server information and list of available tools
- Calls the `ask_llm` tool

### Task Standard I/O Client (`task_stdio.rs`)

A client that exercises the task lifecycle against `servers_task_stdio`
(per [SEP-1319](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/tasks)).

- Spawns `servers_task_stdio` as a child process over stdio
- Calls `quick_echo` synchronously
- Calls `slow_sum` as a task via `CallToolRequestParams::with_task(...)`, polls `tasks/get` until completion, then fetches the result via `tasks/result`

### Progress Test Client (`progress_client.rs`)

A client that communicates with an MCP server using progress notifications.
Expand Down Expand Up @@ -91,6 +100,9 @@ cargo run -p mcp-client-examples --example clients_oauth_client

# Run the sampling standard I/O client example
cargo run -p mcp-client-examples --example clients_sampling_stdio

# Run the task-based invocation client (drives servers_task_stdio)
cargo run -p mcp-client-examples --example clients_task_stdio
```

## Dependencies
Expand Down
127 changes: 127 additions & 0 deletions examples/clients/src/task_stdio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
//! Client for the task-demo server in `examples/servers/src/task_stdio.rs`.
//!
//! Walks through the task lifecycle (SEP-1319):
//! 1. Call a regular tool (`quick_echo`) — synchronous response.
//! 2. Call a task-required tool (`slow_sum`) by attaching `task: {}` to
//! the `tools/call` request. The server returns a `Task` with a `task_id`.
//! 3. Poll `tasks/get` until status becomes `Completed`.
//! 4. Fetch the underlying `CallToolResult` via `tasks/result`.

use anyhow::{Result, anyhow};
use rmcp::{
ServiceExt,
model::{
CallToolRequestParams, CallToolResult, ClientRequest, GetTaskInfoParams,
GetTaskResultParams, JsonObject, Request, ServerResult, TaskStatus,
},
object,
transport::{ConfigureCommandExt, TokioChildProcess},
};
use tokio::process::Command;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| format!("info,{}=debug", env!("CARGO_CRATE_NAME")).into()),
)
.with(tracing_subscriber::fmt::layer())
.init();

// Spawn the task-demo server as a child process over stdio.
let client = ()
.serve(TokioChildProcess::new(Command::new("cargo").configure(
|cmd| {
cmd.arg("run")
.arg("-q")
.arg("-p")
.arg("mcp-server-examples")
.arg("--example")
.arg("servers_task_stdio");
},
))?)
.await?;

// 1) Synchronous call. `quick_echo` has the default task_support = forbidden.
let echo = client
.call_tool(
CallToolRequestParams::new("quick_echo")
.with_arguments(object!({ "message": "hi from rmcp" })),
)
.await?;
tracing::info!("quick_echo -> {echo:#?}");

// 2) Task call. `slow_sum` is task_support = required, so we MUST attach a
// `task` object. An empty object is fine — clients can stash arbitrary
// metadata here that the server-side `OperationDescriptor` will keep.
let create = client
.send_request(ClientRequest::CallToolRequest(Request::new(
CallToolRequestParams::new("slow_sum")
.with_arguments(object!({ "a": 40, "b": 2 }))
.with_task(JsonObject::new()),
)))
.await?;
let ServerResult::CreateTaskResult(create) = create else {
return Err(anyhow!("expected CreateTaskResult, got {create:?}"));
};
let task_id = create.task.task_id.clone();
tracing::info!(
"slow_sum enqueued as task {task_id} (status = {:?})",
create.task.status
);

// 3) Poll `tasks/get` until the server reports a terminal status.
let final_status = loop {
tokio::time::sleep(std::time::Duration::from_millis(250)).await;

let info = client
.send_request(ClientRequest::GetTaskInfoRequest(Request::new(
GetTaskInfoParams {
meta: None,
task_id: task_id.clone(),
},
)))
.await?;
let ServerResult::GetTaskResult(info) = info else {
return Err(anyhow!("expected GetTaskResult, got {info:?}"));
};
tracing::info!("status = {:?}", info.task.status);

match info.task.status {
TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled => {
break info.task.status;
}
_ => {}
}
};

if final_status != TaskStatus::Completed {
return Err(anyhow!("task ended in {final_status:?}"));
}

// 4) Fetch the payload. The server-side handler returns a serialized
// `CallToolResult`. On the wire the response is just a JSON value, and
// `ServerResult` is `#[serde(untagged)]`, so the client decodes it as
// whichever variant the JSON shape matches first — a `CallToolResult`
// here. (For a non-tool task the same value would surface as
// `ServerResult::CustomResult` and need manual `serde_json::from_value`.)
let payload = client
.send_request(ClientRequest::GetTaskResultRequest(Request::new(
GetTaskResultParams {
meta: None,
task_id: task_id.clone(),
},
)))
.await?;
let call_result: CallToolResult = match payload {
ServerResult::CallToolResult(r) => r,
ServerResult::CustomResult(c) => serde_json::from_value(c.0)?,
other => return Err(anyhow!("unexpected task result: {other:?}")),
};
tracing::info!("slow_sum result -> {call_result:#?}");

client.cancel().await?;
Ok(())
}
4 changes: 4 additions & 0 deletions examples/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,7 @@ path = "src/calculator_stdio.rs"
[[example]]
name = "elicitation_enum_select"
path = "src/elicitation_enum_inference.rs"

[[example]]
name = "servers_task_stdio"
path = "src/task_stdio.rs"
10 changes: 10 additions & 0 deletions examples/servers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ A server demonstrating the prompt framework capabilities.
- Uses standard I/O transport
- Good example of prompt implementation patterns

### Task Demo Server (`task_stdio.rs`)

A minimal stdio server demonstrating task-based tool invocation per
[SEP-1319](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/tasks).

- `slow_sum` is declared with `execution(task_support = "required")`, so clients MUST invoke it as a task
- `quick_echo` is a regular synchronous tool for contrast
- Wires up `enqueue_task` / `tasks/get` / `tasks/result` / `tasks/cancel` via `#[task_handler]`
- Pair with `examples/clients/src/task_stdio.rs` to see the full lifecycle (create → poll → fetch result)

### Progress Demo Server (`progress_demo.rs`)

A server that demonstrates progress notifications during long-running operations.
Expand Down
1 change: 1 addition & 0 deletions examples/servers/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod calculator;
pub mod counter;
pub mod generic_service;
pub mod progress_demo;
pub mod task_demo;
93 changes: 93 additions & 0 deletions examples/servers/src/common/task_demo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//! Minimal example of a tool that supports task-based invocation (SEP-1319).
//!
//! - `slow_sum` is marked `task_support = "required"`, so the client MUST invoke
//! it as a task. The server enqueues the call into an `OperationProcessor`,
//! returns a task id immediately, and the client polls `tasks/get` and
//! fetches the payload via `tasks/result`.
//! - `quick_echo` is a regular synchronous tool for contrast (the default,
//! `task_support = "forbidden"`).
//!
//! See `examples/clients/src/task_stdio.rs` for the matching client.

#![allow(dead_code)]

use std::sync::Arc;

use rmcp::{
ErrorData as McpError, ServerHandler,
handler::server::{router::tool::ToolRouter, wrapper::Parameters},
model::{CallToolResult, Content},
schemars, task_handler,
task_manager::OperationProcessor,
tool, tool_handler, tool_router,
};
use tokio::sync::Mutex;

#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
pub struct SumArgs {
pub a: i32,
pub b: i32,
}

#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
pub struct EchoArgs {
pub message: String,
}

/// Server state. The `processor` field is required by `#[task_handler]`:
/// the macro generates `enqueue_task` / `tasks/*` handlers that submit and
/// poll operations through it.
#[derive(Clone)]
pub struct TaskDemo {
tool_router: ToolRouter<TaskDemo>,
processor: Arc<Mutex<OperationProcessor>>,
}

impl Default for TaskDemo {
fn default() -> Self {
Self::new()
}
}

#[tool_router]
impl TaskDemo {
pub fn new() -> Self {
Self {
tool_router: Self::tool_router(),
processor: Arc::new(Mutex::new(OperationProcessor::new())),
}
}

/// Long-running tool. The `execution(task_support = "required")` attribute
/// tells clients they MUST call this tool as a task; the server returns
/// `-32601` if they don't.
#[tool(
description = "Sum two numbers after a 2-second delay",
execution(task_support = "required")
)]
async fn slow_sum(
&self,
Parameters(SumArgs { a, b }): Parameters<SumArgs>,
) -> Result<CallToolResult, McpError> {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
Ok(CallToolResult::success(vec![Content::text(
(a + b).to_string(),
)]))
}

/// Synchronous tool with the default `task_support = "forbidden"`.
#[tool(description = "Echo a message back immediately")]
async fn quick_echo(
&self,
Parameters(EchoArgs { message }): Parameters<EchoArgs>,
) -> Result<CallToolResult, McpError> {
Ok(CallToolResult::success(vec![Content::text(message)]))
}
}

/// `#[task_handler]` reads `self.processor` (configurable via the macro's
/// `processor = ...` argument) and synthesizes `enqueue_task`, `list_tasks`,
/// `get_task_info`, `get_task_result`, and `cancel_task` for us.
#[tool_handler]
#[task_handler]
impl ServerHandler for TaskDemo {}
27 changes: 27 additions & 0 deletions examples/servers/src/task_stdio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use anyhow::Result;
use common::task_demo::TaskDemo;
use rmcp::{ServiceExt, transport::stdio};
use tracing_subscriber::{self, EnvFilter};
mod common;

/// Stdio server demonstrating task-based tool invocation.
///
/// Run a matching client with:
/// cargo run -p mcp-client-examples --example clients_task_stdio
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into()))
.with_writer(std::io::stderr)
.with_ansi(false)
.init();

tracing::info!("Starting task-demo MCP server");

let service = TaskDemo::new().serve(stdio()).await.inspect_err(|e| {
tracing::error!("serving error: {e:?}");
})?;

service.waiting().await?;
Ok(())
}
Loading