Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding Sync: Handle timeline limit changes (take 2) #17579

Merged
merged 29 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
da5339d
Migrate to per-connection state class
erikjohnston Aug 12, 2024
baac6c5
Record with new class
erikjohnston Aug 14, 2024
0561c86
Revamp
erikjohnston Aug 15, 2024
c15b8b3
WIP receipts reading
erikjohnston Aug 13, 2024
a1b75f7
WIP comments
erikjohnston Aug 14, 2024
6b9d244
Record state
erikjohnston Aug 15, 2024
55feaae
Add tests
erikjohnston Aug 15, 2024
614c0d7
Newsfile
erikjohnston Aug 15, 2024
100927d
Comments
erikjohnston Aug 15, 2024
70d32fb
Add proper DB function for getting receipts between things
erikjohnston Aug 15, 2024
ee6efa2
Track room configs in per-connection state
erikjohnston Aug 16, 2024
009af0e
Handle timeline_limit changes
erikjohnston Aug 16, 2024
b23231e
Newsfile
erikjohnston Aug 16, 2024
aea946b
Merge remote-tracking branch 'origin/develop' into erikj/ss_room_sub2
erikjohnston Aug 19, 2024
33ec15b
Restore comments
erikjohnston Aug 19, 2024
768d150
Add docstring
erikjohnston Aug 19, 2024
a63261d
Restore comments
erikjohnston Aug 19, 2024
891ce47
Rename previous_room_configs
erikjohnston Aug 19, 2024
a4ad443
Use test helpers
erikjohnston Aug 19, 2024
0e8feed
Remove spurious set_tag
erikjohnston Aug 19, 2024
49c4645
Remove double insertion
erikjohnston Aug 19, 2024
299ab1b
Use timelime_limit not len(timeline)
erikjohnston Aug 19, 2024
ba4e63b
Add comment explaining the odd behaviour
erikjohnston Aug 19, 2024
2bba63e
Replace initial=true with unstable_expanded_timeline=true
erikjohnston Aug 19, 2024
52f4253
Improve comment
erikjohnston Aug 19, 2024
09538c2
Update synapse/handlers/sliding_sync.py
erikjohnston Aug 20, 2024
733555b
Update synapse/handlers/sliding_sync.py
erikjohnston Aug 20, 2024
76f882a
Update synapse/handlers/sliding_sync.py
erikjohnston Aug 20, 2024
bcaf4e6
Update synapse/handlers/sliding_sync.py
erikjohnston Aug 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17575.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Correctly track read receipts that should be sent down in experimental sliding sync.
1 change: 1 addition & 0 deletions changelog.d/17579.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Handle changes in `timeline_limit` in experimental sliding sync.
Copy link
Contributor

@MadLittleMods MadLittleMods Aug 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This continues to feel horrible especially given new edge cases like this comment. Highly recommend we just update the client to use an initial sync request with timeline_limit: 20 and required_state: [] (which allows us to avoid the extra bytes) to accomplish the exact same thing without introducing any of this bizarre behavior.

Previous conversation for context

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talked with @erikjohnston a bit more and was trying to figure out why the initial sync request doesn't solve this completely. I basically asked the opposite question for how/why timeline trickling/unstable_expanded_timeline makes this easier. The following question unlocked a better understanding for one complication when trying to use the initial sync route.

How is the timeline stitching done for the timeline trickling? How is the problem easier when using that?

[...] its all coming down one connection, so you know you've got a consistent "current" timeline chunk that you'll get updates for (which you can then optionally stitch together with whatever timeline chunks the client currently has)

This is a valid point!

I'm still leaning towards the side of initial sync being possible to use (and better) and just requires some basic timeline stitching logic. ElementX might already have some stitching and event de-duplication logic to handle what the proxy was doing before that would also cover this case.

Since ElementX doesn't have offline support and throws away events, I think we could just do this:

For the timeline stitching logic, the client can store the latest event in timeline before our initial sync, then find that event in the initial sync timeline events and spread backwards from that point. That way, the ongoing sync loop can still append to the end of the timeline and continue seamlessly.

So if we have a timeline [103] already on the client, we store latest_event_id = 103, do our initial sync which returns [100, 101, 102, 103, 104] and we splice/spread in only [100, 101, 102, 103] accordingly (drop any new events after the latest_event_id from the initial sync response). This makes it so that even if the ongoing sync loop sends 104 before or after our initial sync does, it still appends like normal and everything is in seamless order.

If there are so many new messages sent in the time between us storing the latest_event_id and the initial sync responding that we now have a gap, we can just throw away our initial sync events because we have enough events to fill up the timeline just from our normal ongoing sync loop.


To be clear, the client doesn't need to be fancy about stitching:

If the client had more timeline like [98, 99, 100, 101, 102, 103], we store latest_event_id = 103, we start the initial sync, our ongoing sync loop races us and returns 104 which makes our timeline look like [98, 99, 100, 101, 102, 103, 104]. Then our initial sync responds with [100, 101, 102, 103, 104], we find the 103 spot in the response to slice at and place it at the 103 spot in the client timeline leaving us with [100, 101, 102, 103, 104]

Pseudo code (maybe off-by-one errors):

latest_event_id = 103

# do initial sync request

initial_sync_timeline = [100, 101, 102, 103, 104]
event_index_in_response = initial_sync_timeline.index(latest_event_id)
# Skip if we can't find the `latest_event_id` in the response.
# This means there have been so many messages sent between the time we initially
# made the initial sync and the response that this is no longer relevant.
# We already have enough events to fill up the timeline from the normal
# ongoing sync loop
if event_index_in_response is None:
	return

event_index_in_client_timeline = client_timeline.index(latest_event_id)
# Update the timeline
client_timeline = initial_sync_timeline[0:event_index_in_response] + client_timeline[event_index_in_client_timeline:-1]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need this same room sync config tracking for required_state (and probably filter/extension) changes so overall the concept isn't lost.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible edge case: ElementX triggers unstable_expanded_timeline by increasing the timeline_limit and gets a chunk of timeline and we record that higher timeline_limit. I go to bed and then I wake up (or just some period of time that new messages were sent in but I didn't have the app open). How does ElementX get timeline for all of the events in the gap? If it tries to trigger unstable_expanded_timeline again, it won't work because the last recorded timeline_limit is already just as high.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine. There are two cases depending on if the timeline limit in the morning is small or high:

For the small case:

  • When the client comes online it'll get a small bit of history with limited=true.
  • The client can then either backpaginate in the room, or do a room sub with a larger timeline

For the high case:

  • When the client comes online it'll get a large chunk of history with limited=true. This is (hopefully) enough to show a screens worth of data, which is what we want.
  • If the client wants more it can just backpaginate.

What EX at least wants is to quickly be able to get enough chunks of history in rooms (in the background) to be able to show a screens worth of data. That way the UX is open the app, see a fast sync, (in the background it preloads the top N rooms with more timeline), the user clicks on one of the rooms and sees a page of timeline, and then the app can paginate in more timeline as usual (via /messages).

582 changes: 443 additions & 139 deletions synapse/handlers/sliding_sync.py

Large diffs are not rendered by default.

41 changes: 41 additions & 0 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@
JsonMapping,
MultiWriterStreamToken,
PersistedPosition,
StrCollection,
)
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.iterutils import batch_iter

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -550,6 +552,45 @@ def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str]]:

return results

async def get_rooms_with_receipts_between(
self,
room_ids: StrCollection,
from_key: MultiWriterStreamToken,
to_key: MultiWriterStreamToken,
) -> StrCollection:
"""Get the set of rooms that (may) have receipts between the two tokens."""

room_ids = self._receipts_stream_cache.get_entities_changed(
room_ids, from_key.stream
)
if not room_ids:
return []

def f(txn: LoggingTransaction, room_ids: StrCollection) -> StrCollection:
clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", room_ids
)

sql = f"""
SELECT DISTINCT room_id FROM receipts_linearized
WHERE {clause} AND ? < stream_id AND stream_id <= ?
"""
args.append(from_key.stream)
args.append(to_key.get_max_stream_pos())

txn.execute(sql, args)

return [room_id for room_id, in txn]

results: List[str] = []
for batch in batch_iter(room_ids, 1000):
batch_result = await self.db_pool.runInteraction(
"get_rooms_with_receipts_between", f, batch
)
results.extend(batch_result)

return results

async def get_users_sent_receipts_between(
self, last_id: int, current_id: int
) -> List[str]:
Expand Down
105 changes: 105 additions & 0 deletions tests/rest/client/sliding_sync/test_extension_receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,3 +677,108 @@ def test_wait_for_new_data_timeout(self) -> None:
set(),
exact=True,
)

def test_receipts_incremental_sync_out_of_range(self) -> None:
"""Tests that we don't return read receipts for rooms that fall out of
range, but then do send all read receipts once they're back in range.
"""

user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")

room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id2, user1_id, tok=user1_tok)

# Send a message and read receipt into room2
event_response = self.helper.send(room_id2, body="new event", tok=user2_tok)
room2_event_id = event_response["event_id"]

self.helper.send_read_receipt(room_id2, room2_event_id, tok=user1_tok)

# Now send a message into room1 so that it is at the top of the list
self.helper.send(room_id1, body="new event", tok=user2_tok)

# Make a SS request for only the top room.
sync_body = {
"lists": {
"main": {
"ranges": [[0, 0]],
"required_state": [],
"timeline_limit": 5,
}
},
"extensions": {
"receipts": {
"enabled": True,
}
},
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)

# The receipt is in room2, but only room1 is returned, so we don't
# expect to get the receipt.
self.assertIncludes(
response_body["extensions"]["receipts"].get("rooms").keys(),
set(),
exact=True,
)

# Move room2 into range.
self.helper.send(room_id2, body="new event", tok=user2_tok)

response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)

# We expect to see the read receipt of room2, as that has the most
# recent update.
self.assertIncludes(
response_body["extensions"]["receipts"].get("rooms").keys(),
{room_id2},
exact=True,
)
receipt = response_body["extensions"]["receipts"]["rooms"][room_id2]
self.assertIncludes(
receipt["content"][room2_event_id][ReceiptTypes.READ].keys(),
{user1_id},
exact=True,
)

# Send a message into room1 to bump it to the top, but also send a
# receipt in room2
self.helper.send(room_id1, body="new event", tok=user2_tok)
self.helper.send_read_receipt(room_id2, room2_event_id, tok=user2_tok)

# We don't expect to see the new read receipt.
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
self.assertIncludes(
response_body["extensions"]["receipts"].get("rooms").keys(),
set(),
exact=True,
)

# But if we send a new message into room2, we expect to get the missing receipts
self.helper.send(room_id2, body="new event", tok=user2_tok)

response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
self.assertIncludes(
response_body["extensions"]["receipts"].get("rooms").keys(),
{room_id2},
exact=True,
)

# We should only see the new receipt
receipt = response_body["extensions"]["receipts"]["rooms"][room_id2]
self.assertIncludes(
receipt["content"][room2_event_id][ReceiptTypes.READ].keys(),
{user2_id},
exact=True,
)
6 changes: 3 additions & 3 deletions tests/rest/client/sliding_sync/test_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,19 @@ def test_extensions_lists_rooms_relevant_rooms(
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 0,
"timeline_limit": 1,
},
# We expect this list range to include room5, room4, room3
"bar-list": {
"ranges": [[0, 2]],
"required_state": [],
"timeline_limit": 0,
"timeline_limit": 1,
},
},
"room_subscriptions": {
room_id1: {
"required_state": [],
"timeline_limit": 0,
"timeline_limit": 1,
}
},
}
Expand Down
124 changes: 124 additions & 0 deletions tests/rest/client/sliding_sync/test_rooms_timeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
from synapse.api.constants import EventTypes
from synapse.rest.client import login, room, sync
from synapse.server import HomeServer
from synapse.types import StreamToken, StrSequence
Expand Down Expand Up @@ -573,3 +574,126 @@ def test_rooms_ban_incremental_sync2(self) -> None:

# Nothing to see for this banned user in the room in the token range
self.assertIsNone(response_body["rooms"].get(room_id1))

def test_increasing_timeline_range_sends_more_messages(self) -> None:
"""
Test that increasing the timeline limit via room subscriptions sends the
room down with more messages in a limited sync.
"""

user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")

room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)

sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [[EventTypes.Create, ""]],
"timeline_limit": 1,
}
}
}

message_events = []
for _ in range(10):
resp = self.helper.send(room_id1, "msg", tok=user1_tok)
message_events.append(resp["event_id"])

# Make the first Sliding Sync request
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
room_response = response_body["rooms"][room_id1]

self.assertEqual(room_response["initial"], True)
self.assertEqual(room_response["limited"], True)

# We only expect the last message at first
self.assertEqual(
[event["event_id"] for event in room_response["timeline"]],
message_events[-1:],
room_response["timeline"],
)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

# We also expect to get the create event state.
self.assertEqual(
[event["type"] for event in room_response["required_state"]],
[EventTypes.Create],
)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

# Now do another request with a room subscription with an increased timeline limit
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [],
"timeline_limit": 10,
}
}

response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
room_response = response_body["rooms"][room_id1]

self.assertEqual(room_response["initial"], True)
self.assertEqual(room_response["limited"], True)

# Now we expect all the messages
self.assertEqual(
[event["event_id"] for event in room_response["timeline"]],
message_events,
room_response["timeline"],
)

# We don't expect to get the room create down, as nothing has changed.
self.assertNotIn("required_state", room_response)

# Decreasing the timeline limit shouldn't resend any events
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [],
"timeline_limit": 5,
}
}

event_response = self.helper.send(room_id1, "msg", tok=user1_tok)
latest_event_id = event_response["event_id"]

response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
room_response = response_body["rooms"][room_id1]

self.assertNotIn("initial", room_response)
self.assertEqual(room_response["limited"], False)

self.assertEqual(
[event["event_id"] for event in room_response["timeline"]],
[latest_event_id],
room_response["timeline"],
)

# Increasing the limit to what it was before also should not resend any
# events
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [],
"timeline_limit": 10,
}
}

event_response = self.helper.send(room_id1, "msg", tok=user1_tok)
latest_event_id = event_response["event_id"]

response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
room_response = response_body["rooms"][room_id1]

self.assertNotIn("initial", room_response)
self.assertEqual(room_response["limited"], False)

self.assertEqual(
[event["event_id"] for event in room_response["timeline"]],
[latest_event_id],
room_response["timeline"],
)
14 changes: 13 additions & 1 deletion tests/rest/client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from twisted.test.proto_helpers import MemoryReactorClock
from twisted.web.server import Site

from synapse.api.constants import Membership
from synapse.api.constants import Membership, ReceiptTypes
from synapse.api.errors import Codes
from synapse.server import HomeServer
from synapse.types import JsonDict
Expand Down Expand Up @@ -944,3 +944,15 @@ def initiate_sso_ui_auth(
assert len(p.links) == 1, "not exactly one link in confirmation page"
oauth_uri = p.links[0]
return oauth_uri

def send_read_receipt(self, room_id: str, event_id: str, *, tok: str) -> None:
"""Send a read receipt into the room at the given event"""
channel = make_request(
self.reactor,
self.site,
method="POST",
path=f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_id}",
content={},
access_token=tok,
)
assert channel.code == HTTPStatus.OK, channel.text_body
Loading