Skip to content

Commit 5d3c43d

Browse files
committed
refactor: move StreamableHTTPASGIApp to streamable_http module
- Move StreamableHTTPASGIApp from fastmcp/server.py to streamable_http.py - Move streamable_http_app() implementation from FastMCP to lowlevel Server - Add session_manager property to lowlevel Server, FastMCP delegates to it - Fix circular import by using TYPE_CHECKING in streamable_http_manager.py Claude-Generated-By: Claude Code (cli/claude-opus-4-5=100%) Claude-Steers: 8 Claude-Permission-Prompts: 1 Claude-Escapes: 0
1 parent d41d0c0 commit 5d3c43d

File tree

4 files changed

+170
-122
lines changed

4 files changed

+170
-122
lines changed

src/mcp/server/fastmcp/server.py

Lines changed: 13 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ def __init__(
165165
if auth_server_provider and not token_verifier: # pragma: no cover
166166
self._token_verifier = ProviderTokenVerifier(auth_server_provider)
167167
self._custom_starlette_routes: list[Route] = []
168-
self._session_manager: StreamableHTTPSessionManager | None = None
169168

170169
# Set up MCP protocol handlers
171170
self._setup_handlers()
@@ -211,14 +210,7 @@ def session_manager(self) -> StreamableHTTPSessionManager:
211210
Raises:
212211
RuntimeError: If called before streamable_http_app() has been called.
213212
"""
214-
if self._session_manager is None: # pragma: no cover
215-
raise RuntimeError(
216-
"Session manager can only be accessed after"
217-
"calling streamable_http_app()."
218-
"The session manager is created lazily"
219-
"to avoid unnecessary initialization."
220-
)
221-
return self._session_manager # pragma: no cover
213+
return self._mcp_server.session_manager
222214

223215
@overload
224216
def run(self, transport: Literal["stdio"] = ...) -> None: ...
@@ -929,107 +921,19 @@ def streamable_http_app(
929921
host: str = "127.0.0.1",
930922
) -> Starlette:
931923
"""Return an instance of the StreamableHTTP server app."""
932-
from starlette.middleware import Middleware
933-
934-
# Auto-enable DNS rebinding protection for localhost (IPv4 and IPv6)
935-
if transport_security is None and host in ("127.0.0.1", "localhost", "::1"):
936-
transport_security = TransportSecuritySettings(
937-
enable_dns_rebinding_protection=True,
938-
allowed_hosts=["127.0.0.1:*", "localhost:*", "[::1]:*"],
939-
allowed_origins=["http://127.0.0.1:*", "http://localhost:*", "http://[::1]:*"],
940-
)
941-
942-
# Create session manager on first call (lazy initialization)
943-
if self._session_manager is None: # pragma: no branch
944-
self._session_manager = StreamableHTTPSessionManager(
945-
app=self._mcp_server,
946-
event_store=event_store,
947-
retry_interval=retry_interval,
948-
json_response=json_response,
949-
stateless=stateless_http,
950-
security_settings=transport_security,
951-
)
952-
953-
# Create the ASGI handler
954-
streamable_http_app = StreamableHTTPASGIApp(self._session_manager)
955-
956-
# Create routes
957-
routes: list[Route | Mount] = []
958-
middleware: list[Middleware] = []
959-
required_scopes: list[str] = []
960-
961-
# Set up auth if configured
962-
if self.settings.auth: # pragma: no cover
963-
required_scopes = self.settings.auth.required_scopes or []
964-
965-
# Add auth middleware if token verifier is available
966-
if self._token_verifier:
967-
middleware = [
968-
Middleware(
969-
AuthenticationMiddleware,
970-
backend=BearerAuthBackend(self._token_verifier),
971-
),
972-
Middleware(AuthContextMiddleware),
973-
]
974-
975-
# Add auth endpoints if auth server provider is configured
976-
if self._auth_server_provider:
977-
from mcp.server.auth.routes import create_auth_routes
978-
979-
routes.extend(
980-
create_auth_routes(
981-
provider=self._auth_server_provider,
982-
issuer_url=self.settings.auth.issuer_url,
983-
service_documentation_url=self.settings.auth.service_documentation_url,
984-
client_registration_options=self.settings.auth.client_registration_options,
985-
revocation_options=self.settings.auth.revocation_options,
986-
)
987-
)
988-
989-
# Set up routes with or without auth
990-
if self._token_verifier: # pragma: no cover
991-
# Determine resource metadata URL
992-
resource_metadata_url = None
993-
if self.settings.auth and self.settings.auth.resource_server_url:
994-
from mcp.server.auth.routes import build_resource_metadata_url
995-
996-
# Build compliant metadata URL for WWW-Authenticate header
997-
resource_metadata_url = build_resource_metadata_url(self.settings.auth.resource_server_url)
998-
999-
routes.append(
1000-
Route(
1001-
streamable_http_path,
1002-
endpoint=RequireAuthMiddleware(streamable_http_app, required_scopes, resource_metadata_url),
1003-
)
1004-
)
1005-
else:
1006-
# Auth is disabled, no wrapper needed
1007-
routes.append(
1008-
Route(
1009-
streamable_http_path,
1010-
endpoint=streamable_http_app,
1011-
)
1012-
)
1013-
1014-
# Add protected resource metadata endpoint if configured as RS
1015-
if self.settings.auth and self.settings.auth.resource_server_url: # pragma: no cover
1016-
from mcp.server.auth.routes import create_protected_resource_routes
1017-
1018-
routes.extend(
1019-
create_protected_resource_routes(
1020-
resource_url=self.settings.auth.resource_server_url,
1021-
authorization_servers=[self.settings.auth.issuer_url],
1022-
scopes_supported=self.settings.auth.required_scopes,
1023-
)
1024-
)
1025-
1026-
routes.extend(self._custom_starlette_routes)
1027-
1028-
return Starlette(
924+
return self._mcp_server.streamable_http_app(
925+
streamable_http_path=streamable_http_path,
926+
json_response=json_response,
927+
stateless_http=stateless_http,
928+
event_store=event_store,
929+
retry_interval=retry_interval,
930+
transport_security=transport_security,
931+
host=host,
932+
auth=self.settings.auth,
933+
token_verifier=self._token_verifier,
934+
auth_server_provider=self._auth_server_provider,
935+
custom_starlette_routes=self._custom_starlette_routes,
1029936
debug=self.settings.debug,
1030-
routes=routes,
1031-
middleware=middleware,
1032-
lifespan=lambda app: self.session_manager.run(),
1033937
)
1034938

1035939
async def list_prompts(self) -> list[MCPPrompt]:
@@ -1071,16 +975,6 @@ async def get_prompt(self, name: str, arguments: dict[str, Any] | None = None) -
1071975
raise ValueError(str(e))
1072976

1073977

1074-
class StreamableHTTPASGIApp:
1075-
"""ASGI application for Streamable HTTP server transport."""
1076-
1077-
def __init__(self, session_manager: StreamableHTTPSessionManager):
1078-
self.session_manager = session_manager
1079-
1080-
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: # pragma: no cover
1081-
await self.session_manager.handle_request(scope, receive, send)
1082-
1083-
1084978
class Context(BaseModel, Generic[ServerSessionT, LifespanContextT, RequestT]):
1085979
"""Context object providing access to MCP capabilities.
1086980

src/mcp/server/lowlevel/server.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,27 @@ async def main():
7979
import anyio
8080
import jsonschema
8181
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
82+
from starlette.applications import Starlette
83+
from starlette.middleware import Middleware
84+
from starlette.middleware.authentication import AuthenticationMiddleware
85+
from starlette.routing import Mount, Route
8286
from typing_extensions import TypeVar
8387

8488
import mcp.types as types
89+
from mcp.server.auth.middleware.auth_context import AuthContextMiddleware
90+
from mcp.server.auth.middleware.bearer_auth import BearerAuthBackend, RequireAuthMiddleware
91+
from mcp.server.auth.provider import OAuthAuthorizationServerProvider, TokenVerifier
92+
from mcp.server.auth.routes import build_resource_metadata_url, create_auth_routes, create_protected_resource_routes
93+
from mcp.server.auth.settings import AuthSettings
8594
from mcp.server.experimental.request_context import Experimental
8695
from mcp.server.lowlevel.experimental import ExperimentalHandlers
8796
from mcp.server.lowlevel.func_inspection import create_call_wrapper
8897
from mcp.server.lowlevel.helper_types import ReadResourceContents
8998
from mcp.server.models import InitializationOptions
9099
from mcp.server.session import ServerSession
100+
from mcp.server.streamable_http import EventStore, StreamableHTTPASGIApp
101+
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
102+
from mcp.server.transport_security import TransportSecuritySettings
91103
from mcp.shared.context import RequestContext
92104
from mcp.shared.exceptions import McpError, UrlElicitationRequiredError
93105
from mcp.shared.message import ServerMessageMetadata, SessionMessage
@@ -162,6 +174,7 @@ def __init__(
162174
self.notification_handlers: dict[type, Callable[..., Awaitable[None]]] = {}
163175
self._tool_cache: dict[str, types.Tool] = {}
164176
self._experimental_handlers: ExperimentalHandlers | None = None
177+
self._session_manager: StreamableHTTPSessionManager | None = None
165178
logger.debug("Initializing server %r", name)
166179

167180
def create_initialization_options(
@@ -258,6 +271,20 @@ def experimental(self) -> ExperimentalHandlers:
258271
self._experimental_handlers = ExperimentalHandlers(self, self.request_handlers, self.notification_handlers)
259272
return self._experimental_handlers
260273

274+
@property
275+
def session_manager(self) -> StreamableHTTPSessionManager:
276+
"""Get the StreamableHTTP session manager.
277+
278+
Raises:
279+
RuntimeError: If called before streamable_http_app() has been called.
280+
"""
281+
if self._session_manager is None:
282+
raise RuntimeError(
283+
"Session manager can only be accessed after calling streamable_http_app(). "
284+
"The session manager is created lazily to avoid unnecessary initialization."
285+
)
286+
return self._session_manager
287+
261288
def list_prompts(self):
262289
def decorator(
263290
func: Callable[[], Awaitable[list[types.Prompt]]]
@@ -801,6 +828,118 @@ async def _handle_notification(self, notify: Any):
801828
except Exception: # pragma: no cover
802829
logger.exception("Uncaught exception in notification handler")
803830

831+
def streamable_http_app(
832+
self,
833+
*,
834+
streamable_http_path: str = "/mcp",
835+
json_response: bool = False,
836+
stateless_http: bool = False,
837+
event_store: EventStore | None = None,
838+
retry_interval: int | None = None,
839+
transport_security: TransportSecuritySettings | None = None,
840+
host: str = "127.0.0.1",
841+
auth: AuthSettings | None = None,
842+
token_verifier: TokenVerifier | None = None,
843+
auth_server_provider: (OAuthAuthorizationServerProvider[Any, Any, Any] | None) = None,
844+
custom_starlette_routes: list[Route] | None = None,
845+
debug: bool = False,
846+
) -> Starlette:
847+
"""Return an instance of the StreamableHTTP server app."""
848+
# Auto-enable DNS rebinding protection for localhost (IPv4 and IPv6)
849+
if transport_security is None and host in ("127.0.0.1", "localhost", "::1"):
850+
transport_security = TransportSecuritySettings(
851+
enable_dns_rebinding_protection=True,
852+
allowed_hosts=["127.0.0.1:*", "localhost:*", "[::1]:*"],
853+
allowed_origins=["http://127.0.0.1:*", "http://localhost:*", "http://[::1]:*"],
854+
)
855+
856+
session_manager = StreamableHTTPSessionManager(
857+
app=self,
858+
event_store=event_store,
859+
retry_interval=retry_interval,
860+
json_response=json_response,
861+
stateless=stateless_http,
862+
security_settings=transport_security,
863+
)
864+
self._session_manager = session_manager
865+
866+
# Create the ASGI handler
867+
streamable_http_app = StreamableHTTPASGIApp(session_manager)
868+
869+
# Create routes
870+
routes: list[Route | Mount] = []
871+
middleware: list[Middleware] = []
872+
required_scopes: list[str] = []
873+
874+
# Set up auth if configured
875+
if auth: # pragma: no cover
876+
required_scopes = auth.required_scopes or []
877+
878+
# Add auth middleware if token verifier is available
879+
if token_verifier:
880+
middleware = [
881+
Middleware(
882+
AuthenticationMiddleware,
883+
backend=BearerAuthBackend(token_verifier),
884+
),
885+
Middleware(AuthContextMiddleware),
886+
]
887+
888+
# Add auth endpoints if auth server provider is configured
889+
if auth_server_provider:
890+
routes.extend(
891+
create_auth_routes(
892+
provider=auth_server_provider,
893+
issuer_url=auth.issuer_url,
894+
service_documentation_url=auth.service_documentation_url,
895+
client_registration_options=auth.client_registration_options,
896+
revocation_options=auth.revocation_options,
897+
)
898+
)
899+
900+
# Set up routes with or without auth
901+
if token_verifier: # pragma: no cover
902+
# Determine resource metadata URL
903+
resource_metadata_url = None
904+
if auth and auth.resource_server_url:
905+
# Build compliant metadata URL for WWW-Authenticate header
906+
resource_metadata_url = build_resource_metadata_url(auth.resource_server_url)
907+
908+
routes.append(
909+
Route(
910+
streamable_http_path,
911+
endpoint=RequireAuthMiddleware(streamable_http_app, required_scopes, resource_metadata_url),
912+
)
913+
)
914+
else:
915+
# Auth is disabled, no wrapper needed
916+
routes.append(
917+
Route(
918+
streamable_http_path,
919+
endpoint=streamable_http_app,
920+
)
921+
)
922+
923+
# Add protected resource metadata endpoint if configured as RS
924+
if auth and auth.resource_server_url: # pragma: no cover
925+
routes.extend(
926+
create_protected_resource_routes(
927+
resource_url=auth.resource_server_url,
928+
authorization_servers=[auth.issuer_url],
929+
scopes_supported=auth.required_scopes,
930+
)
931+
)
932+
933+
if custom_starlette_routes:
934+
routes.extend(custom_starlette_routes)
935+
936+
return Starlette(
937+
debug=debug,
938+
routes=routes,
939+
middleware=middleware,
940+
lifespan=lambda app: session_manager.run(),
941+
)
942+
804943

805944
async def _ping_handler(request: types.PingRequest) -> types.ServerResult:
806945
return types.ServerResult(types.EmptyResult())

src/mcp/server/streamable_http.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414
from contextlib import asynccontextmanager
1515
from dataclasses import dataclass
1616
from http import HTTPStatus
17-
from typing import Any
17+
from typing import TYPE_CHECKING, Any
18+
19+
if TYPE_CHECKING:
20+
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
1821

1922
import anyio
2023
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
@@ -1054,3 +1057,13 @@ async def message_router(): # pragma: no cover
10541057
except Exception as e: # pragma: no cover
10551058
# During cleanup, we catch all exceptions since streams might be in various states
10561059
logger.debug(f"Error closing streams: {e}")
1060+
1061+
1062+
class StreamableHTTPASGIApp:
1063+
"""ASGI application for Streamable HTTP server transport."""
1064+
1065+
def __init__(self, session_manager: "StreamableHTTPSessionManager"):
1066+
self.session_manager = session_manager
1067+
1068+
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: # pragma: no cover
1069+
await self.session_manager.handle_request(scope, receive, send)

src/mcp/server/streamable_http_manager.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import logging
77
from collections.abc import AsyncIterator
88
from http import HTTPStatus
9-
from typing import Any
9+
from typing import TYPE_CHECKING, Any
1010
from uuid import uuid4
1111

1212
import anyio
@@ -15,7 +15,6 @@
1515
from starlette.responses import Response
1616
from starlette.types import Receive, Scope, Send
1717

18-
from mcp.server.lowlevel.server import Server as MCPServer
1918
from mcp.server.streamable_http import (
2019
MCP_SESSION_ID_HEADER,
2120
EventStore,
@@ -24,6 +23,9 @@
2423
from mcp.server.transport_security import TransportSecuritySettings
2524
from mcp.types import INVALID_REQUEST, ErrorData, JSONRPCError
2625

26+
if TYPE_CHECKING:
27+
from mcp.server.lowlevel.server import Server as MCPServer
28+
2729
logger = logging.getLogger(__name__)
2830

2931

0 commit comments

Comments
 (0)