Skip to content

Commit 009c16d

Browse files
author
Gorre Surya
committed
fix: use negotiated protocol version in GET SSE reconnect after initialize
When the server negotiates down to an older protocol version during the initialize handshake, the subsequent GET request to open the SSE stream was still sending the client's latest supported version in the MCP-Protocol-Version header. This caused servers that strictly validate the header (e.g. rmcp) to reject the GET with 400 Bad Request. Root cause: the GET reconnect is triggered inside sendMessage() via reconnect(null).contextWrite(deliveredSink.contextView()) at the point where markInitialized() returns true. At that moment the Reactor context has not yet been populated with NEGOTIATED_PROTOCOL_VERSION by LifecycleInitializer (which runs after sendMessage() completes), so reconnect() falls back to latestSupportedProtocolVersion. Fix: add a negotiatedProtocolVersion AtomicReference to the transport. When markInitialized() returns true, extract the protocolVersion from the initialize response body (available in the AggregateResponseEvent or SseResponseEvent) and store it. The reconnect() method then uses this stored value as a fallback when the Reactor context is not yet populated, which covers the initial GET reconnect. Subsequent reconnects continue to read from the context as before. The existing usesServerSupportedVersion integration test, which had a FIXME acknowledging the bug, now also verifies the GET request uses the negotiated version. Fixes #883
1 parent c00b87d commit 009c16d

2 files changed

Lines changed: 52 additions & 7 deletions

File tree

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.Collections;
1515
import java.util.Comparator;
1616
import java.util.List;
17+
import java.util.Map;
1718
import java.util.Optional;
1819
import java.util.concurrent.CompletionException;
1920
import java.util.concurrent.atomic.AtomicReference;
@@ -136,6 +137,14 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
136137

137138
private final String latestSupportedProtocolVersion;
138139

140+
/**
141+
* Stores the protocol version negotiated during the initialize handshake so that the
142+
* GET SSE reconnect triggered by {@link #sendMessage} can use the correct version
143+
* immediately, before the Reactor context is populated by
144+
* {@code LifecycleInitializer}.
145+
*/
146+
private final AtomicReference<String> negotiatedProtocolVersion = new AtomicReference<>();
147+
139148
private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient,
140149
HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams,
141150
boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
@@ -277,7 +286,8 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
277286
.header("Cache-Control", "no-cache")
278287
.header(HttpHeaders.PROTOCOL_VERSION,
279288
connectionCtx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
280-
this.latestSupportedProtocolVersion))
289+
Optional.ofNullable(this.negotiatedProtocolVersion.get())
290+
.orElse(this.latestSupportedProtocolVersion)))
281291
.GET();
282292
var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
283293
return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
@@ -450,6 +460,39 @@ else if (contentType.contains(APPLICATION_JSON)) {
450460

451461
}
452462

463+
/**
464+
* Attempts to parse a {@code protocolVersion} from the initialize response body. This
465+
* is needed because the Reactor context is not yet populated by
466+
* {@code LifecycleInitializer} at the time the first GET reconnect is triggered.
467+
*/
468+
@SuppressWarnings("unchecked")
469+
private Optional<String> extractProtocolVersion(ResponseSubscribers.ResponseEvent responseEvent) {
470+
String data = null;
471+
if (responseEvent instanceof ResponseSubscribers.AggregateResponseEvent agg) {
472+
data = agg.data();
473+
}
474+
else if (responseEvent instanceof ResponseSubscribers.SseResponseEvent sse) {
475+
data = sse.sseEvent().data();
476+
}
477+
if (data == null || data.isBlank()) {
478+
return Optional.empty();
479+
}
480+
try {
481+
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.jsonMapper, data);
482+
if (message instanceof McpSchema.JSONRPCResponse response
483+
&& response.result() instanceof Map<?, ?> result) {
484+
Object version = result.get("protocolVersion");
485+
if (version instanceof String v && !v.isBlank()) {
486+
return Optional.of(v);
487+
}
488+
}
489+
}
490+
catch (Exception ignored) {
491+
// Best-effort; the context-based fallback in reconnect() still applies.
492+
}
493+
return Optional.empty();
494+
}
495+
453496
public String toString(McpSchema.JSONRPCMessage message) {
454497
try {
455498
return this.jsonMapper.writeValueAsString(message);
@@ -514,7 +557,11 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
514557
responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null))) {
515558
// Once we have a session, we try to open an async stream for
516559
// the server to send notifications and requests out-of-band.
517-
560+
// Extract the negotiated protocol version from the initialize
561+
// response body before triggering the GET reconnect, since the
562+
// Reactor context is not yet populated by LifecycleInitializer
563+
// at this point in the reactive chain.
564+
extractProtocolVersion(responseEvent).ifPresent(this.negotiatedProtocolVersion::set);
518565
reconnect(null).contextWrite(deliveredSink.contextView()).subscribe();
519566
}
520567

mcp-test/src/test/java/io/modelcontextprotocol/common/HttpClientStreamableHttpVersionNegotiationIntegrationTests.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,9 @@ void usesServerSupportedVersion() {
103103
McpSchema.CallToolResult response = client.callTool(new McpSchema.CallToolRequest("test-tool", Map.of()));
104104

105105
var calls = requestRecordingFilter.getCalls();
106-
// Initialize tells the server the Client's latest supported version
107-
// FIXME: Set the correct protocol version on GET /mcp
108-
assertThat(calls).filteredOn(c -> c.method().equals("POST") && !c.body().contains("\"method\":\"initialize\""))
109-
// POST notification/initialized ; POST tools/call
110-
.hasSize(2)
106+
assertThat(calls).filteredOn(c -> !c.body().contains("\"method\":\"initialize\""))
107+
// GET /mcp ; POST notification/initialized ; POST tools/call
108+
.hasSize(3)
111109
.map(McpTestRequestRecordingServletFilter.Call::headers)
112110
.allSatisfy(headers -> assertThat(headers).containsEntry("mcp-protocol-version",
113111
ProtocolVersions.MCP_2025_11_25));

0 commit comments

Comments
 (0)