Skip to content

Commit

Permalink
Refactor sync APIs to reuse pagination API
Browse files Browse the repository at this point in the history
The sync API often returns events in a topological rather than stream
ordering, e.g. when the user joined the room or on initial sync. When
this happens we can reuse existing pagination storage functions.
  • Loading branch information
erikjohnston committed May 9, 2018
1 parent e5ab9cd commit e2accd7
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 44 deletions.
19 changes: 13 additions & 6 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,12 +354,19 @@ def _load_filtered_recents(self, room_id, sync_config, now_token,
since_key = since_token.room_key

while limited and len(recents) < timeline_limit and max_repeat:
events, end_key = yield self.store.get_room_events_stream_for_room(
room_id,
limit=load_limit + 1,
from_key=since_key,
to_key=end_key,
)
if since_key:
events, end_key = yield self.store.get_room_events_stream_for_room(
room_id,
limit=load_limit + 1,
from_key=since_key,
to_key=end_key,
)
else:
events, end_key = yield self.store.get_recent_events_for_room(
room_id,
limit=load_limit + 1,
end_token=end_key,
)
loaded_recents = sync_config.filter_collection.filter_room_timeline(
events
)
Expand Down
73 changes: 35 additions & 38 deletions synapse/storage/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,52 +233,49 @@ def get_rooms_that_changed(self, room_ids, from_key):
@defer.inlineCallbacks
def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0,
order='DESC'):
# Note: If from_key is None then we return in topological order. This
# is because in that case we're using this as a "get the last few messages
# in a room" function, rather than "get new messages since last sync"
if from_key is not None:
from_id = RoomStreamToken.parse_stream_token(from_key).stream
else:
from_id = None
to_id = RoomStreamToken.parse_stream_token(to_key).stream

"""Get new room events in stream ordering since `from_key`.
Args:
room_id (str)
from_key (str): Token from which no events are returned before
to_key (str): Token from which no events are returned after. (This
is typically the current stream token)
limit (int): Maximum number of events to return
order (str): Either "DESC" or "ASC". Determines which events are
returned when the result is limited. If "DESC" then the most
recent `limit` events are returned, otherwise returns the
oldest `limit` events.
Returns:
Deferred[tuple[list[FrozenEvent], str]]: Returns the list of
events (in ascending order) and the token from the start of
the chunk of events returned.
"""
if from_key == to_key:
defer.returnValue(([], from_key))

if from_id:
has_changed = yield self._events_stream_cache.has_entity_changed(
room_id, from_id
)
from_id = RoomStreamToken.parse_stream_token(from_key).stream
to_id = RoomStreamToken.parse_stream_token(to_key).stream

if not has_changed:
defer.returnValue(([], from_key))
has_changed = yield self._events_stream_cache.has_entity_changed(
room_id, from_id
)

def f(txn):
if from_id is not None:
sql = (
"SELECT event_id, stream_ordering FROM events WHERE"
" room_id = ?"
" AND not outlier"
" AND stream_ordering > ? AND stream_ordering <= ?"
" ORDER BY stream_ordering %s LIMIT ?"
) % (order,)
txn.execute(sql, (room_id, from_id, to_id, limit))

rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
else:
sql = (
"SELECT event_id, topological_ordering, stream_ordering"
" FROM events"
" WHERE"
" room_id = ?"
" AND not outlier"
" AND stream_ordering <= ?"
" ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?"
) % (order, order,)
txn.execute(sql, (room_id, to_id, limit))
if not has_changed:
defer.returnValue(([], from_key))

rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
def f(txn):
sql = (
"SELECT event_id, stream_ordering FROM events WHERE"
" room_id = ?"
" AND not outlier"
" AND stream_ordering > ? AND stream_ordering <= ?"
" ORDER BY stream_ordering %s LIMIT ?"
) % (order,)
txn.execute(sql, (room_id, from_id, to_id, limit))

rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
return rows

rows = yield self.runInteraction("get_room_events_stream_for_room", f)
Expand Down

0 comments on commit e2accd7

Please sign in to comment.