Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding Sync: Handle timeline limit changes (take 2) #17579

Merged
merged 29 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
da5339d
Migrate to per-connection state class
erikjohnston Aug 12, 2024
baac6c5
Record with new class
erikjohnston Aug 14, 2024
0561c86
Revamp
erikjohnston Aug 15, 2024
c15b8b3
WIP receipts reading
erikjohnston Aug 13, 2024
a1b75f7
WIP comments
erikjohnston Aug 14, 2024
6b9d244
Record state
erikjohnston Aug 15, 2024
55feaae
Add tests
erikjohnston Aug 15, 2024
614c0d7
Newsfile
erikjohnston Aug 15, 2024
100927d
Comments
erikjohnston Aug 15, 2024
70d32fb
Add proper DB function for getting receipts between things
erikjohnston Aug 15, 2024
ee6efa2
Track room configs in per-connection state
erikjohnston Aug 16, 2024
009af0e
Handle timeline_limit changes
erikjohnston Aug 16, 2024
b23231e
Newsfile
erikjohnston Aug 16, 2024
aea946b
Merge remote-tracking branch 'origin/develop' into erikj/ss_room_sub2
erikjohnston Aug 19, 2024
33ec15b
Restore comments
erikjohnston Aug 19, 2024
768d150
Add docstring
erikjohnston Aug 19, 2024
a63261d
Restore comments
erikjohnston Aug 19, 2024
891ce47
Rename previous_room_configs
erikjohnston Aug 19, 2024
a4ad443
Use test helpers
erikjohnston Aug 19, 2024
0e8feed
Remove spurious set_tag
erikjohnston Aug 19, 2024
49c4645
Remove double insertion
erikjohnston Aug 19, 2024
299ab1b
Use timelime_limit not len(timeline)
erikjohnston Aug 19, 2024
ba4e63b
Add comment explaining the odd behaviour
erikjohnston Aug 19, 2024
2bba63e
Replace initial=true with unstable_expanded_timeline=true
erikjohnston Aug 19, 2024
52f4253
Improve comment
erikjohnston Aug 19, 2024
09538c2
Update synapse/handlers/sliding_sync.py
erikjohnston Aug 20, 2024
733555b
Update synapse/handlers/sliding_sync.py
erikjohnston Aug 20, 2024
76f882a
Update synapse/handlers/sliding_sync.py
erikjohnston Aug 20, 2024
bcaf4e6
Update synapse/handlers/sliding_sync.py
erikjohnston Aug 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17579.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Handle changes in `timeline_limit` in experimental sliding sync.
Copy link
Contributor

@MadLittleMods MadLittleMods Aug 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This continues to feel horrible especially given new edge cases like this comment. Highly recommend we just update the client to use an initial sync request with timeline_limit: 20 and required_state: [] (which allows us to avoid the extra bytes) to accomplish the exact same thing without introducing any of this bizarre behavior.

Previous conversation for context

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talked with @erikjohnston a bit more and was trying to figure out why the initial sync request doesn't solve this completely. I basically asked the opposite question for how/why timeline trickling/unstable_expanded_timeline makes this easier. The following question unlocked a better understanding for one complication when trying to use the initial sync route.

How is the timeline stitching done for the timeline trickling? How is the problem easier when using that?

[...] its all coming down one connection, so you know you've got a consistent "current" timeline chunk that you'll get updates for (which you can then optionally stitch together with whatever timeline chunks the client currently has)

This is a valid point!

I'm still leaning towards the side of initial sync being possible to use (and better) and just requires some basic timeline stitching logic. ElementX might already have some stitching and event de-duplication logic to handle what the proxy was doing before that would also cover this case.

Since ElementX doesn't have offline support and throws away events, I think we could just do this:

For the timeline stitching logic, the client can store the latest event in timeline before our initial sync, then find that event in the initial sync timeline events and spread backwards from that point. That way, the ongoing sync loop can still append to the end of the timeline and continue seamlessly.

So if we have a timeline [103] already on the client, we store latest_event_id = 103, do our initial sync which returns [100, 101, 102, 103, 104] and we splice/spread in only [100, 101, 102, 103] accordingly (drop any new events after the latest_event_id from the initial sync response). This makes it so that even if the ongoing sync loop sends 104 before or after our initial sync does, it still appends like normal and everything is in seamless order.

If there are so many new messages sent in the time between us storing the latest_event_id and the initial sync responding that we now have a gap, we can just throw away our initial sync events because we have enough events to fill up the timeline just from our normal ongoing sync loop.


To be clear, the client doesn't need to be fancy about stitching:

If the client had more timeline like [98, 99, 100, 101, 102, 103], we store latest_event_id = 103, we start the initial sync, our ongoing sync loop races us and returns 104 which makes our timeline look like [98, 99, 100, 101, 102, 103, 104]. Then our initial sync responds with [100, 101, 102, 103, 104], we find the 103 spot in the response to slice at and place it at the 103 spot in the client timeline leaving us with [100, 101, 102, 103, 104]

Pseudo code (maybe off-by-one errors):

latest_event_id = 103

# do initial sync request

initial_sync_timeline = [100, 101, 102, 103, 104]
event_index_in_response = initial_sync_timeline.index(latest_event_id)
# Skip if we can't find the `latest_event_id` in the response.
# This means there have been so many messages sent between the time we initially
# made the initial sync and the response that this is no longer relevant.
# We already have enough events to fill up the timeline from the normal
# ongoing sync loop
if event_index_in_response is None:
	return

event_index_in_client_timeline = client_timeline.index(latest_event_id)
# Update the timeline
client_timeline = initial_sync_timeline[0:event_index_in_response] + client_timeline[event_index_in_client_timeline:-1]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need this same room sync config tracking for required_state (and probably filter/extension) changes so overall the concept isn't lost.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible edge case: ElementX triggers unstable_expanded_timeline by increasing the timeline_limit and gets a chunk of timeline and we record that higher timeline_limit. I go to bed and then I wake up (or just some period of time that new messages were sent in but I didn't have the app open). How does ElementX get timeline for all of the events in the gap? If it tries to trigger unstable_expanded_timeline again, it won't work because the last recorded timeline_limit is already just as high.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine. There are two cases depending on if the timeline limit in the morning is small or high:

For the small case:

  • When the client comes online it'll get a small bit of history with limited=true.
  • The client can then either backpaginate in the room, or do a room sub with a larger timeline

For the high case:

  • When the client comes online it'll get a large chunk of history with limited=true. This is (hopefully) enough to show a screens worth of data, which is what we want.
  • If the client wants more it can just backpaginate.

What EX at least wants is to quickly be able to get enough chunks of history in rooms (in the background) to be able to show a screens worth of data. That way the UX is open the app, see a fast sync, (in the background it preloads the top N rooms with more timeline), the user clicks on one of the rooms and sees a page of timeline, and then the app can paginate in more timeline as usual (via /messages).

146 changes: 134 additions & 12 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,19 @@ async def current_sync_for_user(
# subscription and have updates we need to send (i.e. either because
# we haven't sent the room down, or we have but there are missing
# updates).
for room_id in relevant_room_map:
for room_id, room_config in relevant_room_map.items():
prev_room_sync_config = (
previous_connection_state.previous_room_configs.get(room_id)
)
if prev_room_sync_config is not None:
# Always include rooms whose timeline limit has increased.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
if (
prev_room_sync_config.timeline_limit
< room_config.timeline_limit
):
rooms_should_send.add(room_id)
continue

status = previous_connection_state.rooms.have_sent_room(room_id)
if (
# The room was never sent down before so the client needs to know
Expand Down Expand Up @@ -819,12 +831,17 @@ async def current_sync_for_user(
if room_id in rooms_should_send
}

new_connection_state = previous_connection_state.get_mutable()

@trace
@tag_args
async def handle_room(room_id: str) -> None:
set_tag("room_id", room_id)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

room_sync_result = await self.get_room_sync_data(
sync_config=sync_config,
previous_connection_state=previous_connection_state,
new_connection_state=new_connection_state,
room_id=room_id,
room_sync_config=relevant_rooms_to_send_map[room_id],
room_membership_for_user_at_to_token=room_membership_for_user_map[
Expand All @@ -842,8 +859,6 @@ async def handle_room(room_id: str) -> None:
with start_active_span("sliding_sync.generate_room_entries"):
await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10)

new_connection_state = previous_connection_state.get_mutable()

extensions = await self.get_extensions_response(
sync_config=sync_config,
actual_lists=lists,
Expand Down Expand Up @@ -1955,6 +1970,7 @@ async def get_room_sync_data(
self,
sync_config: SlidingSyncConfig,
previous_connection_state: "PerConnectionState",
new_connection_state: "MutablePerConnectionState",
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
Expand Down Expand Up @@ -1999,8 +2015,15 @@ async def get_room_sync_data(
# connection before
#
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
#
# We also need to check if the timeline limit has increased, if so we ignore
# the from bound for the timeline to send down a larger chunk of
# history.
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
#
# TODO: Also handle changes to `required_state`
from_bound = None
initial = True
ignore_timeline_bound = False
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
if from_token and not room_membership_for_user_at_to_token.newly_joined:
room_status = previous_connection_state.rooms.have_sent_room(room_id)
if room_status.status == HaveSentRoomFlag.LIVE:
Expand All @@ -2018,7 +2041,39 @@ async def get_room_sync_data(

log_kv({"sliding_sync.room_status": room_status})

log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
prev_room_sync_config = previous_connection_state.previous_room_configs.get(
room_id
)
if prev_room_sync_config is not None:
# Check if the timeline limit has increased, if so ignore the
# timeline bound and record the change.
if (
prev_room_sync_config.timeline_limit
< room_sync_config.timeline_limit
):
ignore_timeline_bound = True
new_connection_state.previous_room_configs[room_id] = (
room_sync_config
)

if (
room_status.status != HaveSentRoomFlag.LIVE
and prev_room_sync_config.timeline_limit
> room_sync_config.timeline_limit
):
new_connection_state.previous_room_configs[room_id] = (
room_sync_config
)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

# TODO: Record changes in required_state.

log_kv(
{
"sliding_sync.from_bound": from_bound,
"sliding_sync.initial": initial,
"sliding_sync.ignore_timeline_bound": ignore_timeline_bound,
}
)

# Assemble the list of timeline events
#
Expand Down Expand Up @@ -2055,6 +2110,10 @@ async def get_room_sync_data(
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)

timeline_from_bound = from_bound
if ignore_timeline_bound:
timeline_from_bound = None

# For initial `/sync` (and other historical scenarios mentioned above), we
# want to view a historical section of the timeline; to fetch events by
# `topological_ordering` (best representation of the room DAG as others were
Expand All @@ -2080,7 +2139,7 @@ async def get_room_sync_data(
pagination_method: PaginateFunction = (
# Use `topographical_ordering` for historical events
paginate_room_events_by_topological_ordering
if from_bound is None
if timeline_from_bound is None
# Use `stream_ordering` for updates
else paginate_room_events_by_stream_ordering
)
Expand All @@ -2090,7 +2149,7 @@ async def get_room_sync_data(
# (from newer to older events) starting at to_bound.
# This ensures we fill the `limit` with the newest events first,
from_key=to_bound,
to_key=from_bound,
to_key=timeline_from_bound,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
Expand Down Expand Up @@ -2448,6 +2507,47 @@ async def get_room_sync_data(
if new_bump_event_pos.stream > 0:
bump_stamp = new_bump_event_pos.stream

prev_room_sync_config = previous_connection_state.previous_room_configs.get(
room_id
)
if ignore_timeline_bound:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# FIXME: We signal the fact that we're sending down more events to
# the client by setting `initial=true` *without* sending down all
# the state/metadata again, which is what the proxy does. We should
# update the protocol to do something less silly.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
initial = True

new_connection_state.previous_room_configs[room_id] = RoomSyncConfig(
timeline_limit=len(timeline_events),
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
required_state_map=room_sync_config.required_state_map,
)
elif prev_room_sync_config is not None:
# If the result isn't limited then we don't need to record that the
# timeline_limit has been reduced, as the *effective* timeline limit
# (i.e. the amount of timeline we have previously sent) is at least
# the previous timeline limit.
#
# This is to handle the case where the timeline limit e.g. goes from
# 10 to 5 to 10 again (without any timeline gaps), where there's no
# point sending down extra events when the timeline limit is
# increased as the client already has the 10 previous events.
# However, if is a gap (i.e. limited is True), then we *do* need to
# record the reduced timeline.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, I just don't get it. Related to the comment above

I'm confused why we're even updating new_connection_state here given we set it above though

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, looks like I moved the code and then just didn't remove it from the old place.

I think the example explains the motivating case we're trying to handle? Another way of putting it is that if the response isn't limited, then we know we have sent down at least the previous timeline_limit worth of events, and so we don't need to record the reduced timeline limit until we actually see a limited response

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After talking with @erikjohnston and some more thinking, I think I understand the optimization here.

The goal of timeline trickling/unstable_expanded_timeline is that the client can get at-least a chunk of current messages to fill up the screen. So if we were already able to give the client a nice contiguous chunk of current timeline, when they increase the timeline_limit again, we can tell that they already have enough messages and don't need to trigger unstable_expanded_timeline again.

If the response is limited, the client has a gap in the timeline. So if they trigger unstable_expanded_timeline on a subsequent request, we can give them a full chunk of history again. This won't necessarily fill the gap but they will get enough events to fill up the screen and they can paginate from there.

I'm not sure this explanation would have made it more clear from the beginning.

if (
limited
and prev_room_sync_config.timeline_limit
> room_sync_config.timeline_limit
):
new_connection_state.previous_room_configs[room_id] = RoomSyncConfig(
timeline_limit=len(timeline_events),
required_state_map=room_sync_config.required_state_map,
)

# TODO: Record changes in required_state.

else:
new_connection_state.previous_room_configs[room_id] = room_sync_config

set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)

return SlidingSyncResult.RoomResult(
Expand Down Expand Up @@ -3262,16 +3362,32 @@ class PerConnectionState:
Attributes:
rooms: The status of each room for the events stream.
receipts: The status of each room for the receipts stream.
previous_room_configs: Map from room_id to the `RoomSyncConfig` of all
rooms that we have previously sent down.
"""

rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap)
receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap)

previous_room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict)

def get_mutable(self) -> "MutablePerConnectionState":
"""Get a mutable copy of this state."""
previous_room_configs = cast(
MutableMapping[str, RoomSyncConfig], self.previous_room_configs
)

return MutablePerConnectionState(
rooms=self.rooms.get_mutable(),
receipts=self.receipts.get_mutable(),
previous_room_configs=ChainMap({}, previous_room_configs),
)

def copy(self) -> "PerConnectionState":
return PerConnectionState(
rooms=self.rooms.copy(),
receipts=self.receipts.copy(),
previous_room_configs=dict(self.previous_room_configs),
)


Expand All @@ -3282,8 +3398,18 @@ class MutablePerConnectionState(PerConnectionState):
rooms: MutableRoomStatusMap[RoomStreamToken]
receipts: MutableRoomStatusMap[MultiWriterStreamToken]

previous_room_configs: typing.ChainMap[str, RoomSyncConfig]
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

def has_updates(self) -> bool:
return bool(self.rooms.get_updates()) or bool(self.receipts.get_updates())
return (
bool(self.rooms.get_updates())
or bool(self.receipts.get_updates())
or bool(self.get_room_config_updates())
)

def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]:
"""Get updates to the room sync config"""
return self.previous_room_configs.maps[0]


@attr.s(auto_attribs=True)
Expand Down Expand Up @@ -3367,7 +3493,6 @@ async def record_new_state(
) -> int:
"""Record updated per-connection state, returning the connection
position associated with the new state.

If there are no changes to the state this may return the same token as
the existing per-connection state.
"""
Expand All @@ -3388,10 +3513,7 @@ async def record_new_state(

# We copy the `MutablePerConnectionState` so that the inner `ChainMap`s
# don't grow forever.
sync_statuses[new_store_token] = PerConnectionState(
rooms=new_connection_state.rooms.copy(),
receipts=new_connection_state.receipts.copy(),
)
sync_statuses[new_store_token] = new_connection_state.copy()

return new_store_token

Expand Down
124 changes: 124 additions & 0 deletions tests/rest/client/sliding_sync/test_rooms_timeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
from synapse.api.constants import EventTypes
from synapse.rest.client import login, room, sync
from synapse.server import HomeServer
from synapse.types import StreamToken, StrSequence
Expand Down Expand Up @@ -573,3 +574,126 @@ def test_rooms_ban_incremental_sync2(self) -> None:

# Nothing to see for this banned user in the room in the token range
self.assertIsNone(response_body["rooms"].get(room_id1))

def test_increasing_timeline_range_sends_more_messages(self) -> None:
"""
Test that increasing the timeline limit via room subscriptions sends the
room down with more messages in a limited sync.
"""

user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")

room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)

sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [[EventTypes.Create, ""]],
"timeline_limit": 1,
}
}
}

message_events = []
for _ in range(10):
resp = self.helper.send(room_id1, "msg", tok=user1_tok)
message_events.append(resp["event_id"])

# Make the first Sliding Sync request
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
room_response = response_body["rooms"][room_id1]

self.assertEqual(room_response["initial"], True)
self.assertEqual(room_response["limited"], True)

# We only expect the last message at first
self.assertEqual(
[event["event_id"] for event in room_response["timeline"]],
message_events[-1:],
room_response["timeline"],
)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

# We also expect to get the create event state.
self.assertEqual(
[event["type"] for event in room_response["required_state"]],
[EventTypes.Create],
)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

# Now do another request with a room subscription with an increased timeline limit
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [],
"timeline_limit": 10,
}
}

response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
room_response = response_body["rooms"][room_id1]

self.assertEqual(room_response["initial"], True)
self.assertEqual(room_response["limited"], True)

# Now we expect all the messages
self.assertEqual(
[event["event_id"] for event in room_response["timeline"]],
message_events,
room_response["timeline"],
)

# We don't expect to get the room create down, as nothing has changed.
self.assertNotIn("required_state", room_response)

# Decreasing the timeline limit shouldn't resend any events
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [],
"timeline_limit": 5,
}
}

event_response = self.helper.send(room_id1, "msg", tok=user1_tok)
latest_event_id = event_response["event_id"]

response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
room_response = response_body["rooms"][room_id1]

self.assertNotIn("initial", room_response)
self.assertEqual(room_response["limited"], False)

self.assertEqual(
[event["event_id"] for event in room_response["timeline"]],
[latest_event_id],
room_response["timeline"],
)

# Increasing the limit to what it was before also should not resend any
# events
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [],
"timeline_limit": 10,
}
}

event_response = self.helper.send(room_id1, "msg", tok=user1_tok)
latest_event_id = event_response["event_id"]

response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
room_response = response_body["rooms"][room_id1]

self.assertNotIn("initial", room_response)
self.assertEqual(room_response["limited"], False)

self.assertEqual(
[event["event_id"] for event in room_response["timeline"]],
[latest_event_id],
room_response["timeline"],
)
Loading