diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 5c0607390137..224b81513132 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -906,11 +906,11 @@ class DeviceListWorkerUpdater: def __init__(self, hs: "HomeServer"): from synapse.replication.http.devices import ( ReplicationMultiUserDevicesResyncRestServlet, - ReplicationUserDevicesResyncRestServlet, + ReplicationMultiUserDevicesResyncRestServlet, ) - self._user_device_resync_client = ( - ReplicationUserDevicesResyncRestServlet.make_client(hs) + self._multi_user_device_resync_client = ( + ReplicationMultiUserDevicesResyncRestServlet.make_client(hs) ) self._multi_user_device_resync_client = ( ReplicationMultiUserDevicesResyncRestServlet.make_client(hs) @@ -944,27 +944,10 @@ async def multi_user_device_resync( # Fall back to single requests result: Dict[str, Optional[JsonDict]] = {} for user_id in user_ids: - result[user_id] = await self._user_device_resync_client(user_id=user_id) + result[user_id] = await self._multi_user_device_resync_client(user_id=user_id) return result - async def user_device_resync( - self, user_id: str, mark_failed_as_stale: bool = True - ) -> Optional[JsonDict]: - """Fetches all devices for a user and updates the device cache with them. - - Args: - user_id: The user's id whose device_list will be updated. - mark_failed_as_stale: Whether to mark the user's device list as stale - if the attempt to resync failed. - Returns: - A dict with device info as under the "devices" in the result of this - request: - https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid - None when we weren't able to fetch the device info for some reason, - e.g. due to a connection problem. - """ - return (await self.multi_user_device_resync([user_id]))[user_id] - + class DeviceListUpdater(DeviceListWorkerUpdater): "Handles incoming device list updates from federation and updates the DB" @@ -1116,7 +1099,7 @@ async def _handle_device_updates(self, user_id: str) -> None: ) if resync: - await self.user_device_resync(user_id) + await self.multi_user_device_resync(user_id) else: # Simply update the single device, since we know that is the only # change (because of the single prev_id matching the current cache) @@ -1183,7 +1166,7 @@ async def _maybe_retry_device_resync(self) -> None: for user_id in need_resync: try: # Try to resync the current user's devices list. - result = await self.user_device_resync( + result = await self.multi_user_device_resync( user_id=user_id, mark_failed_as_stale=False, ) @@ -1245,17 +1228,6 @@ async def multi_user_device_resync( return result - async def user_device_resync( - self, user_id: str, mark_failed_as_stale: bool = True - ) -> Optional[JsonDict]: - result, failed = await self._user_device_resync_returning_failed(user_id) - - if failed and mark_failed_as_stale: - # Mark the remote user's device list as stale so we know we need to retry - # it later. - await self.store.mark_remote_users_device_caches_as_stale((user_id,)) - - return result async def _user_device_resync_returning_failed( self, user_id: str diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 00c403db4925..c249d3c2b91d 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -25,7 +25,7 @@ log_kv, set_tag, ) -from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet +from synapse.replication.http.devices import ReplicationMultiUserDevicesResyncRestServlet from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id from synapse.util import json_encoder from synapse.util.stringutils import random_string @@ -71,12 +71,12 @@ def __init__(self, hs: "HomeServer"): # sync. We do all device list resyncing on the master instance, so if # we're on a worker we hit the device resync replication API. if hs.config.worker.worker_app is None: - self._user_device_resync = ( - hs.get_device_handler().device_list_updater.user_device_resync + self._multi_user_device_resync = ( + hs.get_device_handler().device_list_updater.multi_user_device_resync ) else: - self._user_device_resync = ( - ReplicationUserDevicesResyncRestServlet.make_client(hs) + self._multi_user_device_resync = ( + ReplicationMultiUserDevicesResyncRestServlet.make_client(hs) ) # a rate limiter for room key requests. The keys are @@ -198,7 +198,7 @@ async def _check_for_unknown_devices( await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,)) # Immediately attempt a resync in the background - run_in_background(self._user_device_resync, user_id=sender_user_id) + run_in_background(self._multi_user_device_resync, user_id=sender_user_id) async def send_device_message( self, diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index e037acbca2bf..f9f17dea3e5b 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -69,7 +69,7 @@ trace, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet +from synapse.replication.http.devices import ReplicationMultiUserDevicesResyncRestServlet from synapse.replication.http.federation import ( ReplicationFederationSendEventsRestServlet, ) @@ -166,8 +166,8 @@ def __init__(self, hs: "HomeServer"): self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs) if hs.config.worker.worker_app: - self._user_device_resync = ( - ReplicationUserDevicesResyncRestServlet.make_client(hs) + self._multi_user_device_resync = ( + ReplicationMultiUserDevicesResyncRestServlet.make_client(hs) ) else: self._device_list_updater = hs.get_device_handler().device_list_updater @@ -1428,9 +1428,9 @@ async def _resync_device(self, sender: str) -> None: # Immediately attempt a resync in the background if self._config.worker.worker_app: - await self._user_device_resync(user_id=sender) + await self._multi_user_device_resync(user_id=sender) else: - await self._device_list_updater.user_device_resync(sender) + await self._device_list_updater.multi_user_device_resync(sender) except Exception: logger.exception("Failed to resync device for %s", sender) diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py index ecea6fc915c7..c9735843ea5e 100644 --- a/synapse/replication/http/devices.py +++ b/synapse/replication/http/devices.py @@ -28,131 +28,6 @@ logger = logging.getLogger(__name__) -class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint): - """Ask master to resync the device list for a user by contacting their - server. - - This must happen on master so that the results can be correctly cached in - the database and streamed to workers. - - Request format: - - POST /_synapse/replication/user_device_resync/:user_id - - {} - - Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id` - response, e.g.: - - { - "user_id": "@alice:example.org", - "devices": [ - { - "device_id": "JLAFKJWSCS", - "keys": { ... }, - "device_display_name": "Alice's Mobile Phone" - } - ] - } - """ - - NAME = "user_device_resync" - PATH_ARGS = ("user_id",) - CACHE = False - - def __init__(self, hs: "HomeServer"): - super().__init__(hs) - - from synapse.handlers.device import DeviceHandler - - handler = hs.get_device_handler() - assert isinstance(handler, DeviceHandler) - self.device_list_updater = handler.device_list_updater - - self.store = hs.get_datastores().main - self.clock = hs.get_clock() - - @staticmethod - async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override] - return {} - - async def _handle_request( # type: ignore[override] - self, request: Request, content: JsonDict, user_id: str - ) -> Tuple[int, Optional[JsonDict]]: - user_devices = await self.device_list_updater.user_device_resync(user_id) - - return 200, user_devices - - -class ReplicationMultiUserDevicesResyncRestServlet(ReplicationEndpoint): - """Ask master to resync the device list for multiple users from the same - remote server by contacting their server. - - This must happen on master so that the results can be correctly cached in - the database and streamed to workers. - - Request format: - - POST /_synapse/replication/multi_user_device_resync - - { - "user_ids": ["@alice:example.org", "@bob:example.org", ...] - } - - Response is roughly equivalent to ` /_matrix/federation/v1/user/devices/:user_id` - response, but there is a map from user ID to response, e.g.: - - { - "@alice:example.org": { - "devices": [ - { - "device_id": "JLAFKJWSCS", - "keys": { ... }, - "device_display_name": "Alice's Mobile Phone" - } - ] - }, - ... - } - """ - - NAME = "multi_user_device_resync" - PATH_ARGS = () - CACHE = False - - def __init__(self, hs: "HomeServer"): - super().__init__(hs) - - from synapse.handlers.device import DeviceHandler - - handler = hs.get_device_handler() - assert isinstance(handler, DeviceHandler) - self.device_list_updater = handler.device_list_updater - - self.store = hs.get_datastores().main - self.clock = hs.get_clock() - - @staticmethod - async def _serialize_payload(user_ids: List[str]) -> JsonDict: # type: ignore[override] - return {"user_ids": user_ids} - - async def _handle_request( # type: ignore[override] - self, request: Request, content: JsonDict - ) -> Tuple[int, Dict[str, Optional[JsonDict]]]: - user_ids: List[str] = content["user_ids"] - - logger.info("Resync for %r", user_ids) - span = active_span() - if span: - span.set_tag("user_ids", f"{user_ids!r}") - - multi_user_devices = await self.device_list_updater.multi_user_device_resync( - user_ids - ) - - return 200, multi_user_devices - - class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint): """Ask master to upload keys for the user and send them out over federation to update other servers. @@ -217,6 +92,5 @@ async def _handle_request( # type: ignore[override] def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: - ReplicationUserDevicesResyncRestServlet(hs).register(http_server) ReplicationMultiUserDevicesResyncRestServlet(hs).register(http_server) ReplicationUploadKeysForUserRestServlet(hs).register(http_server) diff --git a/tests/test_federation.py b/tests/test_federation.py index 80e5c590d836..d970a180ad09 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -254,7 +254,7 @@ def test_cross_signing_keys_retry(self): # Resync the device list. device_handler = self.homeserver.get_device_handler() self.get_success( - device_handler.device_list_updater.user_device_resync(remote_user_id), + device_handler.device_list_updater.multi_user_device_resync(remote_user_id), ) # Retrieve the cross-signing keys for this user.