Skip to content
63 changes: 33 additions & 30 deletions src/mcp/client/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from mcp.shared._httpx_utils import create_mcp_http_client
from mcp.shared.message import ClientMessageMetadata, SessionMessage
from mcp.types import (
INVALID_REQUEST,
PARSE_ERROR,
ErrorData,
InitializeResult,
JSONRPCError,
Expand Down Expand Up @@ -163,6 +165,11 @@ async def _handle_sse_event(

except Exception as exc: # pragma: no cover
logger.exception("Error parsing SSE message")
if original_request_id is not None:
error_data = ErrorData(code=PARSE_ERROR, message=f"Failed to parse SSE message: {exc}")
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data))
await read_stream_writer.send(error_msg)
return True
Comment thread
Kludex marked this conversation as resolved.
await read_stream_writer.send(exc)
return False
else: # pragma: no cover
Expand Down Expand Up @@ -260,7 +267,9 @@ async def _handle_post_request(self, ctx: RequestContext) -> None:

if response.status_code == 404: # pragma: no branch
if isinstance(message, JSONRPCRequest): # pragma: no branch
await self._send_session_terminated_error(ctx.read_stream_writer, message.id)
error_data = ErrorData(code=INVALID_REQUEST, message="Session terminated")
session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
await ctx.read_stream_writer.send(session_message)
return

response.raise_for_status()
Expand All @@ -272,20 +281,24 @@ async def _handle_post_request(self, ctx: RequestContext) -> None:
if isinstance(message, JSONRPCRequest):
content_type = response.headers.get("content-type", "").lower()
if content_type.startswith("application/json"):
await self._handle_json_response(response, ctx.read_stream_writer, is_initialization)
await self._handle_json_response(
response, ctx.read_stream_writer, is_initialization, request_id=message.id
)
elif content_type.startswith("text/event-stream"):
await self._handle_sse_response(response, ctx, is_initialization)
else:
await self._handle_unexpected_content_type( # pragma: no cover
content_type, # pragma: no cover
ctx.read_stream_writer, # pragma: no cover
) # pragma: no cover
logger.error(f"Unexpected content type: {content_type}")
error_data = ErrorData(code=INVALID_REQUEST, message=f"Unexpected content type: {content_type}")
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
await ctx.read_stream_writer.send(error_msg)

async def _handle_json_response(
self,
response: httpx.Response,
read_stream_writer: StreamWriter,
is_initialization: bool = False,
*,
request_id: RequestId,
) -> None:
"""Handle JSON response from the server."""
try:
Expand All @@ -298,9 +311,11 @@ async def _handle_json_response(

session_message = SessionMessage(message)
await read_stream_writer.send(session_message)
except Exception as exc: # pragma: no cover
except Exception as exc:
logger.exception("Error parsing JSON response")
await read_stream_writer.send(exc)
error_data = ErrorData(code=PARSE_ERROR, message=f"Failed to parse JSON response: {exc}")
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=request_id, error=error_data))
await read_stream_writer.send(error_msg)

async def _handle_sse_response(
self,
Expand All @@ -312,6 +327,11 @@ async def _handle_sse_response(
last_event_id: str | None = None
retry_interval_ms: int | None = None

# The caller (_handle_post_request) only reaches here inside
# isinstance(message, JSONRPCRequest), so this is always a JSONRPCRequest.
assert isinstance(ctx.session_message.message, JSONRPCRequest)
original_request_id = ctx.session_message.message.id

try:
event_source = EventSource(response)
async for sse in event_source.aiter_sse(): # pragma: no branch
Expand All @@ -326,6 +346,7 @@ async def _handle_sse_response(
is_complete = await self._handle_sse_event(
sse,
ctx.read_stream_writer,
original_request_id=original_request_id,
resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None),
is_initialization=is_initialization,
)
Expand All @@ -334,8 +355,8 @@ async def _handle_sse_response(
if is_complete:
await response.aclose()
return # Normal completion, no reconnect needed
except Exception as e:
logger.debug(f"SSE stream ended: {e}") # pragma: no cover
except Exception:
logger.debug("SSE stream ended", exc_info=True) # pragma: no cover

# Stream ended without response - reconnect if we received an event with ID
if last_event_id is not None: # pragma: no branch
Expand Down Expand Up @@ -400,24 +421,6 @@ async def _handle_reconnection(
# Try to reconnect again if we still have an event ID
await self._handle_reconnection(ctx, last_event_id, retry_interval_ms, attempt + 1)

async def _handle_unexpected_content_type(
self, content_type: str, read_stream_writer: StreamWriter
) -> None: # pragma: no cover
"""Handle unexpected content type in response."""
error_msg = f"Unexpected content type: {content_type}" # pragma: no cover
logger.error(error_msg) # pragma: no cover
await read_stream_writer.send(ValueError(error_msg)) # pragma: no cover

async def _send_session_terminated_error(self, read_stream_writer: StreamWriter, request_id: RequestId) -> None:
"""Send a session terminated error response."""
jsonrpc_error = JSONRPCError(
jsonrpc="2.0",
id=request_id,
error=ErrorData(code=32600, message="Session terminated"),
)
session_message = SessionMessage(jsonrpc_error)
await read_stream_writer.send(session_message)

async def post_writer(
self,
client: httpx.AsyncClient,
Expand Down Expand Up @@ -467,8 +470,8 @@ async def handle_request_async():
else:
await handle_request_async()

except Exception:
logger.exception("Error in post_writer") # pragma: no cover
except Exception: # pragma: no cover
logger.exception("Error in post_writer")
finally:
await read_stream_writer.aclose()
await write_stream.aclose()
Expand Down
2 changes: 1 addition & 1 deletion src/mcp/types/jsonrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class JSONRPCError(BaseModel):
"""A response to a request that indicates an error occurred."""

jsonrpc: Literal["2.0"]
id: str | int
id: RequestId
error: ErrorData


Expand Down
204 changes: 109 additions & 95 deletions tests/client/test_notification_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,133 +5,147 @@
"""

import json
import multiprocessing
import socket
from collections.abc import Generator

import httpx
import pytest
import uvicorn
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.routing import Route

from mcp import ClientSession, types
from mcp import ClientSession, MCPError, types
from mcp.client.streamable_http import streamable_http_client
from mcp.shared.session import RequestResponder
from mcp.types import RootsListChangedNotification
from tests.test_helpers import wait_for_server

pytestmark = pytest.mark.anyio

def create_non_sdk_server_app() -> Starlette: # pragma: no cover
INIT_RESPONSE = {
"serverInfo": {"name": "test-non-sdk-server", "version": "1.0.0"},
"protocolVersion": "2024-11-05",
"capabilities": {},
}


def _init_json_response(data: dict[str, object]) -> JSONResponse:
return JSONResponse({"jsonrpc": "2.0", "id": data["id"], "result": INIT_RESPONSE})


def _create_non_sdk_server_app() -> Starlette:
"""Create a minimal server that doesn't follow SDK conventions."""

async def handle_mcp_request(request: Request) -> Response:
"""Handle MCP requests with non-standard responses."""
try:
body = await request.body()
data = json.loads(body)

# Handle initialize request normally
if data.get("method") == "initialize":
response_data = {
"jsonrpc": "2.0",
"id": data["id"],
"result": {
"serverInfo": {"name": "test-non-sdk-server", "version": "1.0.0"},
"protocolVersion": "2024-11-05",
"capabilities": {},
},
}
return JSONResponse(response_data)

# For notifications, return 204 No Content (non-SDK behavior)
if "id" not in data:
return Response(status_code=204, headers={"Content-Type": "application/json"})

# Default response for other requests
return JSONResponse(
{"jsonrpc": "2.0", "id": data.get("id"), "error": {"code": -32601, "message": "Method not found"}}
)

except Exception as e:
return JSONResponse({"error": f"Server error: {str(e)}"}, status_code=500)

app = Starlette(
debug=True,
routes=[
Route("/mcp", handle_mcp_request, methods=["POST"]),
],
)
return app


def run_non_sdk_server(port: int) -> None: # pragma: no cover
"""Run the non-SDK server in a separate process."""
app = create_non_sdk_server_app()
config = uvicorn.Config(
app=app,
host="127.0.0.1",
port=port,
log_level="error", # Reduce noise in tests
)
server = uvicorn.Server(config=config)
server.run()


@pytest.fixture
def non_sdk_server_port() -> int:
"""Get an available port for the test server."""
with socket.socket() as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]


@pytest.fixture
def non_sdk_server(non_sdk_server_port: int) -> Generator[None, None, None]:
"""Start a non-SDK server for testing."""
proc = multiprocessing.Process(target=run_non_sdk_server, kwargs={"port": non_sdk_server_port}, daemon=True)
proc.start()

# Wait for server to be ready
try:
wait_for_server(non_sdk_server_port, timeout=10.0)
except TimeoutError: # pragma: no cover
proc.kill()
proc.join(timeout=2)
pytest.fail("Server failed to start within 10 seconds")

yield

proc.kill()
proc.join(timeout=2)


@pytest.mark.anyio
async def test_non_compliant_notification_response(non_sdk_server: None, non_sdk_server_port: int) -> None:
"""This test verifies that the client ignores unexpected responses to notifications: the spec states they should
either be 202 + no response body, or 4xx + optional error body
body = await request.body()
data = json.loads(body)

if data.get("method") == "initialize":
return _init_json_response(data)

# For notifications, return 204 No Content (non-SDK behavior)
if "id" not in data:
return Response(status_code=204, headers={"Content-Type": "application/json"})

return JSONResponse( # pragma: no cover
{"jsonrpc": "2.0", "id": data.get("id"), "error": {"code": -32601, "message": "Method not found"}}
)

return Starlette(debug=True, routes=[Route("/mcp", handle_mcp_request, methods=["POST"])])


def _create_unexpected_content_type_app() -> Starlette:
"""Create a server that returns an unexpected content type for requests."""

async def handle_mcp_request(request: Request) -> Response:
body = await request.body()
data = json.loads(body)

if data.get("method") == "initialize":
return _init_json_response(data)

if "id" not in data:
return Response(status_code=202)

# Return text/plain for all other requests — an unexpected content type.
return Response(content="this is plain text, not json or sse", status_code=200, media_type="text/plain")

return Starlette(debug=True, routes=[Route("/mcp", handle_mcp_request, methods=["POST"])])


async def test_non_compliant_notification_response() -> None:
"""Verify the client ignores unexpected responses to notifications.

The spec states notifications should get either 202 + no response body, or 4xx + optional error body
(https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#sending-messages-to-the-server),
but some servers wrongly return other 2xx codes (e.g. 204). For now we simply ignore unexpected responses
(aligning behaviour w/ the TS SDK).
"""
server_url = f"http://127.0.0.1:{non_sdk_server_port}/mcp"
returned_exception = None

async def message_handler( # pragma: no cover
message: RequestResponder[types.ServerRequest, types.ClientResult] | types.ServerNotification | Exception,
):
) -> None:
nonlocal returned_exception
if isinstance(message, Exception):
returned_exception = message

async with streamable_http_client(server_url) as (read_stream, write_stream):
client = httpx.AsyncClient(transport=httpx.ASGITransport(app=_create_non_sdk_server_app()))
async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session:
Comment thread
Kludex marked this conversation as resolved.
Outdated
# Initialize should work normally
await session.initialize()

# The test server returns a 204 instead of the expected 202
await session.send_notification(RootsListChangedNotification(method="notifications/roots/list_changed"))

if returned_exception: # pragma: no cover
pytest.fail(f"Server encountered an exception: {returned_exception}")


async def test_unexpected_content_type_sends_jsonrpc_error() -> None:
"""Verify unexpected content types unblock the pending request with an MCPError.

When a server returns a content type that is neither application/json nor text/event-stream,
the client should send a JSONRPCError so the pending request resolves immediately
instead of hanging until timeout.
"""
client = httpx.AsyncClient(transport=httpx.ASGITransport(app=_create_unexpected_content_type_app()))
async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()

with pytest.raises(MCPError, match="Unexpected content type: text/plain"):
await session.list_tools()
Comment thread
Kludex marked this conversation as resolved.
Outdated


def _create_invalid_json_response_app() -> Starlette:
"""Create a server that returns invalid JSON for requests."""

async def handle_mcp_request(request: Request) -> Response:
body = await request.body()
data = json.loads(body)

if data.get("method") == "initialize":
return _init_json_response(data)

if "id" not in data:
return Response(status_code=202)

# Return application/json content type but with invalid JSON body.
return Response(content="not valid json{{{", status_code=200, media_type="application/json")

return Starlette(debug=True, routes=[Route("/mcp", handle_mcp_request, methods=["POST"])])


async def test_invalid_json_response_sends_jsonrpc_error() -> None:
"""Verify invalid JSON responses unblock the pending request with an MCPError.

When a server returns application/json with an unparseable body, the client
should send a JSONRPCError so the pending request resolves immediately
instead of hanging until timeout.
"""
client = httpx.AsyncClient(transport=httpx.ASGITransport(app=_create_invalid_json_response_app()))
async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
Comment thread
Kludex marked this conversation as resolved.
Outdated
await session.initialize()

with pytest.raises(MCPError, match="Failed to parse JSON response"):
await session.list_tools()
Loading