Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SSS: Implement PREVIOUSLY room tracking #17535

Merged
merged 7 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17535.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix experimental sliding sync implementation to remember any updates in rooms that were not sent down immediately.
68 changes: 52 additions & 16 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,9 @@ async def current_sync_for_user(
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
# Keep track of the rooms that we can display and need to fetch more info about
relevant_room_map: Dict[str, RoomSyncConfig] = {}
# The set of room IDs of all rooms that could appear in any list. These
# include rooms that are outside the list ranges.
all_rooms: Set[str] = set()
if has_lists and sync_config.lists is not None:
with start_active_span("assemble_sliding_window_lists"):
sync_room_map = await self.filter_rooms_relevant_for_sync(
Expand All @@ -558,11 +561,6 @@ async def current_sync_for_user(
to_token,
)

# Sort the list
sorted_room_info = await self.sort_rooms(
filtered_sync_room_map, to_token
)

# Find which rooms are partially stated and may need to be filtered out
# depending on the `required_state` requested (see below).
partial_state_room_map = (
Expand All @@ -583,6 +581,23 @@ async def current_sync_for_user(
and StateValues.LAZY in membership_state_keys
)

if not lazy_loading:
# Exclude partially-stated rooms unless the `required_state`
# only has `["m.room.member", "$LAZY"]` for membership
# (lazy-loading room members).
filtered_sync_room_map = {
room_id: room
for room_id, room in filtered_sync_room_map.items()
if not partial_state_room_map.get(room_id)
}

all_rooms.update(filtered_sync_room_map)

# Sort the list
sorted_room_info = await self.sort_rooms(
filtered_sync_room_map, to_token
)

ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
if list_config.ranges:
for range in list_config.ranges:
Expand All @@ -600,15 +615,6 @@ async def current_sync_for_user(
if len(room_ids_in_list) >= max_num_rooms:
break

# Exclude partially-stated rooms unless the `required_state`
# only has `["m.room.member", "$LAZY"]` for membership
# (lazy-loading room members).
if (
partial_state_room_map.get(room_id)
and not lazy_loading
):
continue

# Take the superset of the `RoomSyncConfig` for each room.
#
# Update our `relevant_room_map` with the room we're going
Expand Down Expand Up @@ -661,6 +667,8 @@ async def current_sync_for_user(
if not room_membership_for_user_at_to_token:
continue

all_rooms.add(room_id)

room_membership_for_user_map[room_id] = (
room_membership_for_user_at_to_token
)
Expand Down Expand Up @@ -768,12 +776,40 @@ async def handle_room(room_id: str) -> None:
)

if has_lists or has_room_subscriptions:
# We now calculate whether any rooms outside the range have had updates
# that we are not sending down.
#
# We *must* record rooms that have had updates, but it is also fine
# to record rooms as having updates even if there might not actually
# be anything new for the user (e.g. due to event filters, events
# having happened after the user left, etc).
unsent_room_ids = []
if from_token:
# The set of rooms that the client may care about but that aren't
# in any list range (or subscribed to).
missing_rooms = all_rooms - relevant_room_map.keys()

# We now just go and try fetching any events in the above rooms
# to see if anything has happened since the `from_token`.
#
# TODO: Replace this with something faster. When we land the
# sliding sync tables that record the most recent event
# positions we can use that.
Comment on lines +798 to +800
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can already re-use the data from sort_rooms(...) and return a _SortedRoomMembershipForUser(...) that includes the stream_ordering of the latest event in the room which we can compare to our from_token. See b59ccf7 for how I previously did this for sharing the bump_stamp info.

Then when the sliding sync tables land, we can use them in sort_rooms() to make that faster.

We can also do this in a follow-up since the PR is already nice and compact to introduce the concept 🤷

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah, I was thinking that we should reuse some of the work you were doing with the DB tables, but forgot we could already reuse what we had from sort_rooms. I might leave this as is for now, until we land the new DB tables. That way we don't collide, and we can land this PR ASAP.

missing_event_map_by_room = (
await self.store.get_room_events_stream_for_rooms(
missing_rooms,
from_key=from_token.stream_token.room_key,
to_key=to_token.room_key,
limit=1,
)
)
unsent_room_ids = list(missing_event_map_by_room)

connection_position = await self.connection_store.record_rooms(
sync_config=sync_config,
from_token=from_token,
sent_room_ids=relevant_rooms_to_send_map.keys(),
# TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
unsent_room_ids=[],
unsent_room_ids=unsent_room_ids,
)
elif from_token:
connection_position = from_token.connection_position
Expand Down
72 changes: 0 additions & 72 deletions tests/rest/client/sliding_sync/test_connection_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
from synapse.api.constants import EventTypes
from synapse.rest.client import login, room, sync
from synapse.server import HomeServer
from synapse.types import SlidingSyncStreamToken
from synapse.types.handlers import SlidingSyncConfig
from synapse.util import Clock

from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
Expand Down Expand Up @@ -130,7 +128,6 @@ def test_rooms_timeline_incremental_sync_PREVIOUSLY(self, limited: bool) -> None
self.helper.send(room_id1, "msg", tok=user1_tok)

timeline_limit = 5
conn_id = "conn_id"
sync_body = {
"lists": {
"foo-list": {
Expand Down Expand Up @@ -170,40 +167,6 @@ def test_rooms_timeline_incremental_sync_PREVIOUSLY(self, limited: bool) -> None
response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
)

# FIXME: This is a hack to record that the first room wasn't sent down
# sync, as we don't implement that currently.
sliding_sync_handler = self.hs.get_sliding_sync_handler()
requester = self.get_success(
self.hs.get_auth().get_user_by_access_token(user1_tok)
)
sync_config = SlidingSyncConfig(
user=requester.user,
requester=requester,
conn_id=conn_id,
)

parsed_initial_from_token = self.get_success(
SlidingSyncStreamToken.from_string(self.store, initial_from_token)
)
connection_position = self.get_success(
sliding_sync_handler.connection_store.record_rooms(
sync_config,
parsed_initial_from_token,
sent_room_ids=[],
unsent_room_ids=[room_id1],
)
)

# FIXME: Now fix up `from_token` with the new connection position above.
parsed_from_token = self.get_success(
SlidingSyncStreamToken.from_string(self.store, from_token)
)
parsed_from_token = SlidingSyncStreamToken(
stream_token=parsed_from_token.stream_token,
connection_position=connection_position,
)
from_token = self.get_success(parsed_from_token.to_string(self.store))

# We now send another event to room1, so we should sync all the missing events.
resp = self.helper.send(room_id1, "msg2", tok=user1_tok)
expected_events.append(resp["event_id"])
Expand Down Expand Up @@ -238,7 +201,6 @@ def test_rooms_required_state_incremental_sync_PREVIOUSLY(self) -> None:

self.helper.send(room_id1, "msg", tok=user1_tok)

conn_id = "conn_id"
sync_body = {
"lists": {
"foo-list": {
Expand Down Expand Up @@ -279,40 +241,6 @@ def test_rooms_required_state_incremental_sync_PREVIOUSLY(self) -> None:
response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
)

# FIXME: This is a hack to record that the first room wasn't sent down
# sync, as we don't implement that currently.
sliding_sync_handler = self.hs.get_sliding_sync_handler()
requester = self.get_success(
self.hs.get_auth().get_user_by_access_token(user1_tok)
)
sync_config = SlidingSyncConfig(
user=requester.user,
requester=requester,
conn_id=conn_id,
)

parsed_initial_from_token = self.get_success(
SlidingSyncStreamToken.from_string(self.store, initial_from_token)
)
connection_position = self.get_success(
sliding_sync_handler.connection_store.record_rooms(
sync_config,
parsed_initial_from_token,
sent_room_ids=[],
unsent_room_ids=[room_id1],
)
)

# FIXME: Now fix up `from_token` with the new connection position above.
parsed_from_token = self.get_success(
SlidingSyncStreamToken.from_string(self.store, from_token)
)
parsed_from_token = SlidingSyncStreamToken(
stream_token=parsed_from_token.stream_token,
connection_position=connection_position,
)
from_token = self.get_success(parsed_from_token.to_string(self.store))

# We now send another event to room1, so we should sync all the missing state.
self.helper.send(room_id1, "msg", tok=user1_tok)

Expand Down
Loading