Skip to content

Commit

Permalink
Only send rooms with updates down sliding sync (#17479)
Browse files Browse the repository at this point in the history
Rather than always including all rooms in range.

Also adds a pre-filter to rooms that checks the stream change cache to
see if anything might have happened.

Based on #17447

---------

Co-authored-by: Eric Eastwood <[email protected]>
  • Loading branch information
erikjohnston and MadLittleMods authored Jul 30, 2024
1 parent be4a16f commit 34306be
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 30 deletions.
1 change: 1 addition & 0 deletions changelog.d/17479.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Do not send down empty room entries down experimental sliding sync endpoint.
51 changes: 49 additions & 2 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,51 @@ async def current_sync_for_user(
# Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}

# Filter out rooms that haven't received updates and we've sent down
# previously.
if from_token:
rooms_should_send = set()

# First we check if there are rooms that match a list/room
# subscription and have updates we need to send (i.e. either because
# we haven't sent the room down, or we have but there are missing
# updates).
for room_id in relevant_room_map:
status = await self.connection_store.have_sent_room(
sync_config,
from_token.connection_position,
room_id,
)
if (
# The room was never sent down before so the client needs to know
# about it regardless of any updates.
status.status == HaveSentRoomFlag.NEVER
# `PREVIOUSLY` literally means the "room was sent down before *AND*
# there are updates we haven't sent down" so we already know this
# room has updates.
or status.status == HaveSentRoomFlag.PREVIOUSLY
):
rooms_should_send.add(room_id)
elif status.status == HaveSentRoomFlag.LIVE:
# We know that we've sent all updates up until `from_token`,
# so we just need to check if there have been updates since
# then.
pass
else:
assert_never(status.status)

# We only need to check for new events since any state changes
# will also come down as new events.
rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
relevant_room_map.keys(), from_token.stream_token.room_key
)
rooms_should_send.update(rooms_that_have_updates)
relevant_room_map = {
room_id: room_sync_config
for room_id, room_sync_config in relevant_room_map.items()
if room_id in rooms_should_send
}

@trace
@tag_args
async def handle_room(room_id: str) -> None:
Expand All @@ -633,7 +678,9 @@ async def handle_room(room_id: str) -> None:
to_token=to_token,
)

rooms[room_id] = room_sync_result
# Filter out empty room results during incremental sync
if room_sync_result or not from_token:
rooms[room_id] = room_sync_result

with start_active_span("sliding_sync.generate_room_entries"):
await concurrently_execute(handle_room, relevant_room_map, 10)
Expand Down Expand Up @@ -2198,7 +2245,7 @@ class SlidingSyncConnectionStore:
a connection position of 5 might have totally different states on worker A and
worker B.
One complication that we need to deal with here is needing to handle requests being
One complication that we need to deal with here is needing to handle requests being
resent, i.e. if we sent down a room in a response that the client received, we must
consider the room *not* sent when we get the request again.
Expand Down
10 changes: 10 additions & 0 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2104,3 +2104,13 @@ async def get_timeline_gaps(
return RoomStreamToken(stream=last_position.stream - 1)

return None

def get_rooms_that_might_have_updates(
self, room_ids: StrCollection, from_token: RoomStreamToken
) -> StrCollection:
"""Filters given room IDs down to those that might have updates, i.e.
removes rooms that definitely do not have updates.
"""
return self._events_stream_cache.get_entities_changed(
room_ids, from_token.stream
)
17 changes: 16 additions & 1 deletion synapse/types/handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,17 @@ class StrippedHero:
notification_count: int
highlight_count: int

def __bool__(self) -> bool:
return (
# If this is the first time the client is seeing the room, we should not filter it out
# under any circumstance.
self.initial
# We need to let the client know if there are any new events
or bool(self.required_state)
or bool(self.timeline_events)
or bool(self.stripped_state)
)

@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingWindowList:
"""
Expand Down Expand Up @@ -367,7 +378,11 @@ def __bool__(self) -> bool:
to tell if the notifier needs to wait for more events when polling for
events.
"""
return bool(self.lists or self.rooms or self.extensions)
# We don't include `self.lists` here, as a) `lists` is always non-empty even if
# there are no changes, and b) since we're sorting rooms by `stream_ordering` of
# the latest activity, anything that would cause the order to change would end
# up in `self.rooms` and cause us to send down the change.
return bool(self.rooms or self.extensions)

@staticmethod
def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
Expand Down
89 changes: 62 additions & 27 deletions tests/rest/client/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
)
from tests.server import TimedOutException
from tests.test_utils.event_injection import create_event, mark_event_as_partial_state
from tests.unittest import skip_unless

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -1656,12 +1655,6 @@ def test_wait_for_new_data(self) -> None:
channel.json_body["rooms"][room_id]["timeline"],
)

# TODO: Once we remove `ops`, we should be able to add a `RoomResult.__bool__` to
# check if there are any updates since the `from_token`.
@skip_unless(
False,
"Once we remove ops from the Sliding Sync response, this test should pass",
)
def test_wait_for_new_data_timeout(self) -> None:
"""
Test to make sure that the Sliding Sync request waits for new data to arrive but
Expand Down Expand Up @@ -1711,12 +1704,8 @@ def test_wait_for_new_data_timeout(self) -> None:
channel.await_result(timeout_ms=1200)
self.assertEqual(channel.code, 200, channel.json_body)

# We still see rooms because that's how Sliding Sync lists work but we reached
# the timeout before seeing them
self.assertEqual(
[event["event_id"] for event in channel.json_body["rooms"].keys()],
[room_id],
)
# There should be no room sent down.
self.assertFalse(channel.json_body["rooms"])

def test_filter_list(self) -> None:
"""
Expand Down Expand Up @@ -3556,19 +3545,7 @@ def test_rooms_ban_incremental_sync2(self) -> None:
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)

# Nothing to see for this banned user in the room in the token range
self.assertIsNone(response_body["rooms"][room_id1].get("timeline"))
# No events returned in the timeline so nothing is "live"
self.assertEqual(
response_body["rooms"][room_id1]["num_live"],
0,
response_body["rooms"][room_id1],
)
# There aren't anymore events to paginate to in this range
self.assertEqual(
response_body["rooms"][room_id1]["limited"],
False,
response_body["rooms"][room_id1],
)
self.assertIsNone(response_body["rooms"].get(room_id1))

def test_rooms_no_required_state(self) -> None:
"""
Expand Down Expand Up @@ -3668,12 +3645,15 @@ def test_rooms_required_state_incremental_sync(self) -> None:
# This one doesn't exist in the room
[EventTypes.Tombstone, ""],
],
"timeline_limit": 0,
"timeline_limit": 1,
}
}
}
_, from_token = self.do_sync(sync_body, tok=user1_tok)

# Send a message so the room comes down sync.
self.helper.send(room_id1, "msg", tok=user1_tok)

# Make the incremental Sliding Sync request
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)

Expand Down Expand Up @@ -4880,6 +4860,61 @@ def test_rooms_timeline_incremental_sync_NEVER(self) -> None:
self.assertEqual(response_body["rooms"][room_id1]["limited"], True)
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)

def test_rooms_with_no_updates_do_not_come_down_incremental_sync(self) -> None:
"""
Test that rooms with no updates are returned in subsequent incremental
syncs.
"""

user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")

room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)

sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 0,
}
}
}

_, from_token = self.do_sync(sync_body, tok=user1_tok)

# Make the incremental Sliding Sync request
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)

# Nothing has happened in the room, so the room should not come down
# /sync.
self.assertIsNone(response_body["rooms"].get(room_id1))

def test_empty_initial_room_comes_down_sync(self) -> None:
"""
Test that rooms come down /sync even with empty required state and
timeline limit in initial sync.
"""

user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")

room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)

sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 0,
}
}
}

# Make the Sliding Sync request
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)


class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
"""Tests for the to-device sliding sync extension"""
Expand Down

0 comments on commit 34306be

Please sign in to comment.