Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Batch up replication requests to request the resyncing of remote users's devices. #14716

Merged
merged 15 commits into from
Jan 10, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from http import HTTPStatus
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -33,6 +34,7 @@
Codes,
FederationDeniedError,
HttpResponseException,
InvalidAPICallError,
RequestSendFailed,
SynapseError,
)
Expand All @@ -45,6 +47,7 @@
JsonDict,
StreamKeyType,
StreamToken,
UserID,
get_domain_from_id,
get_verify_key_from_cross_signing_key,
)
Expand Down Expand Up @@ -893,12 +896,41 @@ class DeviceListWorkerUpdater:

def __init__(self, hs: "HomeServer"):
from synapse.replication.http.devices import (
ReplicationMultiUserDevicesResyncRestServlet,
ReplicationUserDevicesResyncRestServlet,
)

self._user_device_resync_client = (
ReplicationUserDevicesResyncRestServlet.make_client(hs)
)
self._multi_user_device_resync_client = (
ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
)

async def multi_user_device_resync(
self, user_ids: List[str], mark_failed_as_stale: bool = True
) -> Dict[str, Optional[JsonDict]]:
"""
Like `user_device_resync` but operates on multiple users **from the same origin**
at once.

Returns:
Dict from User ID to the same Dict as `user_device_resync`.
"""
# TODO(BUG): mark_failed_as_stale is not sent.
try:
return await self._multi_user_device_resync_client(user_ids=user_ids)
except SynapseError as err:
if not (
err.code == HTTPStatus.NOT_FOUND and err.errcode == Codes.UNRECOGNIZED
):
raise

# Fall back to single requests
result: Dict[str, Optional[JsonDict]] = {}
for user_id in user_ids:
result[user_id] = await self._user_device_resync_client(user_id=user_id)
return result
clokep marked this conversation as resolved.
Show resolved Hide resolved

async def user_device_resync(
clokep marked this conversation as resolved.
Show resolved Hide resolved
self, user_id: str, mark_failed_as_stale: bool = True
Expand All @@ -914,6 +946,7 @@ async def user_device_resync(
request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
"""
# TODO(BUG): mark_failed_as_stale is not sent.
return await self._user_device_resync_client(user_id=user_id)


Expand Down Expand Up @@ -1160,6 +1193,31 @@ async def _maybe_retry_device_resync(self) -> None:
# Allow future calls to retry resyncinc out of sync device lists.
self._resync_retry_in_progress = False

async def multi_user_device_resync(
self, user_ids: List[str], mark_failed_as_stale: bool = True
) -> Dict[str, Optional[JsonDict]]:
"""
Like `user_device_resync` but operates on multiple users **from the same origin**
at once.

Returns:
Dict from User ID to the same Dict as `user_device_resync`.
"""
if not user_ids:
return {}

origins = {UserID.from_string(user_id).domain for user_id in user_ids}

if len(origins) != 1:
raise InvalidAPICallError(f"Only one origin permitted, got {origins!r}")

result = {}
# TODO(Perf): Actually batch these up
for user_id in user_ids:
result[user_id] = await self.user_device_resync(user_id)

return result

async def user_device_resync(
self, user_id: str, mark_failed_as_stale: bool = True
) -> Optional[JsonDict]:
Expand Down