Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit d3bd035

Browse files
authored
HTTP Replication Client (#15470)
Separate out an HTTP client for replication in preparation for also supporting using UNIX sockets. The major difference from the base class is that this does not use treq to handle HTTP requests.
1 parent ab4535b commit d3bd035

6 files changed

Lines changed: 297 additions & 3 deletions

File tree

changelog.d/15470.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Create new `Client` for use with HTTP Replication between workers. Contributed by Jason Little.

synapse/http/client.py

Lines changed: 132 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,9 @@
7474
from synapse.api.errors import Codes, HttpResponseException, SynapseError
7575
from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri
7676
from synapse.http.proxyagent import ProxyAgent
77+
from synapse.http.replicationagent import ReplicationAgent
7778
from synapse.http.types import QueryParams
78-
from synapse.logging.context import make_deferred_yieldable
79+
from synapse.logging.context import make_deferred_yieldable, run_in_background
7980
from synapse.logging.opentracing import set_tag, start_active_span, tags
8081
from synapse.types import ISynapseReactor
8182
from synapse.util import json_decoder
@@ -819,6 +820,136 @@ def __init__(
819820
)
820821

821822

823+
class ReplicationClient(BaseHttpClient):
    """Client for connecting to replication endpoints via HTTP and HTTPS.

    Requests are issued directly through `agent` rather than via treq.

    Attributes:
        agent: The custom Twisted Agent used for constructing the connection.
    """

    def __init__(
        self,
        hs: "HomeServer",
    ):
        """
        Args:
            hs: The HomeServer instance to pass in
        """
        super().__init__(hs)

        # Use a pool, but a very small one.
        pool = HTTPConnectionPool(self.reactor)
        pool.maxPersistentPerHost = 5
        pool.cachedConnectionTimeout = 2 * 60

        self.agent: IAgent = ReplicationAgent(
            hs.get_reactor(),
            contextFactory=hs.get_http_client_context_factory(),
            pool=pool,
        )

    async def request(
        self,
        method: str,
        uri: str,
        data: Optional[bytes] = None,
        headers: Optional[Headers] = None,
    ) -> IResponse:
        """
        Make a request, differs from BaseHttpClient.request in that it does not use treq.

        Args:
            method: HTTP method to use.
            uri: URI to query.
            data: Data to send in the request body, if applicable.
            headers: Request headers.

        Returns:
            Response object, once the headers have been read.

        Raises:
            RequestTimedOutError if the request times out before the headers are read

            Any other exception raised while sending the request is counted,
            logged, tagged on the active span and then re-raised unchanged.
        """
        outgoing_requests_counter.labels(method).inc()

        logger.debug("Sending request %s %s", method, uri)

        with start_active_span(
            "outgoing-replication-request",
            tags={
                tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
                tags.HTTP_METHOD: method,
                tags.HTTP_URL: uri,
            },
            finish_on_close=True,
        ):
            try:
                body_producer = None
                if data is not None:
                    body_producer = QuieterFileBodyProducer(
                        BytesIO(data),
                        cooperator=self._cooperator,
                    )

                # Skip the fancy treq stuff, we don't need cookie handling, redirects,
                # or buffered response bodies.
                method_bytes = method.encode("ascii")
                uri_bytes = uri.encode("ascii")

                # To preserve the logging context, the timeout is treated
                # in a similar way to `defer.gatherResults`:
                # * Each logging context-preserving fork is wrapped in
                #   `run_in_background`. In this case there is only one,
                #   since the timeout fork is not logging-context aware.
                # * The `Deferred` that joins the forks back together is
                #   wrapped in `make_deferred_yieldable` to restore the
                #   logging context regardless of the path taken.
                # (The logic/comments for this came from MatrixFederationHttpClient)
                request_deferred = run_in_background(
                    self.agent.request,
                    method_bytes,
                    uri_bytes,
                    headers,
                    bodyProducer=body_producer,
                )

                # we use our own timeout mechanism rather than twisted's as a workaround
                # for https://twistedmatrix.com/trac/ticket/9534.
                # (Updated url https://github.com/twisted/twisted/issues/9534)
                request_deferred = timeout_deferred(
                    request_deferred,
                    60,
                    self.hs.get_reactor(),
                )

                # turn timeouts into RequestTimedOutErrors
                request_deferred.addErrback(_timeout_to_request_timed_out_error)

                response = await make_deferred_yieldable(request_deferred)

                incoming_responses_counter.labels(method, response.code).inc()
                logger.info(
                    "Received response to %s %s: %s",
                    method,
                    uri,
                    response.code,
                )
                return response
            except Exception as e:
                # An exception constructed without arguments has an empty
                # `args` tuple; indexing it blindly would raise an IndexError
                # from inside this handler and hide the real failure from the
                # caller. Fall back to an empty reason in that case.
                error_reason = e.args[0] if e.args else ""

                incoming_responses_counter.labels(method, "ERR").inc()
                logger.info(
                    "Error sending request to %s %s: %s %s",
                    method,
                    uri,
                    type(e).__name__,
                    error_reason,
                )
                set_tag(tags.ERROR, True)
                set_tag("error_reason", error_reason)
                raise
951+
952+
822953
def _timeout_to_request_timed_out_error(f: Failure) -> Failure:
823954
if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError):
824955
# The TCP connection has its own timeout (set by the 'connectTimeout' param

synapse/http/replicationagent.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
# Copyright 2023 The Matrix.org Foundation C.I.C.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import logging
16+
from typing import Optional
17+
18+
from zope.interface import implementer
19+
20+
from twisted.internet import defer
21+
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
22+
from twisted.internet.interfaces import IStreamClientEndpoint
23+
from twisted.python.failure import Failure
24+
from twisted.web.client import URI, HTTPConnectionPool, _AgentBase
25+
from twisted.web.error import SchemeNotSupported
26+
from twisted.web.http_headers import Headers
27+
from twisted.web.iweb import (
28+
IAgent,
29+
IAgentEndpointFactory,
30+
IBodyProducer,
31+
IPolicyForHTTPS,
32+
IResponse,
33+
)
34+
35+
from synapse.types import ISynapseReactor
36+
37+
logger = logging.getLogger(__name__)
38+
39+
40+
@implementer(IAgentEndpointFactory)
class ReplicationEndpointFactory:
    """Build client endpoints for plain-HTTP and HTTPS replication URIs."""

    def __init__(
        self,
        reactor: ISynapseReactor,
        context_factory: IPolicyForHTTPS,
    ) -> None:
        """
        Args:
            reactor: The reactor handed to every endpoint this factory builds.
            context_factory: Policy used to obtain the TLS connection creator
                for `https` URIs.
        """
        self.reactor = reactor
        self.context_factory = context_factory

    def endpointForURI(self, uri: URI) -> IStreamClientEndpoint:
        """
        Choose the endpoint to connect with, based on the scheme of the URI.

        Args:
            uri: The pre-parsed URI object containing all the uri data

        Returns:
            A `HostnameEndpoint` for the URI's host and port; for `https` it
            is wrapped with `wrapClientTLS`.

        Raises:
            SchemeNotSupported: if the scheme is neither `http` nor `https`.
        """
        # Reject anything we do not know how to connect to up front.
        if uri.scheme not in (b"http", b"https"):
            raise SchemeNotSupported(f"Unsupported scheme: {uri.scheme!r}")

        tcp_endpoint = HostnameEndpoint(self.reactor, uri.host, uri.port)
        if uri.scheme != b"https":
            return tcp_endpoint

        # HTTPS: layer TLS on top of the plain TCP endpoint.
        tls_creator = self.context_factory.creatorForNetloc(uri.host, uri.port)
        return wrapClientTLS(tls_creator, tcp_endpoint)
70+
71+
72+
@implementer(IAgent)
class ReplicationAgent(_AgentBase):
    """
    Client for connecting to replication endpoints via HTTP and HTTPS.

    Much of this code is copied from Twisted's twisted.web.client.Agent.
    """

    def __init__(
        self,
        reactor: ISynapseReactor,
        contextFactory: IPolicyForHTTPS,
        connectTimeout: Optional[float] = None,
        bindAddress: Optional[bytes] = None,
        pool: Optional[HTTPConnectionPool] = None,
    ):
        """
        Create a ReplicationAgent.

        Args:
            reactor: A reactor for this Agent to place outgoing connections.
            contextFactory: A factory for TLS contexts, to control the
                verification parameters of OpenSSL. Required; it is handed
                to the ReplicationEndpointFactory.
            connectTimeout: The amount of time that this Agent will wait
                for the peer to accept a connection.
                NOTE(review): accepted for signature compatibility but not
                used anywhere in this class.
            bindAddress: The local address for client sockets to bind to.
                NOTE(review): accepted for signature compatibility but not
                used anywhere in this class.
            pool: An HTTPConnectionPool instance, or None; passed straight
                through to _AgentBase.
        """
        _AgentBase.__init__(self, reactor, pool)
        self._endpointFactory = ReplicationEndpointFactory(reactor, contextFactory)

    def request(
        self,
        method: bytes,
        uri: bytes,
        headers: Optional[Headers] = None,
        bodyProducer: Optional[IBodyProducer] = None,
    ) -> "defer.Deferred[IResponse]":
        """
        Issue a request to the server indicated by the given uri.

        An existing connection from the connection pool may be used or a new
        one may be created.

        Currently, HTTP and HTTPS schemes are supported in uri. An
        unsupported scheme is reported through an already-failed Deferred
        rather than by raising.

        This is copied from twisted.web.client.Agent, except:

        * It uses a different pool key (combining the scheme & netloc).
        * It does not call _ensureValidURI(...) since it breaks on some
          UNIX paths.

        See: twisted.web.iweb.IAgent.request
        """
        parsed_uri = URI.fromBytes(uri)
        try:
            endpoint = self._endpointFactory.endpointForURI(parsed_uri)
        except SchemeNotSupported:
            # Failure() with no arguments captures the exception currently
            # being handled, so this must stay inside the except block.
            return defer.fail(Failure())

        # The pool key is (scheme, netloc), i.e. (http(s), <host:port>).
        pool_key = (parsed_uri.scheme, parsed_uri.netloc)
        request_path = parsed_uri.originForm

        # _requestWithEndpoint comes from _AgentBase class
        return self._requestWithEndpoint(
            pool_key,
            endpoint,
            method,
            parsed_uri,
            headers,
            bodyProducer,
            request_path,
        )

synapse/replication/http/_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ def make_client(cls, hs: "HomeServer") -> Callable:
194194
the `instance_map` config).
195195
"""
196196
clock = hs.get_clock()
197-
client = hs.get_simple_http_client()
197+
client = hs.get_replication_client()
198198
local_instance_name = hs.get_instance_name()
199199

200200
# The value of these option should match the replication listener settings

synapse/server.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,11 @@
107107
from synapse.handlers.sync import SyncHandler
108108
from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler
109109
from synapse.handlers.user_directory import UserDirectoryHandler
110-
from synapse.http.client import InsecureInterceptableContextFactory, SimpleHttpClient
110+
from synapse.http.client import (
111+
InsecureInterceptableContextFactory,
112+
ReplicationClient,
113+
SimpleHttpClient,
114+
)
111115
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
112116
from synapse.media.media_repository import MediaRepository
113117
from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
@@ -471,6 +475,13 @@ def get_federation_http_client(self) -> MatrixFederationHttpClient:
471475
)
472476
return MatrixFederationHttpClient(self, tls_client_options_factory)
473477

478+
@cache_in_self
def get_replication_client(self) -> ReplicationClient:
    """
    Build the `ReplicationClient` used for HTTP replication.

    Returns:
        A `ReplicationClient` constructed with this object as its `hs`.
    """
    replication_client = ReplicationClient(self)
    return replication_client
484+
474485
@cache_in_self
475486
def get_room_creation_handler(self) -> RoomCreationHandler:
476487
return RoomCreationHandler(self)

tests/test_state.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ def setUp(self) -> None:
228228
"get_macaroon_generator",
229229
"get_instance_name",
230230
"get_simple_http_client",
231+
"get_replication_client",
231232
"hostname",
232233
]
233234
)

0 commit comments

Comments
 (0)