Skip to content

Commit 5306da2

Browse files
committed
feat: add reusable roots enforcement utilities to MCPServer
MCP Roots let clients declare filesystem boundaries to servers, but the SDK provides no helpers to enforce them — every server author has to write the same boundary-check logic by hand. This adds a small utility module so authors can drop in enforcement without reinventing it. - get_roots(ctx): fetch root URIs declared by the client - assert_within_roots(path, ctx): raise PermissionError if path is outside roots - within_roots_check: decorator that auto-enforces roots on path parameters Both sides of the comparison go through Path.resolve() so that platform firmlinks (e.g. macOS /tmp -> /private/tmp, /home -> /System/Volumes/Data/home) don't produce spurious denials.
1 parent f27d2aa commit 5306da2

File tree

3 files changed

+299
-1
lines changed

3 files changed

+299
-1
lines changed

src/mcp/server/mcpserver/__init__.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,16 @@
44

55
from .context import Context
66
from .server import MCPServer
7+
from .utilities.roots import assert_within_roots, get_roots, within_roots_check
78
from .utilities.types import Audio, Image
89

9-
__all__ = ["MCPServer", "Context", "Image", "Audio", "Icon"]
10+
__all__ = [
11+
"MCPServer",
12+
"Context",
13+
"Image",
14+
"Audio",
15+
"Icon",
16+
"assert_within_roots",
17+
"get_roots",
18+
"within_roots_check",
19+
]
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
"""Reusable roots enforcement utilities for MCPServer.
2+
3+
Roots define filesystem boundaries that the MCP client declares to the server.
4+
The MCP spec does not auto-enforce these — servers must do it themselves.
5+
This module provides a simple reusable way to do that without rewriting
6+
the logic in every server.
7+
8+
Usage:
9+
from mcp.server.mcpserver import Context, MCPServer
10+
from mcp.server.mcpserver.utilities.roots import (
11+
get_roots,
12+
assert_within_roots,
13+
within_roots_check,
14+
)
15+
16+
mcp = MCPServer("my-server")
17+
18+
@mcp.tool()
19+
async def read_file(path: str, ctx: Context) -> str:
20+
await assert_within_roots(path, ctx)
21+
return open(path).read()
22+
"""
23+
24+
from __future__ import annotations
25+
26+
import functools
27+
import inspect
28+
from collections.abc import Awaitable, Callable
29+
from pathlib import Path
30+
from typing import TYPE_CHECKING, ParamSpec, TypeVar
31+
32+
if TYPE_CHECKING:
33+
from mcp.server.mcpserver import Context
34+
35+
P = ParamSpec("P")
36+
R = TypeVar("R")
37+
38+
39+
async def get_roots(ctx: Context) -> list[str]:
40+
"""Fetch the list of root URIs declared by the connected client.
41+
42+
Returns a list of URI strings e.g. ["file:///home/user/project"].
43+
Returns an empty list if the client declared no roots or does not
44+
support the roots capability.
45+
46+
Args:
47+
ctx: The MCPServer Context object available inside any tool.
48+
49+
Example:
50+
@mcp.tool()
51+
async def my_tool(ctx: Context) -> str:
52+
roots = await get_roots(ctx)
53+
return str(roots)
54+
"""
55+
try:
56+
result = await ctx.session.list_roots()
57+
return [str(root.uri) for root in result.roots]
58+
except Exception:
59+
return []
60+
61+
62+
async def assert_within_roots(path: str | Path, ctx: Context) -> None:
63+
"""Raise PermissionError if path falls outside all client-declared roots.
64+
65+
If the client declared no roots this is a no-op — no restriction applied.
66+
Only file:// URIs are checked. Non-file roots are skipped.
67+
68+
Args:
69+
path: The filesystem path your tool wants to access.
70+
ctx: The MCPServer Context object available inside any tool.
71+
72+
Raises:
73+
PermissionError: If the resolved path is outside all declared roots.
74+
75+
Example:
76+
@mcp.tool()
77+
async def read_file(path: str, ctx: Context) -> str:
78+
await assert_within_roots(path, ctx)
79+
return open(path).read()
80+
"""
81+
roots = await get_roots(ctx)
82+
83+
if not roots:
84+
return
85+
86+
file_roots = [str(Path(r.removeprefix("file://")).resolve()) for r in roots if r.startswith("file://")]
87+
88+
if not file_roots:
89+
return
90+
91+
resolved = str(Path(path).resolve())
92+
93+
if not any(resolved.startswith(root) for root in file_roots):
94+
raise PermissionError(f"Access denied: '{resolved}' is outside the allowed roots.\nAllowed roots: {file_roots}")
95+
96+
97+
def within_roots_check(fn: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
98+
"""Auto-enforce roots on any tool parameter named 'path' or ending with '_path'.
99+
100+
Requires the tool to also accept a `ctx: Context` parameter.
101+
102+
Example:
103+
@mcp.tool()
104+
@within_roots_check
105+
async def read_file(path: str, ctx: Context) -> str:
106+
return open(path).read()
107+
"""
108+
109+
@functools.wraps(fn)
110+
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
111+
sig = inspect.signature(fn)
112+
bound = sig.bind(*args, **kwargs)
113+
bound.apply_defaults()
114+
arguments = bound.arguments
115+
116+
ctx = arguments.get("ctx")
117+
if ctx is None:
118+
raise ValueError("@within_roots_check requires the tool to have a `ctx: Context` parameter.")
119+
120+
for param_name, value in arguments.items():
121+
if value and isinstance(value, str | Path):
122+
if param_name == "path" or param_name.endswith("_path"):
123+
await assert_within_roots(value, ctx)
124+
125+
return await fn(*args, **kwargs)
126+
127+
return wrapper
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
"""Tests for mcp.server.mcpserver.utilities.roots."""
2+
3+
from __future__ import annotations
4+
5+
from pathlib import Path
6+
from unittest.mock import AsyncMock, MagicMock
7+
8+
import pytest
9+
10+
from mcp.server.mcpserver.utilities.roots import (
11+
assert_within_roots,
12+
get_roots,
13+
within_roots_check,
14+
)
15+
16+
pytestmark = pytest.mark.anyio
17+
18+
19+
# ---------------------------------------------------------------------------
20+
# Helpers
21+
# ---------------------------------------------------------------------------
22+
23+
24+
def make_ctx(root_uris: list[str]) -> MagicMock:
25+
root_objects = [MagicMock(uri=uri) for uri in root_uris]
26+
list_roots_result = MagicMock()
27+
list_roots_result.roots = root_objects
28+
session = MagicMock()
29+
session.list_roots = AsyncMock(return_value=list_roots_result)
30+
ctx = MagicMock()
31+
ctx.session = session
32+
return ctx
33+
34+
35+
def make_failing_ctx() -> MagicMock:
36+
session = MagicMock()
37+
session.list_roots = AsyncMock(side_effect=Exception("not supported"))
38+
ctx = MagicMock()
39+
ctx.session = session
40+
return ctx
41+
42+
43+
# ---------------------------------------------------------------------------
44+
# get_roots
45+
# ---------------------------------------------------------------------------
46+
47+
48+
async def test_get_roots_returns_uris():
49+
ctx = make_ctx(["file:///home/user/project", "file:///tmp/work"])
50+
result = await get_roots(ctx)
51+
assert result == ["file:///home/user/project", "file:///tmp/work"]
52+
53+
54+
async def test_get_roots_returns_empty_when_no_roots():
55+
ctx = make_ctx([])
56+
result = await get_roots(ctx)
57+
assert result == []
58+
59+
60+
async def test_get_roots_returns_empty_on_exception():
61+
ctx = make_failing_ctx()
62+
result = await get_roots(ctx)
63+
assert result == []
64+
65+
66+
# ---------------------------------------------------------------------------
67+
# assert_within_roots
68+
# ---------------------------------------------------------------------------
69+
70+
71+
async def test_assert_passes_when_no_roots():
72+
ctx = make_ctx([])
73+
await assert_within_roots("/any/path/at/all", ctx)
74+
75+
76+
async def test_assert_passes_when_path_inside_root():
77+
ctx = make_ctx(["file:///home/user/project"])
78+
await assert_within_roots("/home/user/project/src/main.py", ctx)
79+
80+
81+
async def test_assert_raises_when_path_outside_root():
82+
ctx = make_ctx(["file:///home/user/project"])
83+
with pytest.raises(PermissionError, match="Access denied"):
84+
await assert_within_roots("/etc/passwd", ctx)
85+
86+
87+
async def test_assert_passes_with_multiple_roots_matching_second():
88+
ctx = make_ctx(["file:///home/user/project", "file:///tmp/work"])
89+
await assert_within_roots("/tmp/work/file.txt", ctx)
90+
91+
92+
async def test_assert_raises_outside_all_roots():
93+
ctx = make_ctx(["file:///home/user/project", "file:///tmp/work"])
94+
with pytest.raises(PermissionError):
95+
await assert_within_roots("/var/log/syslog", ctx)
96+
97+
98+
async def test_assert_accepts_pathlib_path():
99+
ctx = make_ctx(["file:///home/user/project"])
100+
await assert_within_roots(Path("/home/user/project/file.txt"), ctx)
101+
102+
103+
async def test_assert_skips_non_file_roots():
104+
ctx = make_ctx(["https://api.example.com/v1"])
105+
await assert_within_roots("/any/local/path", ctx)
106+
107+
108+
async def test_assert_no_raise_when_client_doesnt_support_roots():
109+
ctx = make_failing_ctx()
110+
await assert_within_roots("/any/path", ctx)
111+
112+
113+
# ---------------------------------------------------------------------------
114+
# within_roots_check decorator
115+
# ---------------------------------------------------------------------------
116+
117+
118+
async def test_decorator_passes_inside_root():
119+
ctx = make_ctx(["file:///home/user/project"])
120+
121+
@within_roots_check
122+
async def read_file(path: str, ctx: MagicMock) -> str:
123+
return "file contents"
124+
125+
result = await read_file(path="/home/user/project/notes.txt", ctx=ctx)
126+
assert result == "file contents"
127+
128+
129+
async def test_decorator_raises_outside_root():
130+
ctx = make_ctx(["file:///home/user/project"])
131+
132+
@within_roots_check
133+
async def read_file(path: str, ctx: MagicMock) -> str:
134+
return "file contents"
135+
136+
with pytest.raises(PermissionError):
137+
await read_file(path="/etc/passwd", ctx=ctx)
138+
139+
140+
async def test_decorator_checks_star_path_params():
141+
ctx = make_ctx(["file:///home/user/project"])
142+
143+
@within_roots_check
144+
async def copy_file(source_path: str, dest_path: str, ctx: MagicMock) -> str:
145+
return "copied"
146+
147+
with pytest.raises(PermissionError):
148+
await copy_file(
149+
source_path="/home/user/project/file.txt",
150+
dest_path="/etc/shadow",
151+
ctx=ctx,
152+
)
153+
154+
155+
async def test_decorator_raises_without_ctx():
156+
@within_roots_check
157+
async def bad_tool(path: str) -> str:
158+
return "oops"
159+
160+
with pytest.raises(ValueError, match="ctx"):
161+
await bad_tool(path="/some/path")

0 commit comments

Comments
 (0)