From 11f34920e73f3cd2365cc6ca3c4fffe9de37b9a6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Jul 2024 12:46:21 +0100 Subject: [PATCH 1/7] Refactor to make LIVE non-static value --- synapse/handlers/sliding_sync.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 73414dbf694..6ddc8ff87b9 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -2237,6 +2237,10 @@ class HaveSentRoom: status: HaveSentRoomFlag last_token: Optional[RoomStreamToken] + @staticmethod + def live() -> "HaveSentRoom": + return HaveSentRoom(HaveSentRoomFlag.LIVE, None) + @staticmethod def previously(last_token: RoomStreamToken) -> "HaveSentRoom": """Constructor for `PREVIOUSLY` flag.""" @@ -2244,7 +2248,6 @@ def previously(last_token: RoomStreamToken) -> "HaveSentRoom": HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None) -HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None) @attr.s(auto_attribs=True) @@ -2339,7 +2342,7 @@ async def record_rooms( # end we can treat this as a noop. have_updated = False for room_id in sent_room_ids: - new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE + new_room_statuses[room_id] = HaveSentRoom.live() have_updated = True # Whether we add/update the entries for unsent rooms depends on the From 9284cc0110bb8bdcf0b9a6852cd511a345455009 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Jul 2024 12:49:25 +0100 Subject: [PATCH 2/7] Add fast path if from_token is None --- synapse/handlers/sliding_sync.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 6ddc8ff87b9..b1bede894dc 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -2354,18 +2354,21 @@ async def record_rooms( # given token, so we don't need to update the entry. # - NEVER: We have never previously sent down the room, and we haven't # sent anything down this time either so we leave it as NEVER. + # + # We only need to do this if `from_token` is not None, as if it is then + # we know that there are no existing entires. - # Work out the new state for unsent rooms that were `LIVE`. if from_token: new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key) - else: - new_unsent_state = HAVE_SENT_ROOM_NEVER - for room_id in unsent_room_ids: - prev_state = new_room_statuses.get(room_id) - if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE: - new_room_statuses[room_id] = new_unsent_state - have_updated = True + for room_id in unsent_room_ids: + prev_state = new_room_statuses.get(room_id) + if ( + prev_state is not None + and prev_state.status == HaveSentRoomFlag.LIVE + ): + new_room_statuses[room_id] = new_unsent_state + have_updated = True if not have_updated: return prev_connection_token From ce09ef058bc7bf0358bbc5e090035f2fc5995669 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Jul 2024 12:50:37 +0100 Subject: [PATCH 3/7] Only mark as updated if entry has changed --- synapse/handlers/sliding_sync.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index b1bede894dc..e4c438f6c8f 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -2342,8 +2342,10 @@ async def record_rooms( # end we can treat this as a noop. have_updated = False for room_id in sent_room_ids: + prev_state = new_room_statuses.get(room_id) new_room_statuses[room_id] = HaveSentRoom.live() - have_updated = True + if prev_state != new_room_statuses[room_id]: + have_updated = True # Whether we add/update the entries for unsent rooms depends on the # existing entry: From 394c25a7fecfb56eb67dfba4cab26a7e405724ac Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Jul 2024 13:34:44 +0100 Subject: [PATCH 4/7] Remember previous timeline limit --- synapse/handlers/sliding_sync.py | 27 ++++++++++++------ tests/rest/client/test_sync.py | 47 ++++++++++++++++++++++++-------- 2 files changed, 54 insertions(+), 20 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index e4c438f6c8f..d94160c59b2 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -697,6 +697,7 @@ async def handle_room(room_id: str) -> None: if has_lists or has_room_subscriptions: connection_position = await self.connection_store.record_rooms( sync_config=sync_config, + room_configs=relevant_room_map, from_token=from_token, sent_room_ids=relevant_room_map.keys(), # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids` @@ -2232,22 +2233,26 @@ class HaveSentRoom: contains the last stream token of the last updates we sent down the room, i.e. we still need to send everything since then to the client. + timeline_limit: The timeline limit config for the room, if LIVE or + PREVIOUSLY. This is used to track if the client has increased + the timeline limit to request more events. """ status: HaveSentRoomFlag last_token: Optional[RoomStreamToken] + timeline_limit: Optional[int] @staticmethod - def live() -> "HaveSentRoom": - return HaveSentRoom(HaveSentRoomFlag.LIVE, None) + def live(timeline_limit: int) -> "HaveSentRoom": + return HaveSentRoom(HaveSentRoomFlag.LIVE, None, timeline_limit) @staticmethod - def previously(last_token: RoomStreamToken) -> "HaveSentRoom": + def previously(last_token: RoomStreamToken, timeline_limit: int) -> "HaveSentRoom": """Constructor for `PREVIOUSLY` flag.""" - return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token) + return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token, timeline_limit) -HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None) +HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None, None) @attr.s(auto_attribs=True) @@ -2302,6 +2307,7 @@ async def have_sent_room( async def record_rooms( self, sync_config: SlidingSyncConfig, + room_configs: Dict[str, RoomSyncConfig], from_token: Optional[SlidingSyncStreamToken], *, sent_room_ids: StrCollection, @@ -2343,7 +2349,9 @@ async def record_rooms( have_updated = False for room_id in sent_room_ids: prev_state = new_room_statuses.get(room_id) - new_room_statuses[room_id] = HaveSentRoom.live() + new_room_statuses[room_id] = HaveSentRoom.live( + room_configs[room_id].timeline_limit + ) if prev_state != new_room_statuses[room_id]: have_updated = True @@ -2361,15 +2369,16 @@ async def record_rooms( # we know that there are no existing entires. if from_token: - new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key) - for room_id in unsent_room_ids: prev_state = new_room_statuses.get(room_id) if ( prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE ): - new_room_statuses[room_id] = new_unsent_state + new_room_statuses[room_id] = HaveSentRoom.previously( + from_token.stream_token.room_key, + room_configs[room_id].timeline_limit, + ) have_updated = True if not have_updated: diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 5abf1041bef..8d5fd30fdd5 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -39,7 +39,8 @@ ) from synapse.api.room_versions import RoomVersions from synapse.events import EventBase -from synapse.handlers.sliding_sync import StateValues +from synapse.handlers.sliding_sync import RoomSyncConfig, StateValues +from synapse.http.servlet import validate_json_object from synapse.rest.client import ( devices, knock, @@ -53,6 +54,7 @@ from synapse.server import HomeServer from synapse.types import ( JsonDict, + Requester, RoomStreamToken, SlidingSyncStreamToken, StreamKeyType, @@ -60,6 +62,7 @@ UserID, ) from synapse.types.handlers import SlidingSyncConfig +from synapse.types.rest.client import SlidingSyncBody from synapse.util import Clock from synapse.util.stringutils import random_string @@ -1357,6 +1360,22 @@ async def _on_new_acivity( "Expected `notifier.wait_for_events(...)` to be triggered" ) + def make_sync_config( + self, user: UserID, requester: Requester, content: JsonDict + ) -> SlidingSyncConfig: + """Helper function to turn a dict sync body to a sync config""" + body = validate_json_object(content, SlidingSyncBody) + + sync_config = SlidingSyncConfig( + user=user, + requester=requester, + conn_id=body.conn_id, + lists=body.lists, + room_subscriptions=body.room_subscriptions, + extensions=body.extensions, + ) + return sync_config + class SlidingSyncTestCase(SlidingSyncBase): """ @@ -4538,7 +4557,6 @@ def test_rooms_timeline_incremental_sync_PREVIOUSLY(self, limited: bool) -> None self.helper.send(room_id1, "msg", tok=user1_tok) timeline_limit = 5 - conn_id = "conn_id" sync_body = { "lists": { "foo-list": { @@ -4584,19 +4602,22 @@ def test_rooms_timeline_incremental_sync_PREVIOUSLY(self, limited: bool) -> None requester = self.get_success( self.hs.get_auth().get_user_by_access_token(user1_tok) ) - sync_config = SlidingSyncConfig( - user=requester.user, - requester=requester, - conn_id=conn_id, + sync_config = self.make_sync_config( + user=requester.user, requester=requester, content=sync_body ) parsed_initial_from_token = self.get_success( SlidingSyncStreamToken.from_string(self.store, initial_from_token) ) + assert sync_config.lists + room_configs = { + room_id1: RoomSyncConfig.from_room_config(sync_config.lists["foo-list"]) + } connection_position = self.get_success( sliding_sync_handler.connection_store.record_rooms( sync_config, - parsed_initial_from_token, + room_configs=room_configs, + from_token=parsed_initial_from_token, sent_room_ids=[], unsent_room_ids=[room_id1], ) @@ -4646,7 +4667,6 @@ def test_rooms_required_state_incremental_sync_PREVIOUSLY(self) -> None: self.helper.send(room_id1, "msg", tok=user1_tok) - conn_id = "conn_id" sync_body = { "lists": { "foo-list": { @@ -4693,19 +4713,24 @@ def test_rooms_required_state_incremental_sync_PREVIOUSLY(self) -> None: requester = self.get_success( self.hs.get_auth().get_user_by_access_token(user1_tok) ) - sync_config = SlidingSyncConfig( + sync_config = self.make_sync_config( user=requester.user, requester=requester, - conn_id=conn_id, + content=sync_body, ) parsed_initial_from_token = self.get_success( SlidingSyncStreamToken.from_string(self.store, initial_from_token) ) + assert sync_config.lists + room_configs = { + room_id1: RoomSyncConfig.from_room_config(sync_config.lists["foo-list"]) + } connection_position = self.get_success( sliding_sync_handler.connection_store.record_rooms( sync_config, - parsed_initial_from_token, + room_configs=room_configs, + from_token=parsed_initial_from_token, sent_room_ids=[], unsent_room_ids=[room_id1], ) From f17ff7cf0b39cb301b9e86902717e818a07e23ef Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Jul 2024 13:59:02 +0100 Subject: [PATCH 5/7] Handle increases in timeline limit --- synapse/handlers/sliding_sync.py | 32 ++++++++++++++- tests/rest/client/test_sync.py | 70 ++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index d94160c59b2..a88c66c4f74 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -653,6 +653,13 @@ async def current_sync_for_user( else: assert_never(status.status) + if status.timeline_limit is not None and ( + status.timeline_limit < relevant_room_map[room_id].timeline_limit + ): + # If the timeline limit has increased we want to send down + # more historic events (even if nothing has since changed). + rooms_should_send.add(room_id) + # We only need to check for new events since any state changes # will also come down as new events. rooms_that_have_updates = self.store.get_rooms_that_might_have_updates( @@ -1476,7 +1483,12 @@ async def get_room_sync_data( # - When users `newly_joined` # - For an incremental sync where we haven't sent it down this # connection before + # + # We also decide if we should ignore the timeline bound or not. This is + # to handle the case where the client has requested more historical + # messages in the room by increasing the timeline limit. from_bound = None + ignore_timeline_bound = False initial = True if from_token and not room_membership_for_user_at_to_token.newly_joined: room_status = await self.connection_store.have_sent_room( @@ -1484,6 +1496,7 @@ async def get_room_sync_data( connection_token=from_token.connection_position, room_id=room_id, ) + if room_status.status == HaveSentRoomFlag.LIVE: from_bound = from_token.stream_token.room_key initial = False @@ -1497,9 +1510,24 @@ async def get_room_sync_data( else: assert_never(room_status.status) + if room_status.timeline_limit is not None and ( + room_status.timeline_limit < room_sync_config.timeline_limit + ): + # If the timeline limit has been increased since previous + # requests then we treat it as request for more events. We do + # this by sending down a `limited` sync, ignoring the from + # bound. + ignore_timeline_bound = True + log_kv({"sliding_sync.room_status": room_status}) - log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial}) + log_kv( + { + "sliding_sync.from_bound": from_bound, + "sliding_sync.ignore_timeline_bound": ignore_timeline_bound, + "sliding_sync.initial": initial, + } + ) # Assemble the list of timeline events # @@ -1542,7 +1570,7 @@ async def get_room_sync_data( # (from newer to older events) starting at to_bound. # This ensures we fill the `limit` with the newest events first, from_key=to_bound, - to_key=from_bound, + to_key=from_bound if not ignore_timeline_bound else None, direction=Direction.BACKWARDS, # We add one so we can determine if there are enough events to saturate # the limit or not (see `limited`) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 8d5fd30fdd5..bb723927c4d 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -4940,6 +4940,76 @@ def test_empty_initial_room_comes_down_sync(self) -> None: response_body, _ = self.do_sync(sync_body, tok=user1_tok) self.assertEqual(response_body["rooms"][room_id1]["initial"], True) + def test_increasing_timeline_range_sends_more_messages(self) -> None: + """ + Test that increasing the timeline limit via room subscriptions sends the + room down with more messages in a limited sync. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [[EventTypes.Create, ""]], + "timeline_limit": 1, + } + } + } + + message_events = [] + for _ in range(10): + resp = self.helper.send(room_id1, "msg", tok=user1_tok) + message_events.append(resp["event_id"]) + + # Make the first Sliding Sync request + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + room_response = response_body["rooms"][room_id1] + + self.assertEqual(room_response["initial"], True) + self.assertEqual(room_response["limited"], True) + + # We only expect the last message at first + self.assertEqual( + [event["event_id"] for event in room_response["timeline"]], + message_events[-1:], + room_response["timeline"], + ) + + # We also expect to get the create event state. + self.assertEqual( + [event["type"] for event in room_response["required_state"]], + [EventTypes.Create], + ) + + # Now do another request with a room subscription with an increased timeline limit + sync_body["room_subscriptions"] = { + room_id1: { + "required_state": [], + "timeline_limit": 10, + } + } + + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + room_response = response_body["rooms"][room_id1] + + self.assertNotIn("initial", room_response) + self.assertEqual(room_response["limited"], True) + + # Now we expect all the messages + self.assertEqual( + [event["event_id"] for event in room_response["timeline"]], + message_events, + room_response["timeline"], + ) + + # We don't expect to get the room create down, as nothing has changed. + self.assertNotIn("required_state", room_response) + class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase): """Tests for the to-device sliding sync extension""" From 9c2354b2b1842c4b66f83db744012f5ef9560320 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Jul 2024 18:40:02 +0100 Subject: [PATCH 6/7] Force limited=true --- synapse/handlers/sliding_sync.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index a88c66c4f74..8af820896a2 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -1595,6 +1595,12 @@ async def get_room_sync_data( stream=timeline_events[0].internal_metadata.stream_ordering - 1 ) + if ignore_timeline_bound: + # If we're ignoring the timeline bound we *must* set limited to + # true, as otherwise the client will append the received events + # to the timeline, rather than replacing it. + limited = True + # Make sure we don't expose any events that the client shouldn't see timeline_events = await filter_events_for_client( self.storage_controllers, From d10361dc7ddbc8c8e137cd80ffb6ac12a120fad2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Jul 2024 18:40:55 +0100 Subject: [PATCH 7/7] Newsfile --- changelog.d/17503.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17503.misc diff --git a/changelog.d/17503.misc b/changelog.d/17503.misc new file mode 100644 index 00000000000..20280538155 --- /dev/null +++ b/changelog.d/17503.misc @@ -0,0 +1 @@ +Handle requests for more events in a room when using experimental sliding sync.