Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding Sync: Use stream_ordering based timeline pagination for incremental sync #17510

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17510.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix timeline ordering (using `stream_ordering` instead of topological ordering) in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
2 changes: 1 addition & 1 deletion synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1750,7 +1750,7 @@ async def get_new_events(
from_key=from_key,
to_key=to_key,
limit=limit or 10,
order="ASC",
direction=Direction.FORWARDS,
)

events = list(room_events)
Expand Down
25 changes: 13 additions & 12 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1855,18 +1855,19 @@ async def get_room_sync_data(
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)

timeline_events, new_room_key = await self.store.paginate_room_events(
room_id=room_id,
# The bounds are reversed so we can paginate backwards
# (from newer to older events) starting at to_bound.
# This ensures we fill the `limit` with the newest events first,
from_key=to_bound,
to_key=from_bound,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=room_sync_config.timeline_limit + 1,
event_filter=None,
timeline_events, new_room_key = (
await self.store.get_room_events_stream_for_room(
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor Author

@MadLittleMods MadLittleMods Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an alternative, we could adapt paginate_room_events(...) to conditionally use topological_ordering vs stream_ordering

room_id=room_id,
# The bounds are reversed so we can paginate backwards
# (from newer to older events) starting at to_bound.
# This ensures we fill the `limit` with the newest events first,
from_key=to_bound,
to_key=from_bound,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=room_sync_config.timeline_limit + 1,
)
Copy link
Contributor Author

@MadLittleMods MadLittleMods Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a possible question of, do we even want to do this?

Using stream_ordering makes sense in terms of /sync returning new items that the server received. it's also what the spec says we should do. We also make sure that if a new event is received, it's returned to the client. Otherwise, it seems like there is an edge case if we use topological_ordering that a new event might not be returned because it's topological position is calculated to be older.

Using topological_ordering makes sense in terms of matching /messages so people don't have to worry about different orders of things between API's.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, so the way that sync v2 works in synapse currently is:

  1. The first time a room is sent down /sync we use paginate_room_events to get a chunk of history for the client, this basically results in the same as if no history was sent down /messages was used.
  2. We use get_room_events_stream_for_rooms to get all updates for rooms since the last /sync, which is in stream ordering.

I think this is probably the right way of looking at it, we use topological ordering when we paginate backwards, but stream ordering to fetch new updates to the room (even if those updates happened a while ago in the past).

What we probably want to do for sliding sync is to more or less replicate that behaviour: when we have an incremental fetch all updates (i.e. get_room_events_stream_for_rooms for joined rooms and fetch all membership changes), and then only use paginate_room_events for the new rooms.

(This also has the advantage we'd only need to sort and filter rooms we know have updates)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds pretty reasonable to me 👍

I've created a spec issue to track this: matrix-org/matrix-spec#1917

MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
)

# We want to return the events in ascending order (the last event is the
Expand Down
20 changes: 15 additions & 5 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

from synapse.api.constants import (
AccountDataTypes,
Direction,
EventContentFields,
EventTypes,
JoinRules,
Expand Down Expand Up @@ -883,15 +884,20 @@ async def _load_filtered_recents(
# that have happened since `since_key` up to `end_key`, so we
# can just use `get_room_events_stream_for_room`.
# Otherwise, we want to return the last N events in the room
# in topological ordering.
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
# in `stream_ordering`.
if since_key:
events, end_key = await self.store.get_room_events_stream_for_room(
room_id,
limit=load_limit + 1,
from_key=since_key,
to_key=end_key,
from_key=end_key,
to_key=since_key,
direction=Direction.BACKWARDS,
)
# We want to return the events in ascending order (the last event is the
# most recent).
events.reverse()
else:
# TODO: This should return events in `stream_ordering` order
events, end_key = await self.store.get_recent_events_for_room(
room_id, limit=load_limit + 1, end_token=end_key
)
Expand Down Expand Up @@ -2641,9 +2647,10 @@ async def _get_room_changes_for_incremental_sync(
# a "gap" in the timeline, as described by the spec for /sync.
room_to_events = await self.store.get_room_events_stream_for_rooms(
room_ids=sync_result_builder.joined_room_ids,
from_key=since_token.room_key,
to_key=now_token.room_key,
from_key=now_token.room_key,
to_key=since_token.room_key,
limit=timeline_limit + 1,
direction=Direction.BACKWARDS,
)

# We loop through all room ids, even if there are no new events, in case
Expand All @@ -2654,6 +2661,9 @@ async def _get_room_changes_for_incremental_sync(
newly_joined = room_id in newly_joined_rooms
if room_entry:
events, start_key = room_entry
# We want to return the events in ascending order (the last event is the
# most recent).
events.reverse()

prev_batch_token = now_token.copy_and_replace(
StreamKeyType.ROOM, start_key
Expand Down
Loading
Loading