Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Mark sync as limited if there is a gap in the timeline #16485

Merged
merged 7 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16485.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix long-standing bug where `/sync` incorrectly did not mark a room as `limited` in a sync requests when there were missing remote events.
52 changes: 44 additions & 8 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,12 +522,27 @@ async def ephemeral_by_room(
async def _load_filtered_recents(
self,
room_id: str,
sync_result_builder: "SyncResultBuilder",
sync_config: SyncConfig,
now_token: StreamToken,
upto_token: StreamToken,
since_token: Optional[StreamToken] = None,
potential_recents: Optional[List[EventBase]] = None,
newly_joined_room: bool = False,
) -> TimelineBatch:
"""Create a timeline batch for the room

Args:
room_id
sync_result_builder
sync_config
upto_token: The token up to which we should fetch (more) events.
If `potential_results` is non-empty then this is *start* of
the the list.
since_token
potential_recents: If non-empty, the events between the since token
and current token to send down to clients.
newly_joined_room
"""
with Measure(self.clock, "load_filtered_recents"):
timeline_limit = sync_config.filter_collection.timeline_limit()
block_all_timeline = (
Expand All @@ -543,6 +558,20 @@ async def _load_filtered_recents(
else:
limited = False

# Check if there is a gap, if so we need to mark this as limited and
# recalculate which events to send down.
gap_token = await self.store.get_timeline_gaps(
room_id,
since_token.room_key if since_token else None,
sync_result_builder.now_token.room_key,
)
if gap_token:
# There's a gap, so we need to ignore the passed in
# `potential_recents`, and reset `upto_token` to match.
potential_recents = None
upto_token = sync_result_builder.now_token
limited = True

log_kv({"limited": limited})

if potential_recents:
Expand Down Expand Up @@ -581,10 +610,10 @@ async def _load_filtered_recents(
recents = []

if not limited or block_all_timeline:
prev_batch_token = now_token
prev_batch_token = upto_token
if recents:
room_key = recents[0].internal_metadata.before
prev_batch_token = now_token.copy_and_replace(
prev_batch_token = upto_token.copy_and_replace(
StreamKeyType.ROOM, room_key
)

Expand All @@ -595,11 +624,15 @@ async def _load_filtered_recents(
filtering_factor = 2
load_limit = max(timeline_limit * filtering_factor, 10)
max_repeat = 5 # Only try a few times per room, otherwise
room_key = now_token.room_key
room_key = upto_token.room_key
end_key = room_key

since_key = None
if since_token and not newly_joined_room:
if since_token and gap_token:
# If there is a gap then we need to only include events after
# it.
Comment on lines +632 to +633
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# If there is a gap then we need to only include events after
# it.
# If there is a gap then we need to include only events after
# it.

Maybe? I'm not sure why we would do this though?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have a section of the timeline with events A, B, <gap>, X, Y, Z we need to make sure we only send down X, Y, Z and not A and B (which we currently do. If we did, then the client wouldn't try and paginate to fill in the gap.

since_key = gap_token
elif since_token and not newly_joined_room:
since_key = since_token.room_key

while limited and len(recents) < timeline_limit and max_repeat:
Expand Down Expand Up @@ -669,7 +702,7 @@ async def _load_filtered_recents(
recents = recents[-timeline_limit:]
room_key = recents[0].internal_metadata.before

prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key)
prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key)

# Don't bother to bundle aggregations if the timeline is unlimited,
# as clients will have all the necessary information.
Expand All @@ -684,7 +717,9 @@ async def _load_filtered_recents(
return TimelineBatch(
events=recents,
prev_batch=prev_batch_token,
limited=limited or newly_joined_room,
# Also mark as limited if this is a new room or there has been a gap
# (to force client to paginate the gap).
limited=limited or newly_joined_room or gap_token is not None,
clokep marked this conversation as resolved.
Show resolved Hide resolved
bundled_aggregations=bundled_aggregations,
)

Expand Down Expand Up @@ -2419,8 +2454,9 @@ async def _generate_room_entry(

batch = await self._load_filtered_recents(
room_id,
sync_result_builder,
sync_config,
now_token=upto_token,
upto_token=upto_token,
since_token=since_token,
potential_recents=events,
newly_joined_room=newly_joined,
Expand Down
74 changes: 49 additions & 25 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2267,35 +2267,59 @@ def _update_backward_extremeties(

Forward extremities are handled when we first start persisting the events.
"""
# From the events passed in, add all of the prev events as backwards extremities.
# Ignore any events that are already backwards extrems or outliers.
query = (
"INSERT INTO event_backward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" )"
# 1. Don't add an event as a extremity again if we already persisted it
# as a non-outlier.
# 2. Don't add an outlier as an extremity if it has no prev_events
" AND NOT EXISTS ("
" SELECT 1 FROM events"
" LEFT JOIN event_edges edge"
" ON edge.event_id = events.event_id"
" WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)"
" )"

room_id = events[0].room_id
clokep marked this conversation as resolved.
Show resolved Hide resolved

potential_backwards_extremities = {
e_id
for ev in events
for e_id in ev.prev_event_ids()
if not ev.internal_metadata.is_outlier()
}

if not potential_backwards_extremities:
return

existing_events_outliers = self.db_pool.simple_select_many_txn(
txn,
table="events",
column="event_id",
iterable=potential_backwards_extremities,
keyvalues={"outlier": False},
retcols=("event_id",),
)

txn.execute_batch(
query,
[
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id)
for ev in events
for e_id in ev.prev_event_ids()
if not ev.internal_metadata.is_outlier()
],
potential_backwards_extremities.difference_update(
e for e, in existing_events_outliers
)

if potential_backwards_extremities:
self.db_pool.simple_upsert_many_txn(
txn,
table="event_backward_extremities",
key_names=("room_id", "event_id"),
key_values=[(room_id, ev) for ev in potential_backwards_extremities],
value_names=(),
value_values=(),
)

# Record the stream orderings where we have new gaps.
gap_events = [
(room_id, self._instance_name, ev.internal_metadata.stream_ordering)
for ev in events
if any(
e_id in potential_backwards_extremities
for e_id in ev.prev_event_ids()
)
]
clokep marked this conversation as resolved.
Show resolved Hide resolved

self.db_pool.simple_insert_many_txn(
txn,
table="timeline_gaps",
keys=("room_id", "instance_name", "stream_ordering"),
values=gap_events,
)

# Delete all these events that we've already fetched and now know that their
# prev events are the new backwards extremeties.
query = (
Expand Down
47 changes: 47 additions & 0 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1616,3 +1616,50 @@ async def get_name_from_instance_id(self, instance_id: int) -> str:
retcol="instance_name",
desc="get_name_from_instance_id",
)

async def get_timeline_gaps(
self,
room_id: str,
from_token: Optional[RoomStreamToken],
to_token: RoomStreamToken,
) -> Optional[RoomStreamToken]:
"""Check if there is a gap, and return a token that marks the position
of the gap in the stream.
"""

sql = """
SELECT instance_name, stream_ordering
FROM timeline_gaps
WHERE room_id = ? AND ? < stream_ordering AND stream_ordering <= ?
ORDER BY stream_ordering
"""

rows = await self.db_pool.execute(
"get_timeline_gaps",
None,
sql,
room_id,
from_token.stream if from_token else 0,
to_token.get_max_stream_pos(),
)

if not rows:
return None

positions = [
PersistedEventPosition(instance_name, stream_ordering)
for instance_name, stream_ordering in rows
]
if from_token:
positions = [p for p in positions if p.persisted_after(from_token)]

positions = [p for p in positions if not p.persisted_after(to_token)]

if positions:
# We return a stream token that ensures the event *at* the position
# of the gap is included (as the gap is *before* the persisted
# event).
last_position = positions[-1]
return RoomStreamToken(stream=last_position.stream - 1)

return None
25 changes: 25 additions & 0 deletions synapse/storage/schema/main/delta/82/05gaps.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Records when we see a "gap in the timeline", due to missing events over
-- federation. We record this so that we can tell clients there is a gap (by
-- marking the timeline section of a sync request as limited).
CREATE TABLE IF NOT EXISTS timeline_gaps (
room_id TEXT NOT NULL,
instance_name TEXT NOT NULL,
stream_ordering BIGINT NOT NULL
);

CREATE INDEX timeline_gaps_room_id ON timeline_gaps(room_id, stream_ordering);
Loading