Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix missing sync events during historical batch imports #12319

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12319.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug with incremental sync missing events when rejoining/backfilling. Contributed by Nick @ Beeper.
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
14 changes: 5 additions & 9 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,13 @@ async def get_state_events(
state_filter = state_filter or StateFilter.all()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main concern is how this changes clients back paginating using a prev_batch token from a /sync timeline section. Those tokens are currently stream tokens, but /messages will return them in topological order with a topological next token. This change means that the first /messages will return events in stream order but will still return a topological next token, meaning that the next /messages will return events in a topological order.

-- @erikjohnston, #12319 (comment)

/messages could be updated to always return in topological_ordering regardless of what type of pagination token is given. That's what it was doing before this PR anyway.

I don't think that is a change we really want to make. My understanding is that that behaviour works correctly currently? Or does that also have issues with backfilled/historical events?

I think we have two assumptions that we should probably enforce:

  • /messages should always sort by topological_ordering regardless of pagination token given
  • /sync should always sort by stream_ordering regardless of pagination token given

This also aligns with what we have documented:

  • /sync returns things in the order they arrive at the server (stream_ordering).
  • /messages (and /backfill in the federation API) return them in the order determined by the event graph (topological_ordering, stream_ordering).

-- Room DAG concepts -> Depth and stream ordering

The current behavior of /messages and /sync returns events as expected but not state.

There is a bug in how incremental /sync returns state which this PR aims to fix. The best way to understand the problem is probably reading #12281 (comment) - but basically when calculating what state to return, /sync is ordering events by topological_ordering even though it should be stream_ordering.


I initially assumed we would want flexibility in how these endpoints sorted events according to the type of pagination token given but it's sounding like we actually want to enforce a given sort according to the endpoint. In which case, we can revert to @Fizzadar initial approach of plumbing a order_by argument which we can set.

Copy link
Member

@erikjohnston erikjohnston Apr 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Sorry for not getting to this yesterday, it sat at about the second thing on my todo list all day :/)

Thanks for the clarification. There's the added bonus that /sync will return events in pagination order for the first batch of events for the room, i.e. in an initial sync or when a user joins the room. (The rationale being that it's equivalent to getting no events then immediately doing a back pagination). This is relevant as we use get_recent_events_for_room for that case as well.

I initially assumed we would want flexibility in how these endpoints sorted events according to the type of pagination token given but it's sounding like we actually want to enforce a given sort according to the endpoint. In which case, we can revert to @Fizzadar initial approach of plumbing a order_by argument which we can set.

I wonder if we're just making this harder for ourselves by re-using the same query to do both /messages and when looking up the state for /sync? I think the query we want to run is simply:

SELECT event_id FROM events
WHERE room_id = ? AND stream_ordering <= ?
ORDER BY stream_ordering DESC
LIMIT 1

So it might be simpler to just have a separate get_last_event_in_room_before function that we call when getting the state instead? Rather than extending the already slightly horrific _paginate_room_events_txn function?


Entirely separately, and I'm not suggesting we necessarily do this now, but it occurs to me that we have a get_forward_extremities_for_room_at_stream_ordering store function that maps from room_id to the list of forward extremities in the room at the time. This can then be used to calculate the "current" state of the room at the time more accurately than the current method. The downside of this approach is that get_forward_extremities_for_room_at_stream_ordering will fail if the stream ordering is too old.

Another approach that has also literally just now occurred to me is that we could use the current_state_delta_stream to work out the changes in state between two stream orderings in the room, as the stream_id column correspond to the stream orderings of the changes (I think?).

Anyway, just wanted to record my thoughts here before I forget them

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fizzadar sorry for going around the houses really slowly on this 😞

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries at all @erikjohnston - totally agree a separate function makes sense here; I've pushed that up in 9a78d14. I just wrapped an existing function get_room_event_before_stream_ordering (which doesn't actually return an event) and pulled out the event.

Have undone the pagination ordering changes too!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! Will have a look after I've finished doing the v1.57.0rc1 release ❤️


if at_token:
# FIXME this claims to get the state at a stream position, but
# get_recent_events_for_room operates by topo ordering. This therefore
# does not reliably give you the state at the given stream position.
# (https://github.com/matrix-org/synapse/issues/3305)
last_events, _ = await self.store.get_recent_events_for_room(
room_id, end_token=at_token.room_key, limit=1
last_event = await self.store.get_last_event_in_room_before_stream_ordering(
room_id,
end_token=at_token.room_key,
)

if not last_events:
if not last_event:
raise NotFoundError("Can't find event for token %s" % (at_token,))
last_event = last_events[0]

# check whether the user is in the room at that time to determine
# whether they should be treated as peeking.
Expand All @@ -204,7 +200,7 @@ async def get_state_events(
visible_events = await filter_events_for_client(
self.storage,
user_id,
last_events,
[last_event],
filter_send_to_client=False,
is_peeking=is_peeking,
)
Expand Down
15 changes: 7 additions & 8 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,16 +661,15 @@ async def get_state_at(
stream_position: point at which to get state
state_filter: The state filter used to fetch state from the database.
"""
# FIXME this claims to get the state at a stream position, but
# get_recent_events_for_room operates by topo ordering. This therefore
# does not reliably give you the state at the given stream position.
# (https://github.com/matrix-org/synapse/issues/3305)
last_events, _ = await self.store.get_recent_events_for_room(
room_id, end_token=stream_position.room_key, limit=1
# FIXME: This gets the state at the latest event before the stream ordering,
# which might not be the same as the "current state" of the room at the time
# of the stream token if there were multiple forward extremities at the time.
last_event = await self.store.get_last_event_in_room_before_stream_ordering(
Fizzadar marked this conversation as resolved.
Show resolved Hide resolved
room_id,
end_token=stream_position.room_key,
)
Fizzadar marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a test for #3305 so it doesn't regress.

I don't have clear reproduction steps for this one and maybe we can just consider the test we write for #12281 as a subset of this problem and only need one test 🤷


if last_events:
last_event = last_events[-1]
if last_event:
state = await self.get_state_after_event(
last_event, state_filter=state_filter or StateFilter.all()
)
Expand Down
26 changes: 26 additions & 0 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,32 @@ def _f(txn):
"get_room_event_before_stream_ordering", _f
)

async def get_last_event_in_room_before_stream_ordering(
self,
room_id: str,
end_token: RoomStreamToken,
) -> Optional[EventBase]:
"""Returns the last event in a room at or before a stream ordering

Args:
room_id
end_token: The token used to stream from

Returns:
The most recent event.
"""

last_row = await self.get_room_event_before_stream_ordering(
room_id=room_id,
stream_ordering=end_token.stream,
)
if last_row:
_, _, event_id = last_row
event = await self.get_event(event_id, get_prev_content=True)
return event

return None

async def get_current_room_stream_token_for_room_id(
self, room_id: Optional[str] = None
) -> RoomStreamToken:
Expand Down
125 changes: 123 additions & 2 deletions tests/rest/client/test_room_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from synapse.api.constants import EventContentFields, EventTypes
from synapse.appservice import ApplicationService
from synapse.rest import admin
from synapse.rest.client import login, register, room, room_batch
from synapse.rest.client import login, register, room, room_batch, sync
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.types import JsonDict, RoomStreamToken
from synapse.util import Clock

from tests import unittest
Expand Down Expand Up @@ -63,6 +63,7 @@ class RoomBatchTestCase(unittest.HomeserverTestCase):
room.register_servlets,
register.register_servlets,
login.register_servlets,
sync.register_servlets,
]

def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
Expand Down Expand Up @@ -178,3 +179,123 @@ def test_same_state_groups_for_whole_historical_batch(self) -> None:
"Expected a single state_group to be returned by saw state_groups=%s"
% (state_group_map.keys(),),
)

@unittest.override_config({"experimental_features": {"msc2716_enabled": True}})
def test_sync_while_batch_importing(self) -> None:
"""
Make sure that /sync correctly returns full room state when a user joins
during ongoing batch backfilling.
See: https://github.com/matrix-org/synapse/issues/12281
"""
# Create user who will be invited & join room
user_id = self.register_user("beep", "test")
user_tok = self.login("beep", "test")

time_before_room = int(self.clock.time_msec())

# Create a room with some events
room_id, _, _, _ = self._create_test_room()
# Invite the user
self.helper.invite(
room_id, src=self.appservice.sender, tok=self.appservice.token, targ=user_id
)

# Create another room, send a bunch of events to advance the stream token
other_room_id = self.helper.create_room_as(
self.appservice.sender, tok=self.appservice.token
)
for _ in range(5):
self.helper.send_event(
room_id=other_room_id,
type=EventTypes.Message,
content={"msgtype": "m.text", "body": "C"},
tok=self.appservice.token,
)

# Join the room as the normal user
self.helper.join(room_id, user_id, tok=user_tok)

# Create an event to hang the historical batch from - In order to see
# the failure case originally reported in #12281, the historical batch
# must be hung from the most recent event in the room so the base
# insertion event ends up with the highest `topogological_ordering`
# (`depth`) in the room but will have a negative `stream_ordering`
# because it's a `historical` event. Previously, when assembling the
# `state` for the `/sync` response, the bugged logic would sort by
# `topological_ordering` descending and pick up the base insertion
# event because it has a negative `stream_ordering` below the given
# pagination token. Now we properly sort by `stream_ordering`
# descending which puts `historical` events with a negative
# `stream_ordering` way at the bottom and aren't selected as expected.
response = self.helper.send_event(
room_id=room_id,
type=EventTypes.Message,
content={
"msgtype": "m.text",
"body": "C",
},
tok=self.appservice.token,
)
event_to_hang_id = response["event_id"]
Fizzadar marked this conversation as resolved.
Show resolved Hide resolved

channel = self.make_request(
"POST",
"/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s"
% (room_id, event_to_hang_id),
content={
"events": _create_message_events_for_batch_send_request(
self.virtual_user_id, time_before_room, 3
),
"state_events_at_start": _create_join_state_events_for_batch_send_request(
[self.virtual_user_id], time_before_room
),
},
access_token=self.appservice.token,
)
self.assertEqual(channel.code, 200, channel.result)

# Now we need to find the invite + join events stream tokens so we can sync between
main_store = self.hs.get_datastores().main
events, next_key = self.get_success(
main_store.get_recent_events_for_room(
room_id,
50,
end_token=main_store.get_room_max_token(),
),
)
invite_event_position = None
for event in events:
if (
event.type == "m.room.member"
and event.content["membership"] == "invite"
):
invite_event_position = self.get_success(
main_store.get_topological_token_for_event(event.event_id)
)
break

assert invite_event_position is not None, "No invite event found"

# Remove the topological order from the token by re-creating w/stream only
invite_event_position = RoomStreamToken(None, invite_event_position.stream)

# Sync everything after this token
since_token = self.get_success(invite_event_position.to_string(main_store))
sync_response = self.make_request(
"GET",
f"/sync?since={since_token}",
access_token=user_tok,
)

# Assert that, for this room, the user was considered to have joined and thus
# receives the full state history
state_event_types = [
event["type"]
for event in sync_response.json_body["rooms"]["join"][room_id]["state"][
"events"
]
]

assert (
"m.room.create" in state_event_types
), "Missing room full state in sync response"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested and fails on develop but passes with this PR ✅ (as expected)