From 712144e7688333d24e2d6a650d6d9e6676de90fc Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Dec 2022 17:56:57 +0000 Subject: [PATCH] Batch up the DB writes when marking failures --- synapse/handlers/device.py | 13 ++++++++-- synapse/handlers/devicemessage.py | 2 +- synapse/handlers/federation_event.py | 2 +- synapse/storage/databases/main/devices.py | 30 +++++++++++++++++------ synapse/types/__init__.py | 4 +++ 5 files changed, 39 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 0a74e4d266bd..68a0c8ccb490 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -1212,9 +1212,18 @@ async def multi_user_device_resync( raise InvalidAPICallError(f"Only one origin permitted, got {origins!r}") result = {} + failed = set() # TODO(Perf): Actually batch these up for user_id in user_ids: - result[user_id] = await self.user_device_resync(user_id) + user_result, user_failed = await self._user_device_resync_returning_failed( + user_id + ) + result[user_id] = user_result + if user_failed: + failed.add(user_id) + + if mark_failed_as_stale: + await self.store.mark_remote_users_device_caches_as_stale(failed) return result @@ -1226,7 +1235,7 @@ async def user_device_resync( if failed and mark_failed_as_stale: # Mark the remote user's device list as stale so we know we need to retry # it later. - await self.store.mark_remote_user_device_cache_as_stale(user_id) + await self.store.mark_remote_users_device_caches_as_stale((user_id,)) return result diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 75e89850f5bc..00c403db4925 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -195,7 +195,7 @@ async def _check_for_unknown_devices( sender_user_id, unknown_devices, ) - await self.store.mark_remote_user_device_cache_as_stale(sender_user_id) + await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,)) # Immediately attempt a resync in the background run_in_background(self._user_device_resync, user_id=sender_user_id) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 31df7f55cc97..6df000faafed 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -1423,7 +1423,7 @@ async def _resync_device(self, sender: str) -> None: """ try: - await self._store.mark_remote_user_device_cache_as_stale(sender) + await self._store.mark_remote_users_device_caches_as_stale((sender,)) # Immediately attempt a resync in the background if self._config.worker.worker_app: diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index a5bb4d404e20..a921332cb042 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -54,7 +54,7 @@ AbstractStreamIdTracker, StreamIdGenerator, ) -from synapse.types import JsonDict, get_verify_key_from_cross_signing_key +from synapse.types import JsonDict, StrCollection, get_verify_key_from_cross_signing_key from synapse.util import json_decoder, json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import LruCache @@ -1062,16 +1062,30 @@ async def get_user_ids_requiring_device_list_resync( return {row["user_id"] for row in rows} - async def mark_remote_user_device_cache_as_stale(self, user_id: str) -> None: + async def mark_remote_users_device_caches_as_stale( + self, user_ids: StrCollection + ) -> None: """Records that the server has reason to believe the cache of the devices for the remote users is out of date. """ - await self.db_pool.simple_upsert( - table="device_lists_remote_resync", - keyvalues={"user_id": user_id}, - values={}, - insertion_values={"added_ts": self._clock.time_msec()}, - desc="mark_remote_user_device_cache_as_stale", + + def _mark_remote_users_device_caches_as_stale_txn( + txn: LoggingTransaction, + ) -> None: + # TODO add insertion_values support to simple_upsert_many and use + # that! + for user_id in user_ids: + self.db_pool.simple_upsert_txn( + txn, + table="device_lists_remote_resync", + keyvalues={"user_id": user_id}, + values={}, + insertion_values={"added_ts": self._clock.time_msec()}, + ) + + await self.db_pool.runInteraction( + "mark_remote_users_device_caches_as_stale", + _mark_remote_users_device_caches_as_stale_txn, ) async def mark_remote_user_device_cache_as_valid(self, user_id: str) -> None: diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index f2d436ddc38c..0c725eb9677d 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -77,6 +77,10 @@ # A JSON-serialisable object. JsonSerializable = object +# Collection[str] that does not include str itself; str being a Sequence[str] +# is very misleading and results in bugs. +StrCollection = Union[Tuple[str, ...], List[str], Set[str]] + # Note that this seems to require inheriting *directly* from Interface in order # for mypy-zope to realize it is an interface.