From da5339dc54933a3360b9fc8ad59377309b13d94b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Aug 2024 15:29:24 +0100 Subject: [PATCH 01/13] Migrate to per-connection state class --- synapse/handlers/sliding_sync.py | 102 +++++++++++++++++++------------ 1 file changed, 62 insertions(+), 40 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 99510254f3..2a104f83fd 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -571,21 +571,19 @@ async def current_sync_for_user( # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() - if from_token: - # Check that we recognize the connection position, if not tell the - # clients that they need to start again. - # - # If we don't do this and the client asks for the full range of - # rooms, we end up sending down all rooms and their state from - # scratch (which can be very slow). By expiring the connection we - # allow the client a chance to do an initial request with a smaller - # range of rooms to get them some results sooner but will end up - # taking the same amount of time (more with round-trips and - # re-processing) in the end to get everything again. - if not await self.connection_store.is_valid_token( - sync_config, from_token.connection_position - ): - raise SlidingSyncUnknownPosition() + # Get the per-connection state (if any). + # + # Raises an exception if there is a `connection_position` that we don't + # recognize. If we don't do this and the client asks for the full range + # of rooms, we end up sending down all rooms and their state from + # scratch (which can be very slow). By expiring the connection we allow + # the client a chance to do an initial request with a smaller range of + # rooms to get them some results sooner but will end up taking the same + # amount of time (more with round-trips and re-processing) in the end to + # get everything again. + per_connection_state = await self.connection_store.get_per_connection_state( + sync_config, from_token + ) await self.connection_store.mark_token_seen( sync_config=sync_config, @@ -781,11 +779,7 @@ async def current_sync_for_user( # we haven't sent the room down, or we have but there are missing # updates). for room_id in relevant_room_map: - status = await self.connection_store.have_sent_room( - sync_config, - from_token.connection_position, - room_id, - ) + status = per_connection_state.have_sent_room(room_id) if ( # The room was never sent down before so the client needs to know # about it regardless of any updates. @@ -821,6 +815,7 @@ async def current_sync_for_user( async def handle_room(room_id: str) -> None: room_sync_result = await self.get_room_sync_data( sync_config=sync_config, + per_connection_state=per_connection_state, room_id=room_id, room_sync_config=relevant_rooms_to_send_map[room_id], room_membership_for_user_at_to_token=room_membership_for_user_map[ @@ -885,6 +880,7 @@ async def handle_room(room_id: str) -> None: connection_position = await self.connection_store.record_rooms( sync_config=sync_config, from_token=from_token, + per_connection_state=per_connection_state, sent_room_ids=relevant_rooms_to_send_map.keys(), unsent_room_ids=unsent_room_ids, ) @@ -1939,6 +1935,7 @@ async def get_current_state_at( async def get_room_sync_data( self, sync_config: SlidingSyncConfig, + per_connection_state: "PerConnectionState", room_id: str, room_sync_config: RoomSyncConfig, room_membership_for_user_at_to_token: _RoomMembershipForUser, @@ -1986,11 +1983,7 @@ async def get_room_sync_data( from_bound = None initial = True if from_token and not room_membership_for_user_at_to_token.newly_joined: - room_status = await self.connection_store.have_sent_room( - sync_config=sync_config, - connection_token=from_token.connection_position, - room_id=room_id, - ) + room_status = per_connection_state.have_sent_room(room_id) if room_status.status == HaveSentRoomFlag.LIVE: from_bound = from_token.stream_token.room_key initial = False @@ -3034,6 +3027,21 @@ def previously(last_token: RoomStreamToken) -> "HaveSentRoom": HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None) +@attr.s(auto_attribs=True, slots=True, frozen=True) +class PerConnectionState: + """The per-connection state + + Attributes: + rooms: The state of rooms that have been sent to clients. + """ + + rooms: Mapping[str, HaveSentRoom] = {} + + def have_sent_room(self, room_id: str) -> HaveSentRoom: + """Return whether we have previously sent the room down""" + return self.rooms.get(room_id, HAVE_SENT_ROOM_NEVER) + + @attr.s(auto_attribs=True) class SlidingSyncConnectionStore: """In-memory store of per-connection state, including what rooms we have @@ -3063,9 +3071,9 @@ class SlidingSyncConnectionStore: to mapping of room ID to `HaveSentRoom`. """ - # `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom` - _connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = ( - attr.Factory(dict) + # `(user_id, conn_id)` -> `token` -> `PerConnectionState` + _connections: Dict[Tuple[str, str], Dict[int, PerConnectionState]] = attr.Factory( + dict ) async def is_valid_token( @@ -3078,26 +3086,40 @@ async def is_valid_token( conn_key = self._get_connection_key(sync_config) return connection_token in self._connections.get(conn_key, {}) - async def have_sent_room( - self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str - ) -> HaveSentRoom: - """For the given user_id/conn_id/token, return whether we have - previously sent the room down + async def get_per_connection_state( + self, + sync_config: SlidingSyncConfig, + from_token: Optional[SlidingSyncStreamToken], + ) -> PerConnectionState: + """Fetch the per-connection state for the token. + + Raises: + SlidingSyncUnknownPosition if the connection_token is unknown """ + if from_token is None: + return PerConnectionState() + + connection_position = from_token.connection_position + if connection_position == 0: + # The '0' values is a special value to indicate there is no + # per-connection state. + return PerConnectionState() conn_key = self._get_connection_key(sync_config) - sync_statuses = self._connections.setdefault(conn_key, {}) - room_status = sync_statuses.get(connection_token, {}).get( - room_id, HAVE_SENT_ROOM_NEVER - ) + sync_statuses = self._connections.get(conn_key, {}) + connection_state = sync_statuses.get(connection_position) + + if connection_state is None: + raise SlidingSyncUnknownPosition() - return room_status + return connection_state @trace async def record_rooms( self, sync_config: SlidingSyncConfig, from_token: Optional[SlidingSyncStreamToken], + per_connection_state: PerConnectionState, *, sent_room_ids: StrCollection, unsent_room_ids: StrCollection, @@ -3131,7 +3153,7 @@ async def record_rooms( sync_statuses.pop(new_store_token, None) # Copy over and update the room mappings. - new_room_statuses = dict(sync_statuses.get(prev_connection_token, {})) + new_room_statuses = dict(per_connection_state.rooms) # Whether we have updated the `new_room_statuses`, if we don't by the # end we can treat this as a noop. @@ -3165,7 +3187,7 @@ async def record_rooms( if not have_updated: return prev_connection_token - sync_statuses[new_store_token] = new_room_statuses + sync_statuses[new_store_token] = PerConnectionState(rooms=new_room_statuses) return new_store_token From baac6c550e457c5ce79ebb24fae5813b93eeacb9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Aug 2024 18:44:14 +0100 Subject: [PATCH 02/13] Record with new class --- synapse/handlers/sliding_sync.py | 140 ++++++++++++++++++------------- 1 file changed, 81 insertions(+), 59 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 2a104f83fd..055418e842 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -19,6 +19,8 @@ # import enum import logging +import typing +from collections import ChainMap from enum import Enum from itertools import chain from typing import ( @@ -848,6 +850,8 @@ async def handle_room(room_id: str) -> None: ) if has_lists or has_room_subscriptions: + new_connection_state = per_connection_state.get_mutable() + # We now calculate if any rooms outside the range have had updates, # which we are not sending down. # @@ -877,12 +881,16 @@ async def handle_room(room_id: str) -> None: ) unsent_room_ids = list(missing_event_map_by_room) - connection_position = await self.connection_store.record_rooms( + new_connection_state.record_unsent_rooms( + unsent_room_ids, from_token.stream_token + ) + + new_connection_state.record_sent_rooms(relevant_rooms_to_send_map.keys()) + + connection_position = await self.connection_store.record_new_state( sync_config=sync_config, from_token=from_token, - per_connection_state=per_connection_state, - sent_room_ids=relevant_rooms_to_send_map.keys(), - unsent_room_ids=unsent_room_ids, + per_connection_state=new_connection_state, ) elif from_token: connection_position = from_token.connection_position @@ -3041,6 +3049,69 @@ def have_sent_room(self, room_id: str) -> HaveSentRoom: """Return whether we have previously sent the room down""" return self.rooms.get(room_id, HAVE_SENT_ROOM_NEVER) + def get_mutable(self) -> "MutablePerConnectionState": + """Get a mutable copy of this state.""" + return MutablePerConnectionState( + rooms=dict(self.rooms), + ) + + +class MutablePerConnectionState(PerConnectionState): + """A mutable version of `PerConnectionState`""" + + rooms: typing.ChainMap[str, HaveSentRoom] + + def __init__( + self, + rooms: Dict[str, HaveSentRoom], + ) -> None: + super().__init__( + rooms=ChainMap({}, rooms), + ) + + def updated_rooms(self) -> Mapping[str, HaveSentRoom]: + """Return the room entries that have been updated""" + return self.rooms.maps[0] + + def has_updates(self) -> bool: + """Are there any updates""" + return bool(self.updated_rooms()) + + def record_sent_rooms(self, room_ids: StrCollection) -> None: + """Record that we have sent these rooms in the response""" + for room_id in room_ids: + current_status = self.rooms.get(room_id) + if ( + current_status is not None + and current_status.status == HaveSentRoomFlag.LIVE + ): + continue + + self.rooms[room_id] = HAVE_SENT_ROOM_LIVE + + def record_unsent_rooms( + self, room_ids: StrCollection, from_token: StreamToken + ) -> None: + """Record that we have not sent these rooms in the response, but there + have been updates. + """ + # Whether we add/update the entries for unsent rooms depends on the + # existing entry: + # - LIVE: We have previously sent down everything up to + # `last_room_token, so we update the entry to be `PREVIOUSLY` with + # `last_room_token`. + # - PREVIOUSLY: We have previously sent down everything up to *a* + # given token, so we don't need to update the entry. + # - NEVER: We have never previously sent down the room, and we haven't + # sent anything down this time either so we leave it as NEVER. + + for room_id in room_ids: + current_status = self.rooms.get(room_id) + if current_status is None or current_status.status != HaveSentRoomFlag.LIVE: + continue + + self.rooms[room_id] = HaveSentRoom.previously(from_token.room_key) + @attr.s(auto_attribs=True) class SlidingSyncConnectionStore: @@ -3115,33 +3186,17 @@ async def get_per_connection_state( return connection_state @trace - async def record_rooms( + async def record_new_state( self, sync_config: SlidingSyncConfig, from_token: Optional[SlidingSyncStreamToken], - per_connection_state: PerConnectionState, - *, - sent_room_ids: StrCollection, - unsent_room_ids: StrCollection, + per_connection_state: MutablePerConnectionState, ) -> int: - """Record which rooms we have/haven't sent down in a new response - - Attributes: - sync_config - from_token: The since token from the request, if any - sent_room_ids: The set of room IDs that we have sent down as - part of this request (only needs to be ones we didn't - previously sent down). - unsent_room_ids: The set of room IDs that have had updates - since the `from_token`, but which were not included in - this request - """ prev_connection_token = 0 if from_token is not None: prev_connection_token = from_token.connection_position - # If there are no changes then this is a noop. - if not sent_room_ids and not unsent_room_ids: + if not per_connection_state.has_updates(): return prev_connection_token conn_key = self._get_connection_key(sync_config) @@ -3152,42 +3207,9 @@ async def record_rooms( new_store_token = prev_connection_token + 1 sync_statuses.pop(new_store_token, None) - # Copy over and update the room mappings. - new_room_statuses = dict(per_connection_state.rooms) - - # Whether we have updated the `new_room_statuses`, if we don't by the - # end we can treat this as a noop. - have_updated = False - for room_id in sent_room_ids: - new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE - have_updated = True - - # Whether we add/update the entries for unsent rooms depends on the - # existing entry: - # - LIVE: We have previously sent down everything up to - # `last_room_token, so we update the entry to be `PREVIOUSLY` with - # `last_room_token`. - # - PREVIOUSLY: We have previously sent down everything up to *a* - # given token, so we don't need to update the entry. - # - NEVER: We have never previously sent down the room, and we haven't - # sent anything down this time either so we leave it as NEVER. - - # Work out the new state for unsent rooms that were `LIVE`. - if from_token: - new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key) - else: - new_unsent_state = HAVE_SENT_ROOM_NEVER - - for room_id in unsent_room_ids: - prev_state = new_room_statuses.get(room_id) - if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE: - new_room_statuses[room_id] = new_unsent_state - have_updated = True - - if not have_updated: - return prev_connection_token - - sync_statuses[new_store_token] = PerConnectionState(rooms=new_room_statuses) + sync_statuses[new_store_token] = PerConnectionState( + rooms=dict(per_connection_state.rooms), + ) return new_store_token From 0561c86c5d29ec61deb0d200356a8ec0b695b06d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Aug 2024 09:25:23 +0100 Subject: [PATCH 03/13] Revamp --- synapse/handlers/sliding_sync.py | 98 +++++++++++++++++++++----------- 1 file changed, 66 insertions(+), 32 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 055418e842..00d7cb09ff 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -32,11 +32,13 @@ List, Literal, Mapping, + MutableMapping, Optional, Sequence, Set, Tuple, Union, + cast, ) import attr @@ -781,7 +783,7 @@ async def current_sync_for_user( # we haven't sent the room down, or we have but there are missing # updates). for room_id in relevant_room_map: - status = per_connection_state.have_sent_room(room_id) + status = per_connection_state.rooms.have_sent_room(room_id) if ( # The room was never sent down before so the client needs to know # about it regardless of any updates. @@ -881,11 +883,13 @@ async def handle_room(room_id: str) -> None: ) unsent_room_ids = list(missing_event_map_by_room) - new_connection_state.record_unsent_rooms( + new_connection_state.rooms.record_unsent_rooms( unsent_room_ids, from_token.stream_token ) - new_connection_state.record_sent_rooms(relevant_rooms_to_send_map.keys()) + new_connection_state.rooms.record_sent_rooms( + relevant_rooms_to_send_map.keys() + ) connection_position = await self.connection_store.record_new_state( sync_config=sync_config, @@ -1991,7 +1995,7 @@ async def get_room_sync_data( from_bound = None initial = True if from_token and not room_membership_for_user_at_to_token.newly_joined: - room_status = per_connection_state.have_sent_room(room_id) + room_status = per_connection_state.rooms.have_sent_room(room_id) if room_status.status == HaveSentRoomFlag.LIVE: from_bound = from_token.stream_token.room_key initial = False @@ -3036,58 +3040,61 @@ def previously(last_token: RoomStreamToken) -> "HaveSentRoom": @attr.s(auto_attribs=True, slots=True, frozen=True) -class PerConnectionState: - """The per-connection state - - Attributes: - rooms: The state of rooms that have been sent to clients. - """ +class RoomStatusesForStream: + """For a given stream, e.g. events, records what we have or have not sent + down for that stream in a given room.""" - rooms: Mapping[str, HaveSentRoom] = {} + _statuses: Mapping[str, HaveSentRoom] = attr.Factory(dict) def have_sent_room(self, room_id: str) -> HaveSentRoom: """Return whether we have previously sent the room down""" - return self.rooms.get(room_id, HAVE_SENT_ROOM_NEVER) + return self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER) - def get_mutable(self) -> "MutablePerConnectionState": + def get_mutable(self) -> "MutableRoomStatusesForStream": """Get a mutable copy of this state.""" - return MutablePerConnectionState( - rooms=dict(self.rooms), + return MutableRoomStatusesForStream( + statuses=self._statuses, ) + def copy(self) -> "RoomStatusesForStream": + """Make a copy of the class. Useful for converting from a mutable to + immutable version.""" -class MutablePerConnectionState(PerConnectionState): - """A mutable version of `PerConnectionState`""" + return RoomStatusesForStream(statuses=dict(self._statuses)) - rooms: typing.ChainMap[str, HaveSentRoom] + +class MutableRoomStatusesForStream(RoomStatusesForStream): + """A mutable version of `RoomStatusesForStream`""" + + _statuses: typing.ChainMap[str, HaveSentRoom] def __init__( self, - rooms: Dict[str, HaveSentRoom], + statuses: Mapping[str, HaveSentRoom], ) -> None: + # ChainMap requires a mutable mapping, but we're not actually going to + # mutate it. + statuses = cast(MutableMapping, statuses) + super().__init__( - rooms=ChainMap({}, rooms), + statuses=ChainMap({}, statuses), ) - def updated_rooms(self) -> Mapping[str, HaveSentRoom]: - """Return the room entries that have been updated""" - return self.rooms.maps[0] - - def has_updates(self) -> bool: - """Are there any updates""" - return bool(self.updated_rooms()) + def get_updates(self) -> Mapping[str, HaveSentRoom]: + """Return only the changes that were made""" + return self._statuses.maps[0] def record_sent_rooms(self, room_ids: StrCollection) -> None: """Record that we have sent these rooms in the response""" for room_id in room_ids: - current_status = self.rooms.get(room_id) + current_status = self._statuses.get(room_id) if ( current_status is not None and current_status.status == HaveSentRoomFlag.LIVE ): continue - self.rooms[room_id] = HAVE_SENT_ROOM_LIVE + self._statuses[room_id] = HAVE_SENT_ROOM_LIVE def record_unsent_rooms( self, room_ids: StrCollection, from_token: StreamToken @@ -3106,11 +3113,38 @@ def record_unsent_rooms( # sent anything down this time either so we leave it as NEVER. for room_id in room_ids: - current_status = self.rooms.get(room_id) + current_status = self._statuses.get(room_id) if current_status is None or current_status.status != HaveSentRoomFlag.LIVE: continue - self.rooms[room_id] = HaveSentRoom.previously(from_token.room_key) + self._statuses[room_id] = HaveSentRoom.previously(from_token.room_key) + + +@attr.s(auto_attribs=True) +class PerConnectionState: + """The per-connection state + + Attributes: + rooms: The status of each room for the events stream. + """ + + rooms: RoomStatusesForStream = attr.Factory(RoomStatusesForStream) + + def get_mutable(self) -> "MutablePerConnectionState": + """Get a mutable copy of this state.""" + return MutablePerConnectionState( + rooms=self.rooms.get_mutable(), + ) + + +@attr.s(auto_attribs=True) +class MutablePerConnectionState(PerConnectionState): + """A mutable version of `PerConnectionState`""" + + rooms: MutableRoomStatusesForStream + + def has_updates(self) -> bool: + return bool(self.rooms.get_updates()) @attr.s(auto_attribs=True) @@ -3208,7 +3242,7 @@ async def record_new_state( sync_statuses.pop(new_store_token, None) sync_statuses[new_store_token] = PerConnectionState( - rooms=dict(per_connection_state.rooms), + rooms=per_connection_state.rooms.copy(), ) return new_store_token From 2e7672d606d5f5101b0037dc12b67415a46b6690 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Aug 2024 11:13:36 +0100 Subject: [PATCH 04/13] Newsfile --- changelog.d/17574.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17574.misc diff --git a/changelog.d/17574.misc b/changelog.d/17574.misc new file mode 100644 index 0000000000..71020abec4 --- /dev/null +++ b/changelog.d/17574.misc @@ -0,0 +1 @@ +Refactor per-connection state in experimental sliding sync handler. From 64310eca35c1fc2ca40b05c9579e212e4f773853 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Aug 2024 10:07:39 +0100 Subject: [PATCH 05/13] Update synapse/handlers/sliding_sync.py Co-authored-by: Eric Eastwood --- synapse/handlers/sliding_sync.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 00d7cb09ff..0eb2d3c5b9 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -3044,6 +3044,7 @@ class RoomStatusesForStream: """For a given stream, e.g. events, records what we have or have not sent down for that stream in a given room.""" + # `room_id` -> `HaveSentRoom` _statuses: Mapping[str, HaveSentRoom] = attr.Factory(dict) def have_sent_room(self, room_id: str) -> HaveSentRoom: From 79e80eb258f3cebbb45e96a05622a8f0ceea0bc6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Aug 2024 10:07:47 +0100 Subject: [PATCH 06/13] Update synapse/handlers/sliding_sync.py Co-authored-by: Eric Eastwood --- synapse/handlers/sliding_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 0eb2d3c5b9..53e4382d47 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -3177,7 +3177,7 @@ class SlidingSyncConnectionStore: to mapping of room ID to `HaveSentRoom`. """ - # `(user_id, conn_id)` -> `token` -> `PerConnectionState` + # `(user_id, conn_id)` -> `connection_position` -> `PerConnectionState` _connections: Dict[Tuple[str, str], Dict[int, PerConnectionState]] = attr.Factory( dict ) From d982efe52c29b6552997c6083495b783366bb921 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Aug 2024 10:09:06 +0100 Subject: [PATCH 07/13] Update synapse/handlers/sliding_sync.py Co-authored-by: Eric Eastwood --- synapse/handlers/sliding_sync.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 53e4382d47..f625c02fad 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -3207,8 +3207,7 @@ async def get_per_connection_state( connection_position = from_token.connection_position if connection_position == 0: - # The '0' values is a special value to indicate there is no - # per-connection state. + # Initial sync (request without a `from_token`) starts at `0` so there is no existing per-connection state return PerConnectionState() conn_key = self._get_connection_key(sync_config) From b0a5c0efa10d0ea0cb9ddb5e27bf3a7da941d780 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Aug 2024 10:09:44 +0100 Subject: [PATCH 08/13] Apply suggestions from code review Co-authored-by: Eric Eastwood --- synapse/handlers/sliding_sync.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index f625c02fad..450d645601 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -585,7 +585,7 @@ async def current_sync_for_user( # rooms to get them some results sooner but will end up taking the same # amount of time (more with round-trips and re-processing) in the end to # get everything again. - per_connection_state = await self.connection_store.get_per_connection_state( + previous_connection_state = await self.connection_store.get_per_connection_state( sync_config, from_token ) @@ -3123,7 +3123,13 @@ def record_unsent_rooms( @attr.s(auto_attribs=True) class PerConnectionState: - """The per-connection state + """The per-connection state. A snapshot of what we've sent down the connection before. + + Currently, we track whether we've sent down various aspects of a given room before. + + We use the `rooms` field to store the position in the events stream for each room that we've previously sent to the client before. On the next request that includes the room, we can then send only what's changed since that recorded position. + + Same goes for the `receipts` field so we only need to send the new receipts since the last time you made a sync request. Attributes: rooms: The status of each room for the events stream. From 577370ae4417e4cfdeca29a92eed9bd9af9ec9bd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Aug 2024 10:05:39 +0100 Subject: [PATCH 09/13] Simplify get room status --- synapse/handlers/sliding_sync.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 450d645601..32adeb405c 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -3088,10 +3088,9 @@ def get_updates(self) -> Mapping[str, HaveSentRoom]: def record_sent_rooms(self, room_ids: StrCollection) -> None: """Record that we have sent these rooms in the response""" for room_id in room_ids: - current_status = self._statuses.get(room_id) + current_status = self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER) if ( - current_status is not None - and current_status.status == HaveSentRoomFlag.LIVE + current_status.status == HaveSentRoomFlag.LIVE ): continue @@ -3114,8 +3113,8 @@ def record_unsent_rooms( # sent anything down this time either so we leave it as NEVER. for room_id in room_ids: - current_status = self._statuses.get(room_id) - if current_status is None or current_status.status != HaveSentRoomFlag.LIVE: + current_status = self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER) + if current_status.status != HaveSentRoomFlag.LIVE: continue self._statuses[room_id] = HaveSentRoom.previously(from_token.room_key) From 27b7a4a04d9ee95cb6d2e77d6881e13ad1c6177e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Aug 2024 10:11:50 +0100 Subject: [PATCH 10/13] Add docstring --- synapse/handlers/sliding_sync.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 32adeb405c..56e9ed51cc 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -585,8 +585,10 @@ async def current_sync_for_user( # rooms to get them some results sooner but will end up taking the same # amount of time (more with round-trips and re-processing) in the end to # get everything again. - previous_connection_state = await self.connection_store.get_per_connection_state( - sync_config, from_token + previous_connection_state = ( + await self.connection_store.get_per_connection_state( + sync_config, from_token + ) ) await self.connection_store.mark_token_seen( @@ -3089,9 +3091,7 @@ def record_sent_rooms(self, room_ids: StrCollection) -> None: """Record that we have sent these rooms in the response""" for room_id in room_ids: current_status = self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER) - if ( - current_status.status == HaveSentRoomFlag.LIVE - ): + if current_status.status == HaveSentRoomFlag.LIVE: continue self._statuses[room_id] = HAVE_SENT_ROOM_LIVE @@ -3123,11 +3123,11 @@ def record_unsent_rooms( @attr.s(auto_attribs=True) class PerConnectionState: """The per-connection state. A snapshot of what we've sent down the connection before. - + Currently, we track whether we've sent down various aspects of a given room before. - + We use the `rooms` field to store the position in the events stream for each room that we've previously sent to the client before. On the next request that includes the room, we can then send only what's changed since that recorded position. - + Same goes for the `receipts` field so we only need to send the new receipts since the last time you made a sync request. Attributes: @@ -3231,6 +3231,12 @@ async def record_new_state( from_token: Optional[SlidingSyncStreamToken], per_connection_state: MutablePerConnectionState, ) -> int: + """Record updated per-connection state, returning the connection + position associated with the new state. + + If there are no changes to the state this may return the same token as + the existing per-connection state. + """ prev_connection_token = 0 if from_token is not None: prev_connection_token = from_token.connection_position From dec5314e734e619cba76f5aaab22d9ac525b4cd9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Aug 2024 10:19:21 +0100 Subject: [PATCH 11/13] Fixups --- synapse/handlers/sliding_sync.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 56e9ed51cc..5aa008b104 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -785,7 +785,7 @@ async def current_sync_for_user( # we haven't sent the room down, or we have but there are missing # updates). for room_id in relevant_room_map: - status = per_connection_state.rooms.have_sent_room(room_id) + status = previous_connection_state.rooms.have_sent_room(room_id) if ( # The room was never sent down before so the client needs to know # about it regardless of any updates. @@ -821,7 +821,7 @@ async def current_sync_for_user( async def handle_room(room_id: str) -> None: room_sync_result = await self.get_room_sync_data( sync_config=sync_config, - per_connection_state=per_connection_state, + per_connection_state=previous_connection_state, room_id=room_id, room_sync_config=relevant_rooms_to_send_map[room_id], room_membership_for_user_at_to_token=room_membership_for_user_map[ @@ -854,7 +854,7 @@ async def handle_room(room_id: str) -> None: ) if has_lists or has_room_subscriptions: - new_connection_state = per_connection_state.get_mutable() + new_connection_state = previous_connection_state.get_mutable() # We now calculate if any rooms outside the range have had updates, # which we are not sending down. @@ -3212,7 +3212,8 @@ async def get_per_connection_state( connection_position = from_token.connection_position if connection_position == 0: - # Initial sync (request without a `from_token`) starts at `0` so there is no existing per-connection state + # Initial sync (request without a `from_token`) starts at `0` so + # there is no existing per-connection state return PerConnectionState() conn_key = self._get_connection_key(sync_config) From 5b6755a18d9e05dee923ae7bd573eb304da2f574 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Aug 2024 10:19:25 +0100 Subject: [PATCH 12/13] Rename --- synapse/handlers/sliding_sync.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 5aa008b104..4c14b850ba 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -3042,7 +3042,7 @@ def previously(last_token: RoomStreamToken) -> "HaveSentRoom": @attr.s(auto_attribs=True, slots=True, frozen=True) -class RoomStatusesForStream: +class RoomStatusMap: """For a given stream, e.g. events, records what we have or have not sent down for that stream in a given room.""" @@ -3053,21 +3053,21 @@ def have_sent_room(self, room_id: str) -> HaveSentRoom: """Return whether we have previously sent the room down""" return self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER) - def get_mutable(self) -> "MutableRoomStatusesForStream": + def get_mutable(self) -> "MutableRoomStatusMap": """Get a mutable copy of this state.""" - return MutableRoomStatusesForStream( + return MutableRoomStatusMap( statuses=self._statuses, ) - def copy(self) -> "RoomStatusesForStream": + def copy(self) -> "RoomStatusMap": """Make a copy of the class. Useful for converting from a mutable to immutable version.""" - return RoomStatusesForStream(statuses=dict(self._statuses)) + return RoomStatusMap(statuses=dict(self._statuses)) -class MutableRoomStatusesForStream(RoomStatusesForStream): - """A mutable version of `RoomStatusesForStream`""" +class MutableRoomStatusMap(RoomStatusMap): + """A mutable version of `RoomStatusMap`""" _statuses: typing.ChainMap[str, HaveSentRoom] @@ -3134,7 +3134,7 @@ class PerConnectionState: rooms: The status of each room for the events stream. """ - rooms: RoomStatusesForStream = attr.Factory(RoomStatusesForStream) + rooms: RoomStatusMap = attr.Factory(RoomStatusMap) def get_mutable(self) -> "MutablePerConnectionState": """Get a mutable copy of this state.""" @@ -3147,7 +3147,7 @@ def get_mutable(self) -> "MutablePerConnectionState": class MutablePerConnectionState(PerConnectionState): """A mutable version of `PerConnectionState`""" - rooms: MutableRoomStatusesForStream + rooms: MutableRoomStatusMap def has_updates(self) -> bool: return bool(self.rooms.get_updates()) From 7f5bccc646823c9530f31f8dd18029a015660d13 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Aug 2024 18:23:21 +0100 Subject: [PATCH 13/13] Add some comments that chain map won't infinitely grow --- synapse/handlers/sliding_sync.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 4c14b850ba..c615cc7c32 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -3069,6 +3069,9 @@ def copy(self) -> "RoomStatusMap": class MutableRoomStatusMap(RoomStatusMap): """A mutable version of `RoomStatusMap`""" + # We use a ChainMap here so that we can easily track what has been updated + # and what hasn't. Note that when we persist the per connection state this + # will get flattened to a normal dict (via calling `.copy()`) _statuses: typing.ChainMap[str, HaveSentRoom] def __init__( @@ -3253,6 +3256,8 @@ async def record_new_state( new_store_token = prev_connection_token + 1 sync_statuses.pop(new_store_token, None) + # We copy the `MutablePerConnectionState` so that the inner `ChainMap`s + # don't grow forever. sync_statuses[new_store_token] = PerConnectionState( rooms=per_connection_state.rooms.copy(), )