1717from typing import (
1818 TYPE_CHECKING ,
1919 Any ,
20+ Awaitable ,
21+ Callable ,
2022 Dict ,
2123 Iterable ,
2224 List ,
2325 Mapping ,
2426 Optional ,
2527 Sequence ,
2628 Tuple ,
29+ TypeVar ,
2730)
2831
2932from prometheus_client import Counter
30- from typing_extensions import TypeGuard
33+ from typing_extensions import Concatenate , ParamSpec , TypeGuard
3134
3235from synapse .api .constants import EventTypes , Membership , ThirdPartyEntityKind
3336from synapse .api .errors import CodeMessageException , HttpResponseException
7881HOUR_IN_MS = 60 * 60 * 1000
7982
8083
81- APP_SERVICE_PREFIX = "/_matrix/app/unstable"
84+ APP_SERVICE_PREFIX = "/_matrix/app/v1"
85+ APP_SERVICE_UNSTABLE_PREFIX = "/_matrix/app/unstable"
86+
87+ P = ParamSpec ("P" )
88+ R = TypeVar ("R" )
8289
8390
8491def _is_valid_3pe_metadata (info : JsonDict ) -> bool :
@@ -121,17 +128,60 @@ def __init__(self, hs: "HomeServer"):
121128 hs .get_clock (), "as_protocol_meta" , timeout_ms = HOUR_IN_MS
122129 )
123130
131+ async def _send_with_fallbacks (
132+ self ,
133+ service : "ApplicationService" ,
134+ prefixes : List [str ],
135+ path : str ,
136+ func : Callable [Concatenate [str , P ], Awaitable [R ]],
137+ * args : P .args ,
138+ ** kwargs : P .kwargs ,
139+ ) -> R :
140+ """
141+ Attempt to call an application service with multiple paths, falling back
142+ until one succeeds.
143+
144+ Args:
145+ service: The appliacation service, this provides the base URL.
146+ prefixes: A last of paths to try in order for the requests.
147+ path: A suffix to append to each prefix.
148+ func: The function to call, the first argument will be the full
149+ endpoint to fetch. Other arguments are provided by args/kwargs.
150+
151+ Returns:
152+ The return value of func.
153+ """
154+ for i , prefix in enumerate (prefixes , start = 1 ):
155+ uri = f"{ service .url } { prefix } { path } "
156+ try :
157+ return await func (uri , * args , ** kwargs )
158+ except HttpResponseException as e :
159+ # If an error is received that is due to an unrecognised path,
160+ # fallback to next path (if one exists). Otherwise, consider it
161+ # a legitimate error and raise.
162+ if i < len (prefixes ) and is_unknown_endpoint (e ):
163+ continue
164+ raise
165+ except Exception :
166+ # Unexpected exceptions get sent to the caller.
167+ raise
168+
169+ # The function should always exit via the return or raise above this.
170+ raise RuntimeError ("Unexpected fallback behaviour. This should never be seen." )
171+
124172 async def query_user (self , service : "ApplicationService" , user_id : str ) -> bool :
125173 if service .url is None :
126174 return False
127175
128176 # This is required by the configuration.
129177 assert service .hs_token is not None
130178
131- uri = service .url + ("/users/%s" % urllib .parse .quote (user_id ))
132179 try :
133- response = await self .get_json (
134- uri ,
180+ response = await self ._send_with_fallbacks (
181+ service ,
182+ [APP_SERVICE_PREFIX , "" ],
183+ f"/users/{ urllib .parse .quote (user_id )} " ,
184+ self .get_json ,
135185 {"access_token" : service .hs_token },
136186 headers = {"Authorization" : [f"Bearer { service .hs_token } " ]},
137187 )
@@ -140,9 +190,9 @@ async def query_user(self, service: "ApplicationService", user_id: str) -> bool:
140190 except CodeMessageException as e :
141191 if e .code == 404 :
142192 return False
143- logger .warning ("query_user to %s received %s" , uri , e .code )
193+ logger .warning ("query_user to %s received %s" , service . url , e .code )
144194 except Exception as ex :
145- logger .warning ("query_user to %s threw exception %s" , uri , ex )
195+ logger .warning ("query_user to %s threw exception %s" , service . url , ex )
146196 return False
147197
148198 async def query_alias (self , service : "ApplicationService" , alias : str ) -> bool :
@@ -152,21 +202,23 @@ async def query_alias(self, service: "ApplicationService", alias: str) -> bool:
152202 # This is required by the configuration.
153203 assert service .hs_token is not None
154204
155- uri = service .url + ("/rooms/%s" % urllib .parse .quote (alias ))
156205 try :
157- response = await self .get_json (
158- uri ,
206+ response = await self ._send_with_fallbacks (
207+ service ,
208+ [APP_SERVICE_PREFIX , "" ],
209+ f"/rooms/{ urllib .parse .quote (alias )} " ,
210+ self .get_json ,
159211 {"access_token" : service .hs_token },
160212 headers = {"Authorization" : [f"Bearer { service .hs_token } " ]},
161213 )
162214 if response is not None : # just an empty json object
163215 return True
164216 except CodeMessageException as e :
165- logger .warning ("query_alias to %s received %s" , uri , e .code )
217+ logger .warning ("query_alias to %s received %s" , service . url , e .code )
166218 if e .code == 404 :
167219 return False
168220 except Exception as ex :
169- logger .warning ("query_alias to %s threw exception %s" , uri , ex )
221+ logger .warning ("query_alias to %s threw exception %s" , service . url , ex )
170222 return False
171223
172224 async def query_3pe (
@@ -188,25 +240,24 @@ async def query_3pe(
188240 # This is required by the configuration.
189241 assert service .hs_token is not None
190242
191- uri = "%s%s/thirdparty/%s/%s" % (
192- service .url ,
193- APP_SERVICE_PREFIX ,
194- kind ,
195- urllib .parse .quote (protocol ),
196- )
197243 try :
198244 args : Mapping [Any , Any ] = {
199245 ** fields ,
200246 b"access_token" : service .hs_token ,
201247 }
202- response = await self .get_json (
203- uri ,
248+ response = await self ._send_with_fallbacks (
249+ service ,
250+ [APP_SERVICE_PREFIX , APP_SERVICE_UNSTABLE_PREFIX ],
251+ f"/thirdparty/{ kind } /{ urllib .parse .quote (protocol )} " ,
252+ self .get_json ,
204253 args = args ,
205254 headers = {"Authorization" : [f"Bearer { service .hs_token } " ]},
206255 )
207256 if not isinstance (response , list ):
208257 logger .warning (
209- "query_3pe to %s returned an invalid response %r" , uri , response
258+ "query_3pe to %s returned an invalid response %r" ,
259+ service .url ,
260+ response ,
210261 )
211262 return []
212263
@@ -216,12 +267,12 @@ async def query_3pe(
216267 ret .append (r )
217268 else :
218269 logger .warning (
219- "query_3pe to %s returned an invalid result %r" , uri , r
270+ "query_3pe to %s returned an invalid result %r" , service . url , r
220271 )
221272
222273 return ret
223274 except Exception as ex :
224- logger .warning ("query_3pe to %s threw exception %s" , uri , ex )
275+ logger .warning ("query_3pe to %s threw exception %s" , service . url , ex )
225276 return []
226277
227278 async def get_3pe_protocol (
@@ -233,21 +284,20 @@ async def get_3pe_protocol(
233284 async def _get () -> Optional [JsonDict ]:
234285 # This is required by the configuration.
235286 assert service .hs_token is not None
236- uri = "%s%s/thirdparty/protocol/%s" % (
237- service .url ,
238- APP_SERVICE_PREFIX ,
239- urllib .parse .quote (protocol ),
240- )
241287 try :
242- info = await self .get_json (
243- uri ,
288+ info = await self ._send_with_fallbacks (
289+ service ,
290+ [APP_SERVICE_PREFIX , APP_SERVICE_UNSTABLE_PREFIX ],
291+ f"/thirdparty/protocol/{ urllib .parse .quote (protocol )} " ,
292+ self .get_json ,
244293 {"access_token" : service .hs_token },
245294 headers = {"Authorization" : [f"Bearer { service .hs_token } " ]},
246295 )
247296
248297 if not _is_valid_3pe_metadata (info ):
249298 logger .warning (
250- "query_3pe_protocol to %s did not return a valid result" , uri
299+ "query_3pe_protocol to %s did not return a valid result" ,
300+ service .url ,
251301 )
252302 return None
253303
@@ -260,7 +310,9 @@ async def _get() -> Optional[JsonDict]:
260310
261311 return info
262312 except Exception as ex :
263- logger .warning ("query_3pe_protocol to %s threw exception %s" , uri , ex )
313+ logger .warning (
314+ "query_3pe_protocol to %s threw exception %s" , service .url , ex
315+ )
264316 return None
265317
266318 key = (service .id , protocol )
@@ -274,7 +326,7 @@ async def ping(self, service: "ApplicationService", txn_id: Optional[str]) -> No
274326 assert service .hs_token is not None
275327
276328 await self .post_json_get_json (
277- uri = service .url + "/_matrix/app/unstable /fi.mau.msc2659/ping" ,
329+ uri = f" { service .url } { APP_SERVICE_UNSTABLE_PREFIX } /fi.mau.msc2659/ping" ,
278330 post_json = {"transaction_id" : txn_id },
279331 headers = {"Authorization" : [f"Bearer { service .hs_token } " ]},
280332 )
@@ -318,8 +370,6 @@ async def push_bulk(
318370 )
319371 txn_id = 0
320372
321- uri = service .url + ("/transactions/%s" % urllib .parse .quote (str (txn_id )))
322-
323373 # Never send ephemeral events to appservices that do not support it
324374 body : JsonDict = {"events" : serialized_events }
325375 if service .supports_ephemeral :
@@ -351,16 +401,19 @@ async def push_bulk(
351401 }
352402
353403 try :
354- await self .put_json (
355- uri = uri ,
404+ await self ._send_with_fallbacks (
405+ service ,
406+ [APP_SERVICE_PREFIX , "" ],
407+ f"/transactions/{ urllib .parse .quote (str (txn_id ))} " ,
408+ self .put_json ,
356409 json_body = body ,
357410 args = {"access_token" : service .hs_token },
358411 headers = {"Authorization" : [f"Bearer { service .hs_token } " ]},
359412 )
360413 if logger .isEnabledFor (logging .DEBUG ):
361414 logger .debug (
362415 "push_bulk to %s succeeded! events=%s" ,
363- uri ,
416+ service . url ,
364417 [event .get ("event_id" ) for event in events ],
365418 )
366419 sent_transactions_counter .labels (service .id ).inc ()
@@ -371,15 +424,15 @@ async def push_bulk(
371424 except CodeMessageException as e :
372425 logger .warning (
373426 "push_bulk to %s received code=%s msg=%s" ,
374- uri ,
427+ service . url ,
375428 e .code ,
376429 e .msg ,
377430 exc_info = logger .isEnabledFor (logging .DEBUG ),
378431 )
379432 except Exception as ex :
380433 logger .warning (
381434 "push_bulk to %s threw exception(%s) %s args=%s" ,
382- uri ,
435+ service . url ,
383436 type (ex ).__name__ ,
384437 ex ,
385438 ex .args ,
0 commit comments