Skip to content

Commit

Permalink
Sliding Sync: Handle timeline limit changes (take 2) (#17579)
Browse files Browse the repository at this point in the history
This supersedes #17503, given the per-connection state is being heavily
rewritten it felt easier to recreate the PR on top of that work.

This correctly handles the case of timeline limits going up and down.

This does not handle changes in `required_state`, but that can be done
as a separate PR.

Based on #17575.

---------

Co-authored-by: Eric Eastwood <[email protected]>
  • Loading branch information
erikjohnston and MadLittleMods committed Aug 20, 2024
1 parent 950ba84 commit 6eb98a4
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 13 deletions.
1 change: 1 addition & 0 deletions changelog.d/17579.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Handle changes in `timeline_limit` in experimental sliding sync.
152 changes: 139 additions & 13 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,20 @@ async def current_sync_for_user(
# subscription and have updates we need to send (i.e. either because
# we haven't sent the room down, or we have but there are missing
# updates).
for room_id in relevant_room_map:
for room_id, room_config in relevant_room_map.items():
prev_room_sync_config = previous_connection_state.room_configs.get(
room_id
)
if prev_room_sync_config is not None:
# Always include rooms whose timeline limit has increased.
# (see the "XXX: Odd behavior" described below)
if (
prev_room_sync_config.timeline_limit
< room_config.timeline_limit
):
rooms_should_send.add(room_id)
continue

status = previous_connection_state.rooms.have_sent_room(room_id)
if (
# The room was never sent down before so the client needs to know
Expand Down Expand Up @@ -819,12 +832,15 @@ async def current_sync_for_user(
if room_id in rooms_should_send
}

new_connection_state = previous_connection_state.get_mutable()

@trace
@tag_args
async def handle_room(room_id: str) -> None:
room_sync_result = await self.get_room_sync_data(
sync_config=sync_config,
previous_connection_state=previous_connection_state,
new_connection_state=new_connection_state,
room_id=room_id,
room_sync_config=relevant_rooms_to_send_map[room_id],
room_membership_for_user_at_to_token=room_membership_for_user_map[
Expand All @@ -842,8 +858,6 @@ async def handle_room(room_id: str) -> None:
with start_active_span("sliding_sync.generate_room_entries"):
await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10)

new_connection_state = previous_connection_state.get_mutable()

extensions = await self.get_extensions_response(
sync_config=sync_config,
actual_lists=lists,
Expand Down Expand Up @@ -1955,6 +1969,7 @@ async def get_room_sync_data(
self,
sync_config: SlidingSyncConfig,
previous_connection_state: "PerConnectionState",
new_connection_state: "MutablePerConnectionState",
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
Expand Down Expand Up @@ -1998,9 +2013,27 @@ async def get_room_sync_data(
# - For an incremental sync where we haven't sent it down this
# connection before
#
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
# Relevant spec issue:
# https://github.com/matrix-org/matrix-spec/issues/1917
#
# XXX: Odd behavior - We also check if the `timeline_limit` has increased, if so
# we ignore the from bound for the timeline to send down a larger chunk of
# history and set `unstable_expanded_timeline` to true. This is only being added
# to match the behavior of the Sliding Sync proxy as we expect the ElementX
# client to feel a certain way and be able to trickle in a full page of timeline
# messages to fill up the screen. This is a bit different to the behavior of the
# Sliding Sync proxy (which sets initial=true, but then doesn't send down the
# full state again), but existing apps, e.g. ElementX, just need `limited` set.
# We don't explicitly set `limited` but this will be the case for any room that
# has more history than we're trying to pull out. Using
# `unstable_expanded_timeline` allows us to avoid contaminating what `initial`
# or `limited` mean for clients that interpret them correctly. In future this
# behavior is almost certainly going to change.
#
# TODO: Also handle changes to `required_state`
from_bound = None
initial = True
ignore_timeline_bound = False
if from_token and not room_membership_for_user_at_to_token.newly_joined:
room_status = previous_connection_state.rooms.have_sent_room(room_id)
if room_status.status == HaveSentRoomFlag.LIVE:
Expand All @@ -2018,7 +2051,26 @@ async def get_room_sync_data(

log_kv({"sliding_sync.room_status": room_status})

log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
if prev_room_sync_config is not None:
# Check if the timeline limit has increased, if so ignore the
# timeline bound and record the change (see "XXX: Odd behavior"
# above).
if (
prev_room_sync_config.timeline_limit
< room_sync_config.timeline_limit
):
ignore_timeline_bound = True

# TODO: Check for changes in `required_state``

log_kv(
{
"sliding_sync.from_bound": from_bound,
"sliding_sync.initial": initial,
"sliding_sync.ignore_timeline_bound": ignore_timeline_bound,
}
)

# Assemble the list of timeline events
#
Expand Down Expand Up @@ -2055,6 +2107,10 @@ async def get_room_sync_data(
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)

timeline_from_bound = from_bound
if ignore_timeline_bound:
timeline_from_bound = None

# For initial `/sync` (and other historical scenarios mentioned above), we
# want to view a historical section of the timeline; to fetch events by
# `topological_ordering` (best representation of the room DAG as others were
Expand All @@ -2080,7 +2136,7 @@ async def get_room_sync_data(
pagination_method: PaginateFunction = (
# Use `topographical_ordering` for historical events
paginate_room_events_by_topological_ordering
if from_bound is None
if timeline_from_bound is None
# Use `stream_ordering` for updates
else paginate_room_events_by_stream_ordering
)
Expand All @@ -2090,7 +2146,7 @@ async def get_room_sync_data(
# (from newer to older events) starting at to_bound.
# This ensures we fill the `limit` with the newest events first,
from_key=to_bound,
to_key=from_bound,
to_key=timeline_from_bound,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
Expand Down Expand Up @@ -2448,6 +2504,55 @@ async def get_room_sync_data(
if new_bump_event_pos.stream > 0:
bump_stamp = new_bump_event_pos.stream

unstable_expanded_timeline = False
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
# Record the `room_sync_config` if we're `ignore_timeline_bound` (which means
# that the `timeline_limit` has increased)
if ignore_timeline_bound:
# FIXME: We signal the fact that we're sending down more events to
# the client by setting `unstable_expanded_timeline` to true (see
# "XXX: Odd behavior" above).
unstable_expanded_timeline = True

new_connection_state.room_configs[room_id] = RoomSyncConfig(
timeline_limit=room_sync_config.timeline_limit,
required_state_map=room_sync_config.required_state_map,
)
elif prev_room_sync_config is not None:
# If the result is `limited` then we need to record that the
# `timeline_limit` has been reduced, as when/if the client later requests
# more timeline then we have more data to send.
#
# Otherwise (when not `limited`) we don't need to record that the
# `timeline_limit` has been reduced, as the *effective* `timeline_limit`
# (i.e. the amount of timeline we have previously sent to the client) is at
# least the previous `timeline_limit`.
#
# This is to handle the case where the `timeline_limit` e.g. goes from 10 to
# 5 to 10 again (without any timeline gaps), where there's no point sending
# down the initial historical chunk events when the `timeline_limit` is
# increased as the client already has the 10 previous events. However, if
# client has a gap in the timeline (i.e. `limited` is True), then we *do*
# need to record the reduced timeline.
#
# TODO: Handle timeline gaps (`get_timeline_gaps()`) - This is separate from
# the gaps we might see on the client because a response was `limited` we're
# talking about above.
if (
limited
and prev_room_sync_config.timeline_limit
> room_sync_config.timeline_limit
):
new_connection_state.room_configs[room_id] = RoomSyncConfig(
timeline_limit=room_sync_config.timeline_limit,
required_state_map=room_sync_config.required_state_map,
)

# TODO: Record changes in required_state.

else:
new_connection_state.room_configs[room_id] = room_sync_config

set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)

return SlidingSyncResult.RoomResult(
Expand All @@ -2462,6 +2567,7 @@ async def get_room_sync_data(
stripped_state=stripped_state,
prev_batch=prev_batch_token,
limited=limited,
unstable_expanded_timeline=unstable_expanded_timeline,
num_live=num_live,
bump_stamp=bump_stamp,
joined_count=room_membership_summary.get(
Expand Down Expand Up @@ -3264,16 +3370,30 @@ class PerConnectionState:
Attributes:
rooms: The status of each room for the events stream.
receipts: The status of each room for the receipts stream.
room_configs: Map from room_id to the `RoomSyncConfig` of all
rooms that we have previously sent down.
"""

rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap)
receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap)

room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict)

def get_mutable(self) -> "MutablePerConnectionState":
"""Get a mutable copy of this state."""
room_configs = cast(MutableMapping[str, RoomSyncConfig], self.room_configs)

return MutablePerConnectionState(
rooms=self.rooms.get_mutable(),
receipts=self.receipts.get_mutable(),
room_configs=ChainMap({}, room_configs),
)

def copy(self) -> "PerConnectionState":
return PerConnectionState(
rooms=self.rooms.copy(),
receipts=self.receipts.copy(),
room_configs=dict(self.room_configs),
)


Expand All @@ -3284,8 +3404,18 @@ class MutablePerConnectionState(PerConnectionState):
rooms: MutableRoomStatusMap[RoomStreamToken]
receipts: MutableRoomStatusMap[MultiWriterStreamToken]

room_configs: typing.ChainMap[str, RoomSyncConfig]

def has_updates(self) -> bool:
return bool(self.rooms.get_updates()) or bool(self.receipts.get_updates())
return (
bool(self.rooms.get_updates())
or bool(self.receipts.get_updates())
or bool(self.get_room_config_updates())
)

def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]:
"""Get updates to the room sync config"""
return self.room_configs.maps[0]


@attr.s(auto_attribs=True)
Expand Down Expand Up @@ -3369,7 +3499,6 @@ async def record_new_state(
) -> int:
"""Record updated per-connection state, returning the connection
position associated with the new state.
If there are no changes to the state this may return the same token as
the existing per-connection state.
"""
Expand All @@ -3390,10 +3519,7 @@ async def record_new_state(

# We copy the `MutablePerConnectionState` so that the inner `ChainMap`s
# don't grow forever.
sync_statuses[new_store_token] = PerConnectionState(
rooms=new_connection_state.rooms.copy(),
receipts=new_connection_state.receipts.copy(),
)
sync_statuses[new_store_token] = new_connection_state.copy()

return new_store_token

Expand Down
5 changes: 5 additions & 0 deletions synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,11 @@ async def encode_rooms(
if room_result.initial:
serialized_rooms[room_id]["initial"] = room_result.initial

if room_result.unstable_expanded_timeline:
serialized_rooms[room_id][
"unstable_expanded_timeline"
] = room_result.unstable_expanded_timeline

# This will be omitted for invite/knock rooms with `stripped_state`
if (
room_result.required_state is not None
Expand Down
4 changes: 4 additions & 0 deletions synapse/types/handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ class RoomResult:
their local state. When there is an update, servers MUST omit this flag
entirely and NOT send "initial":false as this is wasteful on bandwidth. The
absence of this flag means 'false'.
unstable_expanded_timeline: Flag which is set if we're returning more historic
events due to the timeline limit having increased. See "XXX: Odd behavior"
comment ing `synapse.handlers.sliding_sync`.
required_state: The current state of the room
timeline: Latest events in the room. The last event is the most recent.
bundled_aggregations: A mapping of event ID to the bundled aggregations for
Expand Down Expand Up @@ -219,6 +222,7 @@ class StrippedHero:
heroes: Optional[List[StrippedHero]]
is_dm: bool
initial: bool
unstable_expanded_timeline: bool
# Should be empty for invite/knock rooms with `stripped_state`
required_state: List[EventBase]
# Should be empty for invite/knock rooms with `stripped_state`
Expand Down
Loading

0 comments on commit 6eb98a4

Please sign in to comment.