1616import java .util .function .Consumer ;
1717import java .util .function .Function ;
1818
19- import org .slf4j .Logger ;
20- import org .slf4j .LoggerFactory ;
2119import io .modelcontextprotocol .client .transport .ResponseSubscribers .ResponseEvent ;
2220import io .modelcontextprotocol .client .transport .customizer .McpAsyncHttpClientRequestCustomizer ;
2321import io .modelcontextprotocol .client .transport .customizer .McpSyncHttpClientRequestCustomizer ;
3331import io .modelcontextprotocol .spec .ProtocolVersions ;
3432import io .modelcontextprotocol .util .Assert ;
3533import io .modelcontextprotocol .util .Utils ;
34+ import org .slf4j .Logger ;
35+ import org .slf4j .LoggerFactory ;
3636import reactor .core .Disposable ;
3737import reactor .core .publisher .Flux ;
3838import reactor .core .publisher .Mono ;
@@ -117,6 +117,11 @@ public class HttpClientSseClientTransport implements McpClientTransport {
117117 */
118118 private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer ;
119119
120+ /**
121+ * Validator for the message endpoint;
122+ */
123+ private final SseMessageEndpointValidator messageEndpointValidator ;
124+
120125 /**
121126 * Creates a new transport instance with custom HTTP client builder, object mapper,
122127 * and headers.
@@ -127,22 +132,26 @@ public class HttpClientSseClientTransport implements McpClientTransport {
127132 * @param jsonMapper the object mapper for JSON serialization/deserialization
128133 * @param httpRequestCustomizer customizer for the requestBuilder before executing
129134 * requests
135+ * @param messageEndpointValidator validator for the message endpoint
130136 * @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
131137 */
132138 HttpClientSseClientTransport (HttpClient httpClient , HttpRequest .Builder requestBuilder , String baseUri ,
133- String sseEndpoint , McpJsonMapper jsonMapper , McpAsyncHttpClientRequestCustomizer httpRequestCustomizer ) {
139+ String sseEndpoint , McpJsonMapper jsonMapper , McpAsyncHttpClientRequestCustomizer httpRequestCustomizer ,
140+ SseMessageEndpointValidator messageEndpointValidator ) {
134141 Assert .notNull (jsonMapper , "jsonMapper must not be null" );
135142 Assert .hasText (baseUri , "baseUri must not be empty" );
136143 Assert .hasText (sseEndpoint , "sseEndpoint must not be empty" );
137144 Assert .notNull (httpClient , "httpClient must not be null" );
138145 Assert .notNull (requestBuilder , "requestBuilder must not be null" );
139146 Assert .notNull (httpRequestCustomizer , "httpRequestCustomizer must not be null" );
147+ Assert .notNull (messageEndpointValidator , "messageEndpointValidator must not be null" );
140148 this .baseUri = URI .create (baseUri );
141149 this .sseEndpoint = sseEndpoint ;
142150 this .jsonMapper = jsonMapper ;
143151 this .httpClient = httpClient ;
144152 this .requestBuilder = requestBuilder ;
145153 this .httpRequestCustomizer = httpRequestCustomizer ;
154+ this .messageEndpointValidator = messageEndpointValidator ;
146155 }
147156
148157 @ Override
@@ -178,6 +187,8 @@ public static class Builder {
178187
179188 private Duration connectTimeout = Duration .ofSeconds (10 );
180189
190+ private SseMessageEndpointValidator messageEndpointValidator = new DefaultSseMessageEndpointValidator ();
191+
181192 /**
182193 * Creates a new builder instance.
183194 */
@@ -297,14 +308,27 @@ public Builder connectTimeout(Duration connectTimeout) {
297308 return this ;
298309 }
299310
311+ /**
312+ * Sets the validator that ensure the message endpoint returned over the SSE
313+ * connection is valid.
314+ * @param messageEndpointValidator the validator
315+ * @return this builder
316+ */
317+ public Builder messageEndpointValidator (SseMessageEndpointValidator messageEndpointValidator ) {
318+ Assert .notNull (messageEndpointValidator , "messageEndpointValidator must not be null" );
319+ this .messageEndpointValidator = messageEndpointValidator ;
320+ return this ;
321+ }
322+
300323 /**
301324 * Builds a new {@link HttpClientSseClientTransport} instance.
302325 * @return a new transport instance
303326 */
304327 public HttpClientSseClientTransport build () {
305328 HttpClient httpClient = this .clientBuilder .connectTimeout (this .connectTimeout ).build ();
306329 return new HttpClientSseClientTransport (httpClient , requestBuilder , baseUri , sseEndpoint ,
307- jsonMapper == null ? McpJsonDefaults .getMapper () : jsonMapper , httpRequestCustomizer );
330+ jsonMapper == null ? McpJsonDefaults .getMapper () : jsonMapper , httpRequestCustomizer ,
331+ messageEndpointValidator );
308332 }
309333
310334 }
@@ -342,6 +366,14 @@ public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> h
342366 try {
343367 if (ENDPOINT_EVENT_TYPE .equals (responseEvent .sseEvent ().event ())) {
344368 String messageEndpointUri = responseEvent .sseEvent ().data ();
369+ try {
370+ messageEndpointValidator .validate (uri , messageEndpointUri );
371+ }
372+ catch (InvalidSseMessageEndpointException e ) {
373+ sink .error (e );
374+ this .messageEndpointSink .tryEmitError (e );
375+ return Flux .error (e );
376+ }
345377 if (this .messageEndpointSink .tryEmitValue (messageEndpointUri ).isSuccess ()) {
346378 sink .success ();
347379 return Flux .empty (); // No further processing needed
0 commit comments