diff --git a/changelog.d/17461.misc b/changelog.d/17461.misc new file mode 100644 index 0000000000..80f7144baa --- /dev/null +++ b/changelog.d/17461.misc @@ -0,0 +1 @@ +Speed up fetching room keys from backup. diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 99f9f6e64a..f397911f28 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -34,7 +34,7 @@ from synapse.logging.opentracing import log_kv, trace from synapse.storage.databases.main.e2e_room_keys import RoomKey from synapse.types import JsonDict -from synapse.util.async_helpers import Linearizer +from synapse.util.async_helpers import ReadWriteLock if TYPE_CHECKING: from synapse.server import HomeServer @@ -58,7 +58,7 @@ def __init__(self, hs: "HomeServer"): # clients belonging to a user will receive and try to upload a new session at # roughly the same time. Also used to lock out uploads when the key is being # changed. - self._upload_linearizer = Linearizer("upload_room_keys_lock") + self._upload_lock = ReadWriteLock() @trace async def get_room_keys( @@ -89,7 +89,7 @@ async def get_room_keys( # we deliberately take the lock to get keys so that changing the version # works atomically - async with self._upload_linearizer.queue(user_id): + async with self._upload_lock.read(user_id): # make sure the backup version exists try: await self.store.get_e2e_room_keys_version_info(user_id, version) @@ -132,7 +132,7 @@ async def delete_room_keys( """ # lock for consistency with uploading - async with self._upload_linearizer.queue(user_id): + async with self._upload_lock.write(user_id): # make sure the backup version exists try: version_info = await self.store.get_e2e_room_keys_version_info( @@ -193,7 +193,7 @@ async def upload_room_keys( # TODO: Validate the JSON to make sure it has the right keys. # XXX: perhaps we should use a finer grained lock here? - async with self._upload_linearizer.queue(user_id): + async with self._upload_lock.write(user_id): # Check that the version we're trying to upload is the current version try: version_info = await self.store.get_e2e_room_keys_version_info(user_id) @@ -355,7 +355,7 @@ async def create_version(self, user_id: str, version_info: JsonDict) -> str: # TODO: Validate the JSON to make sure it has the right keys. # lock everyone out until we've switched version - async with self._upload_linearizer.queue(user_id): + async with self._upload_lock.write(user_id): new_version = await self.store.create_e2e_room_keys_version( user_id, version_info ) @@ -382,7 +382,7 @@ async def get_version_info( } """ - async with self._upload_linearizer.queue(user_id): + async with self._upload_lock.read(user_id): try: res = await self.store.get_e2e_room_keys_version_info(user_id, version) except StoreError as e: @@ -407,7 +407,7 @@ async def delete_version(self, user_id: str, version: Optional[str] = None) -> N NotFoundError: if this backup version doesn't exist """ - async with self._upload_linearizer.queue(user_id): + async with self._upload_lock.write(user_id): try: await self.store.delete_e2e_room_keys_version(user_id, version) except StoreError as e: @@ -437,7 +437,7 @@ async def update_version( raise SynapseError( 400, "Version in body does not match", Codes.INVALID_PARAM ) - async with self._upload_linearizer.queue(user_id): + async with self._upload_lock.write(user_id): try: old_info = await self.store.get_e2e_room_keys_version_info( user_id, version