This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Batch up replication requests to request the resyncing of remote users' devices. #14716

Merged: 15 commits on Jan 10, 2023
64 changes: 37 additions & 27 deletions synapse/handlers/e2e_keys.py
@@ -36,8 +36,8 @@
    get_domain_from_id,
    get_verify_key_from_cross_signing_key,
)
-from synapse.util import json_decoder, unwrapFirstError
-from synapse.util.async_helpers import Linearizer, delay_cancellation
+from synapse.util import json_decoder
+from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.cancellation import cancellable
from synapse.util.retryutils import NotRetryingDestination

@@ -238,24 +238,28 @@ async def query_devices(
        # Now fetch any devices that we don't have in our cache
        # TODO It might make sense to propagate cancellations into the
        #      deferreds which are querying remote homeservers.
-        await make_deferred_yieldable(
-            delay_cancellation(
-                defer.gatherResults(
-                    [
-                        run_in_background(
-                            self._query_devices_for_destination,
-                            results,
-                            cross_signing_keys,
-                            failures,
-                            destination,
-                            queries,
-                            timeout,
-                        )
-                        for destination, queries in remote_queries_not_in_cache.items()
-                    ],
-                    consumeErrors=True,
-                ).addErrback(unwrapFirstError)
-            )
-        )
+        logger.debug(
+            "%d destinations to query devices for", len(remote_queries_not_in_cache)
+        )
+
+        async def _query(
+            destination_queries: Tuple[str, Dict[str, Iterable[str]]]
+        ) -> None:
+            destination, queries = destination_queries
+            return await self._query_devices_for_destination(
+                results,
+                cross_signing_keys,
+                failures,
+                destination,
+                queries,
+                timeout,
+            )
+
+        await concurrently_execute(
+            _query,
+            remote_queries_not_in_cache.items(),
+            10,
+            delay_cancellation=True,
+        )

        ret = {"device_keys": results, "failures": failures}
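
The shape of this change: the old code spawned one unbounded background task per destination via `defer.gatherResults`, while the new code funnels the same work through `concurrently_execute`, which caps in-flight queries at 10. Note that `concurrently_execute` passes each item as a single argument, which is why `_query` accepts a `(destination, queries)` tuple and unpacks it itself. A rough asyncio sketch of the before/after shape (illustrative names only; Synapse itself is Deferred-based):

```python
import asyncio
from typing import Dict, Iterable, Tuple

async def query_destination(destination: str, queries: Dict[str, Iterable[str]]) -> None:
    await asyncio.sleep(0.1)  # stand-in for the federation request

async def fan_out(remote_queries: Dict[str, Dict[str, Iterable[str]]]) -> None:
    # Before: unbounded -- one task per destination, all at once.
    # await asyncio.gather(*(query_destination(d, q) for d, q in remote_queries.items()))

    # After: bounded -- at most 10 destinations in flight at any moment.
    sem = asyncio.Semaphore(10)

    async def _query(item: Tuple[str, Dict[str, Iterable[str]]]) -> None:
        destination, queries = item  # dict.items() yields (key, value) tuples
        async with sem:
            await query_destination(destination, queries)

    await asyncio.gather(*(_query(item) for item in remote_queries.items()))

asyncio.run(fan_out({f"hs{i}.example": {} for i in range(25)}))
```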
@@ -300,17 +304,23 @@ async def _query_devices_for_destination(
        # queries. We use the more efficient batched query_client_keys for all
        # remaining users
-        user_ids_updated = []
-        for (user_id, device_list) in destination_query.items():
-            if user_id in user_ids_updated:
-                continue
-
-            if device_list:
-                continue
-
-            room_ids = await self.store.get_rooms_for_user(user_id)
-            if not room_ids:
-                continue
+        # Perform a user device resync for each user only once and only as long as:
+        # - they have an empty device_list
+        # - they are in some rooms that this server can see
+        users_to_resync_devices = {
+            user_id
+            for (user_id, device_list) in destination_query.items()
+            if (not device_list) and (await self.store.get_rooms_for_user(user_id))
+        }
+
+        logger.debug(
+            "%d users to resync devices for from destination %s",
+            len(users_to_resync_devices),
+            destination,
+        )
+
+        for user_id in users_to_resync_devices:
            # We've decided we're sharing a room with this user and should
            # probably be tracking their device lists. However, we haven't
            # done an initial sync on the device list so we do it now.
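
One subtlety worth noting: because `destination_query` is a dict, its keys are already unique, so the set comprehension replaces the old `user_ids_updated` bookkeeping for free, and the `await` inside it still runs sequentially, user by user. A small self-contained illustration of the filter, with a hypothetical stand-in for `self.store.get_rooms_for_user`:

```python
import asyncio
from typing import Dict, List

# Hypothetical stand-in for self.store.get_rooms_for_user.
async def get_rooms_for_user(user_id: str) -> List[str]:
    return [] if user_id == "@nowhere:example" else ["!room:example"]

async def main() -> None:
    destination_query: Dict[str, List[str]] = {
        "@alice:example": [],           # empty device list, shared rooms: resync
        "@bob:example": ["DEVICEID1"],  # specific devices requested: skip
        "@nowhere:example": [],         # no rooms visible to this server: skip
    }
    # `await` is allowed inside a comprehension within an async function;
    # each user is checked one after another, not concurrently.
    users_to_resync_devices = {
        user_id
        for user_id, device_list in destination_query.items()
        if (not device_list) and (await get_rooms_for_user(user_id))
    }
    print(users_to_resync_devices)  # {'@alice:example'}

asyncio.run(main())
```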
55 changes: 51 additions & 4 deletions synapse/util/async_helpers.py
@@ -205,7 +205,10 @@ def __repr__(self) -> str:


async def concurrently_execute(
-    func: Callable[[T], Any], args: Iterable[T], limit: int
+    func: Callable[[T], Any],
+    args: Iterable[T],
+    limit: int,
+    delay_cancellation: bool = False,
) -> None:
    """Executes the function with each argument concurrently while limiting
    the number of concurrent executions.
@@ -215,6 +218,8 @@ async def concurrently_execute(
        args: List of arguments to pass to func, each invocation of func
            gets a single argument.
        limit: Maximum number of concurrent executions.
+        delay_cancellation: Whether to delay cancellation until after the invocations
+            have finished.

    Returns:
        None, when all function invocations have finished. The return values
@@ -233,9 +238,16 @@ async def _concurrently_execute_inner(value: T) -> None:
    # We use `itertools.islice` to handle the case where the number of args is
    # less than the limit, avoiding needlessly spawning unnecessary background
    # tasks.
-    await yieldable_gather_results(
-        _concurrently_execute_inner, (value for value in itertools.islice(it, limit))
-    )
+    if delay_cancellation:
+        await yieldable_gather_results_delaying_cancellation(
+            _concurrently_execute_inner,
+            (value for value in itertools.islice(it, limit)),
+        )
+    else:
+        await yieldable_gather_results(
+            _concurrently_execute_inner,
+            (value for value in itertools.islice(it, limit)),
+        )


P = ParamSpec("P")
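
For readers unfamiliar with this helper: the `itertools.islice(it, limit)` trick works because `it` is a shared iterator over the arguments. At most `limit` seed values spawn workers, and each worker keeps pulling further values from the same iterator as it finishes, so concurrency never exceeds `limit` and fewer arguments means fewer workers. A minimal asyncio rendering of the same idea (Synapse's real version builds on `run_in_background` and Deferreds):

```python
import asyncio
import itertools
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")

async def concurrently_execute(
    func: Callable[[T], Awaitable[None]], args: Iterable[T], limit: int
) -> None:
    it = iter(args)

    async def _worker(first: T) -> None:
        # Handle the seed value, then keep draining the shared iterator.
        await func(first)
        for value in it:
            await func(value)

    # islice yields at most `limit` seeds, so we never spawn more
    # workers than there are arguments.
    await asyncio.gather(*(_worker(v) for v in itertools.islice(it, limit)))

async def main() -> None:
    async def work(n: int) -> None:
        await asyncio.sleep(0.01)
        print("done", n)

    await concurrently_execute(work, range(7), 3)  # never more than 3 in flight

asyncio.run(main())
```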
@@ -292,6 +304,41 @@ async def yieldable_gather_results(
        raise dfe.subFailure.value from None


+async def yieldable_gather_results_delaying_cancellation(
+    func: Callable[Concatenate[T, P], Awaitable[R]],
+    iter: Iterable[T],
+    *args: P.args,
+    **kwargs: P.kwargs,
+) -> List[R]:
+    """Executes the function with each argument concurrently.
+    Cancellation is delayed until after all the results have been gathered.
+
+    See `yieldable_gather_results`.
+
+    Args:
+        func: Function to execute that returns a Deferred
+        iter: An iterable that yields items that get passed as the first
+            argument to the function
+        *args: Arguments to be passed to each call to func
+        **kwargs: Keyword arguments to be passed to each call to func
+
+    Returns
+        A list containing the results of the function
+    """
+    try:
+        return await make_deferred_yieldable(
+            delay_cancellation(
+                defer.gatherResults(
+                    [run_in_background(func, item, *args, **kwargs) for item in iter],  # type: ignore[arg-type]
+                    consumeErrors=True,
+                )
+            )
+        )
+    except defer.FirstError as dfe:
+        assert isinstance(dfe.subFailure.value, BaseException)
+        raise dfe.subFailure.value from None
+
+
T1 = TypeVar("T1")
T2 = TypeVar("T2")
T3 = TypeVar("T3")
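
This `delay_cancellation` wrapper is what backs the new `delay_cancellation=True` flag: if the caller is cancelled mid-gather, the cancellation is held back until every invocation has finished, so half-completed federation requests are not abandoned. asyncio has no direct equivalent; a rough sketch of the same semantics (illustrative only, not Synapse's Twisted implementation):

```python
import asyncio
from typing import Awaitable, TypeVar

R = TypeVar("R")

async def delay_cancellation(awaitable: Awaitable[R]) -> R:
    # Run the work as a real task so it survives our own cancellation.
    task = asyncio.ensure_future(awaitable)
    cancelled = False
    while not task.done():
        try:
            await asyncio.shield(task)
        except asyncio.CancelledError:
            if task.cancelled():
                raise  # the work itself was cancelled, propagate
            cancelled = True  # remember the cancellation, keep waiting
    if cancelled:
        raise asyncio.CancelledError()  # deliver it only once the work is done
    return task.result()
```

The design point mirrors the docstring above: callers still see the cancellation eventually, but only after all gathered results have resolved.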