Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Batch up replication requests to request the resyncing of remote users's devices. #14716

Merged
merged 15 commits into from
Jan 10, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
get_domain_from_id,
get_verify_key_from_cross_signing_key,
)
from synapse.util import json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer, delay_cancellation
from synapse.util import json_decoder
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.cancellation import cancellable
from synapse.util.retryutils import NotRetryingDestination

Expand Down Expand Up @@ -241,24 +241,25 @@ async def query_devices(
logger.debug(
"%d destinations to query devices for", len(remote_queries_not_in_cache)
)
await make_deferred_yieldable(
delay_cancellation(
defer.gatherResults(
[
run_in_background(
self._query_devices_for_destination,
results,
cross_signing_keys,
failures,
destination,
queries,
timeout,
)
for destination, queries in remote_queries_not_in_cache.items()
],
consumeErrors=True,
).addErrback(unwrapFirstError)

async def _query(
destination_queries: Tuple[str, Dict[str, Iterable[str]]]
) -> None:
destination, queries = destination_queries
return await self._query_devices_for_destination(
results,
cross_signing_keys,
failures,
destination,
queries,
timeout,
)

await concurrently_execute(
_query,
remote_queries_not_in_cache.items(),
10,
delay_cancellation=True,
)
clokep marked this conversation as resolved.
Show resolved Hide resolved

ret = {"device_keys": results, "failures": failures}
Expand Down