Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add and use get_last_event_in_room_before_stream_ordering method
Browse files Browse the repository at this point in the history
This uses an existing method, `get_last_event_in_room_before_stream_ordering`,
to retrieve the most recent (stream ordered) event in a room and replaces
uses of the complex pagination txn.
  • Loading branch information
Fizzadar committed Apr 12, 2022
1 parent b2c6c20 commit 9a78d14
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 9 deletions.
10 changes: 5 additions & 5 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,13 @@ async def get_state_events(
state_filter = state_filter or StateFilter.all()

if at_token:
last_events, _ = await self.store.get_recent_events_for_room(
room_id, end_token=at_token.room_key, limit=1
last_event = await self.store.get_last_event_in_room_before_stream_ordering(
room_id,
end_token=at_token.room_key,
)

if not last_events:
if not last_event:
raise NotFoundError("Can't find event for token %s" % (at_token,))
last_event = last_events[0]

# check whether the user is in the room at that time to determine
# whether they should be treated as peeking.
Expand All @@ -200,7 +200,7 @@ async def get_state_events(
visible_events = await filter_events_for_client(
self.storage,
user_id,
last_events,
[last_event],
filter_send_to_client=False,
is_peeking=is_peeking,
)
Expand Down
8 changes: 4 additions & 4 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,12 +661,12 @@ async def get_state_at(
stream_position: point at which to get state
state_filter: The state filter used to fetch state from the database.
"""
last_events, _ = await self.store.get_recent_events_for_room(
room_id, end_token=stream_position.room_key, limit=1
last_event = await self.store.get_last_event_in_room_before_stream_ordering(
room_id,
end_token=stream_position.room_key,
)

if last_events:
last_event = last_events[-1]
if last_event:
state = await self.get_state_after_event(
last_event, state_filter=state_filter or StateFilter.all()
)
Expand Down
26 changes: 26 additions & 0 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,32 @@ def _f(txn):
"get_room_event_before_stream_ordering", _f
)

async def get_last_event_in_room_before_stream_ordering(
self,
room_id: str,
end_token: RoomStreamToken,
) -> Optional[EventBase]:
"""Returns the last event in a room at or before a stream ordering
Args:
room_id
end_token: The token used to stream from
Returns:
The most recent event.
"""

last_row = await self.get_room_event_before_stream_ordering(
room_id=room_id,
stream_ordering=end_token.stream,
)
if last_row:
_, _, event_id = last_row
event = await self.get_event(event_id, get_prev_content=True)
return event

return None

async def get_current_room_stream_token_for_room_id(
self, room_id: Optional[str] = None
) -> RoomStreamToken:
Expand Down

0 comments on commit 9a78d14

Please sign in to comment.