Sliding Sync: Handle room_subscriptions that increase the timeline_limit #17503

Closed · wants to merge 7 commits

1 change: 1 addition & 0 deletions changelog.d/17503.misc
@@ -0,0 +1 @@
Handle requests for more events in a room when using experimental sliding sync.
87 changes: 69 additions & 18 deletions synapse/handlers/sliding_sync.py
@@ -653,6 +653,13 @@ async def current_sync_for_user(
                else:
                    assert_never(status.status)

                if status.timeline_limit is not None and (
                    status.timeline_limit < relevant_room_map[room_id].timeline_limit
                ):
                    # If the timeline limit has increased we want to send down
                    # more historic events (even if nothing has since changed).
                    rooms_should_send.add(room_id)

# We only need to check for new events since any state changes
# will also come down as new events.
rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
@@ -697,6 +704,7 @@ async def handle_room(room_id: str) -> None:
        if has_lists or has_room_subscriptions:
            connection_position = await self.connection_store.record_rooms(
                sync_config=sync_config,
                room_configs=relevant_room_map,
                from_token=from_token,
                sent_room_ids=relevant_room_map.keys(),
                # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
@@ -1475,14 +1483,20 @@ async def get_room_sync_data(
        # - When users `newly_joined`
        # - For an incremental sync where we haven't sent it down this
        #   connection before
        #
        # We also decide if we should ignore the timeline bound or not. This is
        # to handle the case where the client has requested more historical
        # messages in the room by increasing the timeline limit.
        from_bound = None
        ignore_timeline_bound = False
        initial = True
        if from_token and not room_membership_for_user_at_to_token.newly_joined:
            room_status = await self.connection_store.have_sent_room(
                sync_config=sync_config,
                connection_token=from_token.connection_position,
                room_id=room_id,
            )

            if room_status.status == HaveSentRoomFlag.LIVE:
                from_bound = from_token.stream_token.room_key
                initial = False

@@ -1496,9 +1510,24 @@ async def get_room_sync_data(

            else:
                assert_never(room_status.status)

            if room_status.timeline_limit is not None and (
                room_status.timeline_limit < room_sync_config.timeline_limit
            ):
                # If the timeline limit has been increased since previous
                # requests then we treat it as a request for more events. We do
                # this by sending down a `limited` sync, ignoring the from
                # bound.
                ignore_timeline_bound = True
Comment on lines +1513 to +1520

Contributor:

One edge case we thought of during our weekly meeting is that it seems like it's only possible to bump the timeline_limit once to get a batch of timeline messages. So if you initially request everything with timeline_limit: 1 and then timeline_limit: 20 to get a bunch of timeline messages, requesting with timeline_limit: 20 again won't give you a batch of timeline again. You would have to request timeline_limit: 21, etc. into infinity every time you want to fetch the initial batch of timeline.

This comes into play even with ElementX, because it grows and shrinks its range over time and will want to fetch timeline for rooms that come into view again.
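For illustration (not from the PR), a sketch of the request sequence behind this concern; the room id is a stand-in and the comments describe the server behaviour under this change:

```python
room = "!room:example.org"  # illustrative room id

# Request 1: initial sync, minimal timeline.
req1 = {"room_subscriptions": {room: {"timeline_limit": 1, "required_state": []}}}
# -> server stores timeline_limit = 1 for this room; responds with 1 event.

# Request 2: bump the limit.
req2 = {"room_subscriptions": {room: {"timeline_limit": 20, "required_state": []}}}
# -> 20 > stored 1, so a fresh batch of up to 20 events comes down (`limited`).

# Request 3: same limit again, later.
req3 = {"room_subscriptions": {room: {"timeline_limit": 20, "required_state": []}}}
# -> 20 is not > stored 20, so no fresh batch; hence the worry that you'd
#    need 21, then 22, ... to re-fetch an initial batch.
```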

Member Author:

I think this is actually covered in the PR (and I need to add tests), but: when the timeline limit is reduced we update the stored timeline limit, so we do correctly handle increase -> decrease -> increase.


log_kv({"sliding_sync.room_status": room_status})

log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
log_kv(
{
"sliding_sync.from_bound": from_bound,
"sliding_sync.ignore_timeline_bound": ignore_timeline_bound,
"sliding_sync.initial": initial,
}
)

        # Assemble the list of timeline events
        #

@@ -1541,7 +1570,7 @@ async def get_room_sync_data(

                # (from newer to older events) starting at to_bound.
                # This ensures we fill the `limit` with the newest events first,
                from_key=to_bound,
-               to_key=from_bound,
+               to_key=from_bound if not ignore_timeline_bound else None,
                direction=Direction.BACKWARDS,
                # We add one so we can determine if there are enough events to saturate
                # the limit or not (see `limited`)
@@ -1566,6 +1595,12 @@ async def get_room_sync_data(

                    stream=timeline_events[0].internal_metadata.stream_ordering - 1
                )

            if ignore_timeline_bound:
                # If we're ignoring the timeline bound we *must* set limited to
                # true, as otherwise the client will append the received events
                # to the timeline, rather than replacing it.
                limited = True
Comment on lines +1599 to +1602

Contributor (@MadLittleMods, Jul 30, 2024):

This seems dubious. Some clients may replace everything on limited = True, but I don't think that's the case for all clients.

For example, for an incremental sync, if there are more events than the timeline_limit, it will be limited = True. But that just means there is a gap that they should paginate and fill in with /messages.

limited doesn't mean the timeline should be replaced.

Even initial = True doesn't mean the timeline should be replaced.

Member Author:

Yeah, this is sort of an abuse of what limited means in sync v2, though generally the way clients deal with limited is to remove all existing messages from the timeline and only display the new events (as clients don't generally render a "gap" between two timeline chunks).

What the SS proxy does is to set initial: true, but not include unchanged required_state, which feels weird.

As I see it the options to deal with timeline trickling (in the short term) are:

  1. Set initial: true and include the full room data (including state)
  2. Do what the proxy does and set initial: true, but don't send down unchanged state
  3. Do what this PR does and set limited: true and don't send down unchanged state

I think the last one is the least worst. It's also worth noting that the client (mostly) opts into this behaviour by doing explicit room subscriptions with a differing timeline_limit.
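For concreteness, a rough sketch of the room response shape under each option (field values are illustrative, not taken from the implementation):

```python
# Option 1: full initial room data, unchanged state included (most bytes on the wire).
option_1 = {"initial": True, "timeline": ["...events..."], "required_state": ["...state..."]}

# Option 2 (what the proxy does): `initial` set, unchanged state omitted.
option_2 = {"initial": True, "timeline": ["...events..."]}

# Option 3 (this PR): `limited` set, unchanged state omitted.
option_3 = {"limited": True, "timeline": ["...events..."]}
```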

Member Author:

(Side note: if we do go ahead with this PR we should update the docs for limited.)

Contributor (@MadLittleMods, Jul 31, 2024):

The pattern laid out is to use /messages. With the current Sliding Sync API, adjusting/repurposing fields/behaviors to mean other things is just trying to skirt the problem and adds tenuous intricacy to complexity.

If they're trying to get more timeline, one solution could be to do another initial sync with the timeline_limit that they want.

Clients should handle gaps though. Or at least we shouldn't punish clients who do handle gaps.

For example, Hydrogen handles this with fragments and gap filling since it has offline support and doesn't throw away its work.

If we still think Sliding Sync should somehow address this, then I think we need to apply more thought to the API design.

Member Author:

Just to be clear: this is implementing the behaviour that the sliding sync proxy already supports.

The way that the rust sdk currently starts is to:

  1. Do an initial request for top 20 rooms and timeline limit of 1. This gets a page of results fast.
  2. Do a second request for top 20 rooms (via room subs) with timeline of 20. This preloads the room lists for the top rooms.
  3. Expand the range request-by-request, with timeline limit of 1.

I agree that there is something a bit confusing about allowing the client to change the timeline limit, but TBF they are opting into this behaviour (to an extent).

> The pattern laid out is to use /messages. With the current Sliding Sync API, adjusting/repurposing fields/behaviors to mean other things is just trying to skirt the problem and adds tenuous intricacy to complexity.

We would be tweaking limited to mean "these are the most recent N events and there may be a gap; if you've changed the timeline limit you may have already seen some of these events".

The alternative is to just use initial, which really does mean "throw away everything you have".

> Clients should handle gaps though. Or at least we shouldn't punish clients who do handle gaps.
>
> For example, Hydrogen handles this with fragments and gap filling since it has offline support and doesn't throw away its work.

I don't think this really punishes clients that handle gaps that much; it means that if they change the timeline limit they need to handle the fact that they may have already seen a subset of the events that have been returned.

If we do just use initial, then it's up to the clients whether they want to just throw everything they have away or just use /messages.

> If we still think Sliding Sync should somehow address this, then I think we need to apply more thought to the API design.

We're not in a great position to change things around right now, though I agree it should go on the list of things we should look into once we've reimplemented everything.

Member Author:

Doing a second initial sync requires changing the app, which we can probably do, but right now I want us to focus on getting parity with the sliding sync proxy.

> Given an initial sync would cover the use case in an idiomatic way, I don't think we should introduce this convoluted behavior.

This is an active regression in behaviour and UX from the SS proxy. It's clear we need to do something here to allow the apps to continue to operate well.

> To be clear, using limited: true is the wrong option here. It throws a wrench in what an initial vs. incremental sync means.
>
> limited could be true in a legitimate scenario of too many messages, but now we're also saying it can be true (along with a batch of timeline events) in a new random scenario of expanding timeline_limit, which makes the downstream processing need to worry about how the request asked for things.

Yes, we're subtly changing the meaning of limited here, though I disagree that it is that confusing. I did try and go through the experiment of how this works for offline-first clients, and the semantics seem to fit perfectly? The limited flag essentially becomes a flag for whether the new chunk of events can be appended to the last timeline chunk, or whether it needs to be considered/handled as a new chunk of events (using the same logic they already have).
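A minimal sketch of that client-side logic, assuming a fragment-based timeline store like the one Hydrogen is described as using (all names here are hypothetical):

```python
def on_room_timeline(store, room_id: str, events: list, limited: bool) -> None:
    """Apply a batch of timeline events from a sync response (sketch)."""
    if limited:
        # There may be a gap (or a re-delivered window after a timeline_limit
        # bump): start a new fragment, which the client can later stitch to
        # the previous one via back-pagination.
        store.start_new_fragment(room_id, events)
    else:
        # Contiguous with what we already have: just append.
        store.append_to_current_fragment(room_id, events)
```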

Additionally, the "opt-in" behavior you're talking about is also just another normal Sliding Sync request. Adjusting the ranges and timeline_limit is a normal thing to do that we don't need to introduce a foot-gun into.

It's really unclear to me what the semantics of increasing the timeline limit should be. Explicitly increasing the timeline limit very much feels like the app wants to get a bigger chunk of timeline down 🤷

I guess there is a time where this sort of happens implicitly, where we have two lists with different timeline limits, e.g. an "all-room" list with a limit of 1 and a "top-20" list with a limit of 20. We'd want to be careful to handle that sanely, though if an old room gets an update (which bumps it to the top-20 list) I don't think its insane for it to include the last 20 timeline events.

Contributor:

So to summarise my understanding here:

  • the proxy has a bug whereby timeline trickling (increasing the timeline limit for a room the client already knows about) sets initial: true when returning the larger timeline, but does NOT send required_state, which breaks the description of initial.
  • Erik is proposing to set limited: true instead as a better descriptor.
  • Eric thinks this is a footgun as it convolutes handling of gappy syncs, and counter-proposes sending required_state like the proxy should have technically been doing.
  • Erik is worried this is going to impact performance due to the increase in bytes-over-the-wire.

My opinion is that timeline trickling is awful and should be replaced with a bulk /messages endpoint. /sync is confusing enough to get right without having to think about historical state/messages. Far better to treat this endpoint as purely for real-time bare-essential information, and defer to other APIs for more data. Some early sketches of other APIs (as well as use cases for the bulk endpoint) can be found in this document, which is over a year old.

However, we do not live in this future. We live in the here and now, where we want to get SSS landed in Synapse ASAP and then iterate on any warts such as this. As a result, I don't really care which proposal we go with.

If neither of you can agree on the semantics, then might I suggest a tiebreak and literally just implement the behaviour of the proxy (that is initial without sending required_state iff the timeline_limit is changed for the room). That will definitely work with real existing clients (no client changes needed, no additional bandwidth increase) since the proxy is the only complete impl of SS. That feels no less warty than the other 2 suggested proposals here.

Failing that, I would lean towards Erik's limited: true on the basis that it is the most pragmatic solution here which won't negatively affect performance: which is literally the entire point of this API.

Contributor (@MadLittleMods, Aug 15, 2024):

It's really nice to see the candid agreement on how bizarre this behavior is, and the plans for a better future!

⭐ My actual proposal is that the client can use an initial sync request and ask for timeline_limit: 20 and required_state: [], which means barely any extra bytes over the wire (just the small metadata like room name). We can abuse the sliding sync endpoint to bulk fetch messages in a room, and we don't need to introduce any of this convoluted behavior even in the interim.
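Roughly the request shape being proposed (a sketch; the connection id and room id are illustrative stand-ins):

```python
bulk_timeline_fetch = {
    "conn_id": "bulk-fetch",  # a separate connection, so this is an initial sync
    "room_subscriptions": {
        "!room:example.org": {
            "timeline_limit": 20,
            "required_state": [],  # no state requested => barely any extra bytes
        }
    },
}
```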

Contributor (@MadLittleMods, Aug 15, 2024):

From the weekly meeting, it seems like the only downside to my suggestion is that it requires client-side changes. The ElementX/Rust client team is out today so we aren't able to see whether this is a big deal or not.

Besides my suggestion, the better of the proposed options if we really want to push this through: use initial: true without sending required_state (match the proxy). This option optimizes for requiring no client changes, which is the only reason not to use my suggestion, and is at least some sort of approximation of what initial: true actually means, which seems better than messing with the semantics of limited. While I disagree with moving forward with this, it is amenable enough to move forward so long as it doesn't suffer from the bump-once problem.

Besides the things already discussed in this thread, we also went over the possibility of adding a completely new flag to accurately describe and trigger this behavior, or even a new field like initial_timeline that shows up whenever the timeline_limit is bumped. It's just more ugly workarounds that require client-side changes anyway, but it's something that doesn't mess with what we have.
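A sketch of what that hypothetical field might look like in a room response — it is not implemented anywhere, purely illustrative of the idea floated above:

```python
room_response = {
    "initial_timeline": True,  # hypothetical: "fresh batch, replace your timeline"
    "timeline": ["...events..."],
    # `initial` and `limited` would keep their existing meanings untouched.
}
```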

Contributor:

Conversation continued in #17579 (comment)

Contributor (@MadLittleMods, Jul 30, 2024):

Why isn't the client expanding their room timeline by paginating /messages using the prev_batch token given in the response?

That seems just as good if they're adding room subscriptions just to get more timeline events.
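For reference, a sketch of that pagination flow using the standard Matrix `/messages` endpoint; the plain `requests` client here is just for illustration:

```python
import requests

def backfill(homeserver: str, access_token: str, room_id: str, prev_batch: str) -> list:
    """Fetch one page of older events, walking backwards from `prev_batch`."""
    resp = requests.get(
        f"{homeserver}/_matrix/client/v3/rooms/{room_id}/messages",
        params={"from": prev_batch, "dir": "b", "limit": 20},
        headers={"Authorization": f"Bearer {access_token}"},
    )
    return resp.json()["chunk"]  # events, newest-to-oldest for dir=b
```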

Member Author:

Yeah, so this is really a bit of an abuse of the SS API, so that the client can avoid doing e.g. 20 pagination requests simultaneously.

Comment on lines +1598 to +1602
Contributor:

One edge case this doesn't handle is when there are only a few events in the room total. This will claim limited = True even if the timeline_limit fetches all events in the room.
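A small sketch of that edge case (simplified, not the handler's actual code):

```python
def compute_limited(events_fetched: int, timeline_limit: int, ignore_timeline_bound: bool) -> bool:
    # Normally `limited` means the fetch saturated the limit before running
    # out of events (the handler fetches `limit + 1` to detect this).
    limited = events_fetched > timeline_limit
    if ignore_timeline_bound:
        # The PR's override: forced True even when the room only has a handful
        # of events and they all fit within the new timeline_limit.
        limited = True
    return limited

assert compute_limited(3, 10, ignore_timeline_bound=True)  # the flagged edge case
```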


        # Make sure we don't expose any events that the client shouldn't see
        timeline_events = await filter_events_for_client(
            self.storage_controllers,
@@ -2232,19 +2267,26 @@ class HaveSentRoom:
            contains the last stream token of the last updates we sent down
            the room, i.e. we still need to send everything since then to the
            client.
        timeline_limit: The timeline limit config for the room, if LIVE or
            PREVIOUSLY. This is used to track if the client has increased
            the timeline limit to request more events.
    """

    status: HaveSentRoomFlag
    last_token: Optional[RoomStreamToken]
    timeline_limit: Optional[int]

    @staticmethod
    def live(timeline_limit: int) -> "HaveSentRoom":
        return HaveSentRoom(HaveSentRoomFlag.LIVE, None, timeline_limit)

    @staticmethod
-   def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
+   def previously(last_token: RoomStreamToken, timeline_limit: int) -> "HaveSentRoom":
        """Constructor for `PREVIOUSLY` flag."""
-       return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
+       return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token, timeline_limit)


-HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
-HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
+HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None, None)


@attr.s(auto_attribs=True)
@@ -2299,6 +2341,7 @@ async def have_sent_room(
    async def record_rooms(
        self,
        sync_config: SlidingSyncConfig,
        room_configs: Dict[str, RoomSyncConfig],
        from_token: Optional[SlidingSyncStreamToken],
        *,
        sent_room_ids: StrCollection,
@@ -2339,8 +2382,12 @@ async def record_rooms(
        # end we can treat this as a noop.
        have_updated = False
        for room_id in sent_room_ids:
-           new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
-           have_updated = True
+           prev_state = new_room_statuses.get(room_id)
+           new_room_statuses[room_id] = HaveSentRoom.live(
+               room_configs[room_id].timeline_limit
+           )
+           if prev_state != new_room_statuses[room_id]:
+               have_updated = True

        # Whether we add/update the entries for unsent rooms depends on the
        # existing entry:

@@ -2351,18 +2398,22 @@ async def record_rooms(

        #     given token, so we don't need to update the entry.
        #   - NEVER: We have never previously sent down the room, and we haven't
        #     sent anything down this time either so we leave it as NEVER.
        #
        # We only need to do this if `from_token` is not None, as if it is
        # `None` then we know that there are no existing entries.

-       # Work out the new state for unsent rooms that were `LIVE`.
-       if from_token:
-           new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
-       else:
-           new_unsent_state = HAVE_SENT_ROOM_NEVER
-
-       for room_id in unsent_room_ids:
-           prev_state = new_room_statuses.get(room_id)
-           if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
-               new_room_statuses[room_id] = new_unsent_state
-               have_updated = True
+       if from_token:
+           for room_id in unsent_room_ids:
+               prev_state = new_room_statuses.get(room_id)
+               if (
+                   prev_state is not None
+                   and prev_state.status == HaveSentRoomFlag.LIVE
+               ):
+                   new_room_statuses[room_id] = HaveSentRoom.previously(
+                       from_token.stream_token.room_key,
+                       room_configs[room_id].timeline_limit,
+                   )
+                   have_updated = True

        if not have_updated:
            return prev_connection_token
117 changes: 106 additions & 11 deletions tests/rest/client/test_sync.py
@@ -39,7 +39,8 @@
)
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase
-from synapse.handlers.sliding_sync import StateValues
+from synapse.handlers.sliding_sync import RoomSyncConfig, StateValues
+from synapse.http.servlet import validate_json_object
from synapse.rest.client import (
    devices,
    knock,
@@ -53,13 +54,15 @@
from synapse.server import HomeServer
from synapse.types import (
    JsonDict,
    Requester,
    RoomStreamToken,
    SlidingSyncStreamToken,
    StreamKeyType,
    StreamToken,
    UserID,
)
from synapse.types.handlers import SlidingSyncConfig
from synapse.types.rest.client import SlidingSyncBody
from synapse.util import Clock
from synapse.util.stringutils import random_string

@@ -1357,6 +1360,22 @@ async def _on_new_acivity(
"Expected `notifier.wait_for_events(...)` to be triggered"
)

def make_sync_config(
self, user: UserID, requester: Requester, content: JsonDict
) -> SlidingSyncConfig:
"""Helper function to turn a dict sync body to a sync config"""
body = validate_json_object(content, SlidingSyncBody)

sync_config = SlidingSyncConfig(
user=user,
requester=requester,
conn_id=body.conn_id,
lists=body.lists,
room_subscriptions=body.room_subscriptions,
extensions=body.extensions,
)
return sync_config


class SlidingSyncTestCase(SlidingSyncBase):
"""
@@ -4538,7 +4557,6 @@ def test_rooms_timeline_incremental_sync_PREVIOUSLY(self, limited: bool) -> None:
        self.helper.send(room_id1, "msg", tok=user1_tok)

        timeline_limit = 5
-       conn_id = "conn_id"
        sync_body = {
            "lists": {
                "foo-list": {
@@ -4584,19 +4602,22 @@ def test_rooms_timeline_incremental_sync_PREVIOUSLY(self, limited: bool) -> None:
        requester = self.get_success(
            self.hs.get_auth().get_user_by_access_token(user1_tok)
        )
-       sync_config = SlidingSyncConfig(
-           user=requester.user,
-           requester=requester,
-           conn_id=conn_id,
+       sync_config = self.make_sync_config(
+           user=requester.user, requester=requester, content=sync_body
        )

        parsed_initial_from_token = self.get_success(
            SlidingSyncStreamToken.from_string(self.store, initial_from_token)
        )
        assert sync_config.lists
        room_configs = {
            room_id1: RoomSyncConfig.from_room_config(sync_config.lists["foo-list"])
        }
        connection_position = self.get_success(
            sliding_sync_handler.connection_store.record_rooms(
                sync_config,
-               parsed_initial_from_token,
+               room_configs=room_configs,
+               from_token=parsed_initial_from_token,
                sent_room_ids=[],
                unsent_room_ids=[room_id1],
            )
@@ -4646,7 +4667,6 @@ def test_rooms_required_state_incremental_sync_PREVIOUSLY(self) -> None:

self.helper.send(room_id1, "msg", tok=user1_tok)

conn_id = "conn_id"
sync_body = {
"lists": {
"foo-list": {
@@ -4693,19 +4713,24 @@ def test_rooms_required_state_incremental_sync_PREVIOUSLY(self) -> None:
        requester = self.get_success(
            self.hs.get_auth().get_user_by_access_token(user1_tok)
        )
-       sync_config = SlidingSyncConfig(
+       sync_config = self.make_sync_config(
            user=requester.user,
            requester=requester,
-           conn_id=conn_id,
+           content=sync_body,
        )

        parsed_initial_from_token = self.get_success(
            SlidingSyncStreamToken.from_string(self.store, initial_from_token)
        )
        assert sync_config.lists
        room_configs = {
            room_id1: RoomSyncConfig.from_room_config(sync_config.lists["foo-list"])
        }
        connection_position = self.get_success(
            sliding_sync_handler.connection_store.record_rooms(
                sync_config,
-               parsed_initial_from_token,
+               room_configs=room_configs,
+               from_token=parsed_initial_from_token,
                sent_room_ids=[],
                unsent_room_ids=[room_id1],
            )
@@ -4915,6 +4940,76 @@ def test_empty_initial_room_comes_down_sync(self) -> None:
        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
        self.assertEqual(response_body["rooms"][room_id1]["initial"], True)

    def test_increasing_timeline_range_sends_more_messages(self) -> None:
        """
        Test that increasing the timeline limit via room subscriptions sends the
        room down with more messages in a limited sync.
        """

        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")

        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)

        sync_body = {
            "lists": {
                "foo-list": {
                    "ranges": [[0, 1]],
                    "required_state": [[EventTypes.Create, ""]],
                    "timeline_limit": 1,
                }
            }
        }

        message_events = []
        for _ in range(10):
            resp = self.helper.send(room_id1, "msg", tok=user1_tok)
            message_events.append(resp["event_id"])

        # Make the first Sliding Sync request
        response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
        room_response = response_body["rooms"][room_id1]

        self.assertEqual(room_response["initial"], True)
        self.assertEqual(room_response["limited"], True)

        # We only expect the last message at first
        self.assertEqual(
            [event["event_id"] for event in room_response["timeline"]],
            message_events[-1:],
            room_response["timeline"],
        )

        # We also expect to get the create event state.
        self.assertEqual(
            [event["type"] for event in room_response["required_state"]],
            [EventTypes.Create],
        )

        # Now do another request with a room subscription with an increased timeline limit
        sync_body["room_subscriptions"] = {
            room_id1: {
                "required_state": [],
                "timeline_limit": 10,
            }
        }

        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
        room_response = response_body["rooms"][room_id1]

        self.assertNotIn("initial", room_response)
        self.assertEqual(room_response["limited"], True)

        # Now we expect all the messages
        self.assertEqual(
            [event["event_id"] for event in room_response["timeline"]],
            message_events,
            room_response["timeline"],
        )

        # We don't expect to get the room create down, as nothing has changed.
        self.assertNotIn("required_state", room_response)

class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
    """Tests for the to-device sliding sync extension"""