Skip to content

Commit f230376

Browse files
committed
fix: surface streamable http request failures
1 parent 3d7b311 commit f230376

3 files changed

Lines changed: 73 additions & 4 deletions

File tree

src/mcp/client/streamable_http.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,20 @@ async def _handle_reconnection(
429429
# Try to reconnect again if we still have an event ID
430430
await self._handle_reconnection(ctx, last_event_id, retry_interval_ms, attempt + 1)
431431

432+
async def _handle_request_error(self, ctx: RequestContext, exc: Exception) -> None:
433+
"""Report a request transport failure without crashing the transport task group."""
434+
logger.debug("Error handling StreamableHTTP request", exc_info=True)
435+
436+
message = ctx.session_message.message
437+
if isinstance(message, JSONRPCRequest):
438+
error_data = ErrorData(code=INTERNAL_ERROR, message=f"Transport error: {exc}")
439+
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
440+
with contextlib.suppress(anyio.BrokenResourceError, anyio.ClosedResourceError):
441+
await ctx.read_stream_writer.send(error_msg)
442+
else:
443+
with contextlib.suppress(anyio.BrokenResourceError, anyio.ClosedResourceError):
444+
await ctx.read_stream_writer.send(exc)
445+
432446
async def post_writer(
433447
self,
434448
client: httpx.AsyncClient,
@@ -468,10 +482,13 @@ async def _handle_message(session_message: SessionMessage) -> None:
468482
)
469483

470484
async def handle_request_async():
471-
if is_resumption:
472-
await self._handle_resumption_request(ctx)
473-
else:
474-
await self._handle_post_request(ctx)
485+
try:
486+
if is_resumption:
487+
await self._handle_resumption_request(ctx)
488+
else:
489+
await self._handle_post_request(ctx)
490+
except Exception as exc:
491+
await self._handle_request_error(ctx, exc)
475492

476493
# If this is a request, start a new task to handle it
477494
if isinstance(message, JSONRPCRequest):

tests/client/test_notification_response.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import json
88

9+
import anyio
910
import httpx
1011
import pytest
1112
from starlette.applications import Starlette
@@ -152,6 +153,19 @@ async def test_http_error_status_sends_jsonrpc_error() -> None:
152153
await session.list_tools()
153154

154155

156+
async def test_transport_error_sends_jsonrpc_error() -> None:
157+
"""Verify request transport errors unblock the pending request with an MCPError."""
158+
159+
async def raise_connect_error(request: httpx.Request) -> httpx.Response:
160+
raise httpx.ConnectError("All connection attempts failed", request=request)
161+
162+
async with httpx.AsyncClient(transport=httpx.MockTransport(raise_connect_error)) as client:
163+
async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream):
164+
async with ClientSession(read_stream, write_stream) as session: # pragma: no branch
165+
with pytest.raises(MCPError, match="Transport error: All connection attempts failed"):
166+
await session.initialize()
167+
168+
155169
async def test_http_error_on_notification_does_not_hang() -> None:
156170
"""Verify HTTP errors on notifications are silently ignored.
157171
@@ -168,6 +182,23 @@ async def test_http_error_on_notification_does_not_hang() -> None:
168182
await session.send_notification(RootsListChangedNotification(method="notifications/roots/list_changed"))
169183

170184

185+
async def test_transport_error_on_notification_does_not_crash_transport() -> None:
186+
"""Verify transport errors on notifications do not crash the transport task group."""
187+
188+
async def handle_request(request: httpx.Request) -> httpx.Response:
189+
data = json.loads(request.content)
190+
if data.get("method") == "initialize":
191+
return httpx.Response(200, json={"jsonrpc": "2.0", "id": data["id"], "result": INIT_RESPONSE})
192+
raise httpx.ConnectError("All connection attempts failed", request=request)
193+
194+
async with httpx.AsyncClient(transport=httpx.MockTransport(handle_request)) as client:
195+
async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream):
196+
async with ClientSession(read_stream, write_stream) as session: # pragma: no branch
197+
await session.initialize()
198+
await session.send_notification(RootsListChangedNotification(method="notifications/roots/list_changed"))
199+
await anyio.sleep(0)
200+
201+
171202
def _create_invalid_json_response_app() -> Starlette:
172203
"""Create a server that returns invalid JSON for requests."""
173204

tests/client/test_session_group.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,3 +385,24 @@ async def test_client_session_group_establish_session_parameterized(
385385
# 3. Assert returned values
386386
assert returned_server_info is mock_initialize_result.server_info
387387
assert returned_session is mock_entered_session
388+
389+
390+
@pytest.mark.anyio
391+
async def test_client_session_group_streamable_http_connect_error_is_catchable() -> None:
392+
async def raise_connect_error(request: httpx.Request) -> httpx.Response:
393+
raise httpx.ConnectError("All connection attempts failed", request=request)
394+
395+
def mock_client_factory(
396+
headers: dict[str, str] | None = None,
397+
timeout: httpx.Timeout | None = None,
398+
auth: httpx.Auth | None = None,
399+
) -> httpx.AsyncClient:
400+
return httpx.AsyncClient(transport=httpx.MockTransport(raise_connect_error))
401+
402+
group = ClientSessionGroup()
403+
async with group:
404+
with mock.patch("mcp.client.session_group.create_mcp_http_client", side_effect=mock_client_factory):
405+
with pytest.raises(MCPError, match="Transport error: All connection attempts failed"):
406+
await group.connect_to_server(StreamableHttpParameters(url="http://localhost:3001/mcp/"))
407+
408+
assert group.sessions == []

0 commit comments

Comments
 (0)