Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Batch up replication requests to request the resyncing of remote user…
Browse files Browse the repository at this point in the history
…s's devices. (#14716)
  • Loading branch information
reivilibre authored Jan 10, 2023
1 parent 3479599 commit ba4ea7d
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 79 deletions.
1 change: 1 addition & 0 deletions changelog.d/14716.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Batch up replication requests to request the resyncing of remote users's devices.
124 changes: 98 additions & 26 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from http import HTTPStatus
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -33,6 +34,7 @@
Codes,
FederationDeniedError,
HttpResponseException,
InvalidAPICallError,
RequestSendFailed,
SynapseError,
)
Expand All @@ -45,6 +47,7 @@
JsonDict,
StreamKeyType,
StreamToken,
UserID,
get_domain_from_id,
get_verify_key_from_cross_signing_key,
)
Expand Down Expand Up @@ -893,12 +896,47 @@ class DeviceListWorkerUpdater:

def __init__(self, hs: "HomeServer"):
from synapse.replication.http.devices import (
ReplicationMultiUserDevicesResyncRestServlet,
ReplicationUserDevicesResyncRestServlet,
)

self._user_device_resync_client = (
ReplicationUserDevicesResyncRestServlet.make_client(hs)
)
self._multi_user_device_resync_client = (
ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
)

async def multi_user_device_resync(
self, user_ids: List[str], mark_failed_as_stale: bool = True
) -> Dict[str, Optional[JsonDict]]:
"""
Like `user_device_resync` but operates on multiple users **from the same origin**
at once.
Returns:
Dict from User ID to the same Dict as `user_device_resync`.
"""
# mark_failed_as_stale is not sent. Ensure this doesn't break expectations.
assert mark_failed_as_stale

if not user_ids:
# Shortcut empty requests
return {}

try:
return await self._multi_user_device_resync_client(user_ids=user_ids)
except SynapseError as err:
if not (
err.code == HTTPStatus.NOT_FOUND and err.errcode == Codes.UNRECOGNIZED
):
raise

# Fall back to single requests
result: Dict[str, Optional[JsonDict]] = {}
for user_id in user_ids:
result[user_id] = await self._user_device_resync_client(user_id=user_id)
return result

async def user_device_resync(
self, user_id: str, mark_failed_as_stale: bool = True
Expand All @@ -913,8 +951,10 @@ async def user_device_resync(
A dict with device info as under the "devices" in the result of this
request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
None when we weren't able to fetch the device info for some reason,
e.g. due to a connection problem.
"""
return await self._user_device_resync_client(user_id=user_id)
return (await self.multi_user_device_resync([user_id]))[user_id]


class DeviceListUpdater(DeviceListWorkerUpdater):
Expand Down Expand Up @@ -1160,19 +1200,66 @@ async def _maybe_retry_device_resync(self) -> None:
# Allow future calls to retry resyncinc out of sync device lists.
self._resync_retry_in_progress = False

async def multi_user_device_resync(
self, user_ids: List[str], mark_failed_as_stale: bool = True
) -> Dict[str, Optional[JsonDict]]:
"""
Like `user_device_resync` but operates on multiple users **from the same origin**
at once.
Returns:
Dict from User ID to the same Dict as `user_device_resync`.
"""
if not user_ids:
return {}

origins = {UserID.from_string(user_id).domain for user_id in user_ids}

if len(origins) != 1:
raise InvalidAPICallError(f"Only one origin permitted, got {origins!r}")

result = {}
failed = set()
# TODO(Perf): Actually batch these up
for user_id in user_ids:
user_result, user_failed = await self._user_device_resync_returning_failed(
user_id
)
result[user_id] = user_result
if user_failed:
failed.add(user_id)

if mark_failed_as_stale:
await self.store.mark_remote_users_device_caches_as_stale(failed)

return result

async def user_device_resync(
self, user_id: str, mark_failed_as_stale: bool = True
) -> Optional[JsonDict]:
result, failed = await self._user_device_resync_returning_failed(user_id)

if failed and mark_failed_as_stale:
# Mark the remote user's device list as stale so we know we need to retry
# it later.
await self.store.mark_remote_users_device_caches_as_stale((user_id,))

return result

async def _user_device_resync_returning_failed(
self, user_id: str
) -> Tuple[Optional[JsonDict], bool]:
"""Fetches all devices for a user and updates the device cache with them.
Args:
user_id: The user's id whose device_list will be updated.
mark_failed_as_stale: Whether to mark the user's device list as stale
if the attempt to resync failed.
Returns:
A dict with device info as under the "devices" in the result of this
request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
- A dict with device info as under the "devices" in the result of this
request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
None when we weren't able to fetch the device info for some reason,
e.g. due to a connection problem.
- True iff the resync failed and the device list should be marked as stale.
"""
logger.debug("Attempting to resync the device list for %s", user_id)
log_kv({"message": "Doing resync to update device list."})
Expand All @@ -1181,49 +1268,34 @@ async def user_device_resync(
try:
result = await self.federation.query_user_devices(origin, user_id)
except NotRetryingDestination:
if mark_failed_as_stale:
# Mark the remote user's device list as stale so we know we need to retry
# it later.
await self.store.mark_remote_user_device_cache_as_stale(user_id)

return None
return None, True
except (RequestSendFailed, HttpResponseException) as e:
logger.warning(
"Failed to handle device list update for %s: %s",
user_id,
e,
)

if mark_failed_as_stale:
# Mark the remote user's device list as stale so we know we need to retry
# it later.
await self.store.mark_remote_user_device_cache_as_stale(user_id)

# We abort on exceptions rather than accepting the update
# as otherwise synapse will 'forget' that its device list
# is out of date. If we bail then we will retry the resync
# next time we get a device list update for this user_id.
# This makes it more likely that the device lists will
# eventually become consistent.
return None
return None, True
except FederationDeniedError as e:
set_tag("error", True)
log_kv({"reason": "FederationDeniedError"})
logger.info(e)
return None
return None, False
except Exception as e:
set_tag("error", True)
log_kv(
{"message": "Exception raised by federation request", "exception": e}
)
logger.exception("Failed to handle device list update for %s", user_id)

if mark_failed_as_stale:
# Mark the remote user's device list as stale so we know we need to retry
# it later.
await self.store.mark_remote_user_device_cache_as_stale(user_id)

return None
return None, True
log_kv({"result": result})
stream_id = result["stream_id"]
devices = result["devices"]
Expand Down Expand Up @@ -1305,7 +1377,7 @@ async def user_device_resync(
# point.
self._seen_updates[user_id] = {stream_id}

return result
return result, False

async def process_cross_signing_key_update(
self,
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ async def _check_for_unknown_devices(
sender_user_id,
unknown_devices,
)
await self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,))

# Immediately attempt a resync in the background
run_in_background(self._user_device_resync, user_id=sender_user_id)
Expand Down
93 changes: 55 additions & 38 deletions synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
get_domain_from_id,
get_verify_key_from_cross_signing_key,
)
from synapse.util import json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer, delay_cancellation
from synapse.util import json_decoder
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.cancellation import cancellable
from synapse.util.retryutils import NotRetryingDestination

Expand Down Expand Up @@ -238,24 +238,28 @@ async def query_devices(
# Now fetch any devices that we don't have in our cache
# TODO It might make sense to propagate cancellations into the
# deferreds which are querying remote homeservers.
await make_deferred_yieldable(
delay_cancellation(
defer.gatherResults(
[
run_in_background(
self._query_devices_for_destination,
results,
cross_signing_keys,
failures,
destination,
queries,
timeout,
)
for destination, queries in remote_queries_not_in_cache.items()
],
consumeErrors=True,
).addErrback(unwrapFirstError)
logger.debug(
"%d destinations to query devices for", len(remote_queries_not_in_cache)
)

async def _query(
destination_queries: Tuple[str, Dict[str, Iterable[str]]]
) -> None:
destination, queries = destination_queries
return await self._query_devices_for_destination(
results,
cross_signing_keys,
failures,
destination,
queries,
timeout,
)

await concurrently_execute(
_query,
remote_queries_not_in_cache.items(),
10,
delay_cancellation=True,
)

ret = {"device_keys": results, "failures": failures}
Expand Down Expand Up @@ -300,28 +304,41 @@ async def _query_devices_for_destination(
# queries. We use the more efficient batched query_client_keys for all
# remaining users
user_ids_updated = []
for (user_id, device_list) in destination_query.items():
if user_id in user_ids_updated:
continue

if device_list:
continue
# Perform a user device resync for each user only once and only as long as:
# - they have an empty device_list
# - they are in some rooms that this server can see
users_to_resync_devices = {
user_id
for (user_id, device_list) in destination_query.items()
if (not device_list) and (await self.store.get_rooms_for_user(user_id))
}

room_ids = await self.store.get_rooms_for_user(user_id)
if not room_ids:
continue
logger.debug(
"%d users to resync devices for from destination %s",
len(users_to_resync_devices),
destination,
)

# We've decided we're sharing a room with this user and should
# probably be tracking their device lists. However, we haven't
# done an initial sync on the device list so we do it now.
try:
resync_results = (
await self.device_handler.device_list_updater.user_device_resync(
user_id
)
try:
user_resync_results = (
await self.device_handler.device_list_updater.multi_user_device_resync(
list(users_to_resync_devices)
)
)
for user_id in users_to_resync_devices:
resync_results = user_resync_results[user_id]

if resync_results is None:
raise ValueError("Device resync failed")
# TODO: It's weird that we'll store a failure against a
# destination, yet continue processing users from that
# destination.
# We might want to consider changing this, but for now
# I'm leaving it as I found it.
failures[destination] = _exception_to_failure(
ValueError(f"Device resync failed for {user_id!r}")
)
continue

# Add the device keys to the results.
user_devices = resync_results["devices"]
Expand All @@ -339,8 +356,8 @@ async def _query_devices_for_destination(

if self_signing_key:
cross_signing_keys["self_signing_keys"][user_id] = self_signing_key
except Exception as e:
failures[destination] = _exception_to_failure(e)
except Exception as e:
failures[destination] = _exception_to_failure(e)

if len(destination_query) == len(user_ids_updated):
# We've updated all the users in the query and we do not need to
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -1423,7 +1423,7 @@ async def _resync_device(self, sender: str) -> None:
"""

try:
await self._store.mark_remote_user_device_cache_as_stale(sender)
await self._store.mark_remote_users_device_caches_as_stale((sender,))

# Immediately attempt a resync in the background
if self._config.worker.worker_app:
Expand Down
Loading

0 comments on commit ba4ea7d

Please sign in to comment.