From 4d970ebed298faca3618049d9fd7f928b4a04113 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 18 Jan 2023 09:50:19 -0500 Subject: [PATCH 1/4] Abstract code to generate pagination bounds. --- synapse/storage/databases/main/stream.py | 110 +++++++++++++++-------- 1 file changed, 71 insertions(+), 39 deletions(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 63d835053019..c12e16209748 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -170,6 +170,74 @@ def generate_pagination_where_clause( return " AND ".join(where_clause) +def generate_pagination_bounds( + direction: str, from_token: RoomStreamToken, to_token: Optional[RoomStreamToken] +) -> Tuple[str, Tuple[Optional[int], int], Optional[Tuple[Optional[int], int]]]: + """ + Generate a start and end point for this page of events. + + Args: + direction: Whether pagination is going forwards or backwards. One of "f" or "b". + from_token: The token to start pagination at. + to_token: The token to end pagination at, or None to not limit the end point. + + Returns: + A three tuple of: + + ASC or DESC for sorting of the query. + + The starting position as a tuple of ints representing + (topological position, stream position). The topological position + may be None for live tokens. + + The end position in the same format as the starting position, or None + if no to_token was provided. + """ + + # Tokens really represent positions between elements, but we use + # the convention of pointing to the event before the gap. Hence + # we have a bit of asymmetry when it comes to equalities. + if direction == "b": + order = "DESC" + else: + order = "ASC" + + # The bounds for the stream tokens are complicated by the fact + # that we need to handle the instance_map part of the tokens. We do this + # by fetching all events between the min stream token and the maximum + # stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and + # then filtering the results. + if from_token.topological is not None: + from_bound: Tuple[Optional[int], int] = from_token.as_historical_tuple() + elif direction == "b": + from_bound = ( + None, + from_token.get_max_stream_pos(), + ) + else: + from_bound = ( + None, + from_token.stream, + ) + + to_bound: Optional[Tuple[Optional[int], int]] = None + if to_token: + if to_token.topological is not None: + to_bound = to_token.as_historical_tuple() + elif direction == "b": + to_bound = ( + None, + to_token.stream, + ) + else: + to_bound = ( + None, + to_token.get_max_stream_pos(), + ) + + return order, from_bound, to_bound + + def _make_generic_sql_bound( bound: str, column_names: Tuple[str, str], @@ -1272,47 +1340,11 @@ def _paginate_room_events_txn( `to_token`), or `limit` is zero. """ - # Tokens really represent positions between elements, but we use - # the convention of pointing to the event before the gap. Hence - # we have a bit of asymmetry when it comes to equalities. args = [False, room_id] - if direction == "b": - order = "DESC" - else: - order = "ASC" - - # The bounds for the stream tokens are complicated by the fact - # that we need to handle the instance_map part of the tokens. We do this - # by fetching all events between the min stream token and the maximum - # stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and - # then filtering the results. - if from_token.topological is not None: - from_bound: Tuple[Optional[int], int] = from_token.as_historical_tuple() - elif direction == "b": - from_bound = ( - None, - from_token.get_max_stream_pos(), - ) - else: - from_bound = ( - None, - from_token.stream, - ) - to_bound: Optional[Tuple[Optional[int], int]] = None - if to_token: - if to_token.topological is not None: - to_bound = to_token.as_historical_tuple() - elif direction == "b": - to_bound = ( - None, - to_token.stream, - ) - else: - to_bound = ( - None, - to_token.get_max_stream_pos(), - ) + order, from_bound, to_bound = generate_pagination_bounds( + direction, from_token, to_token + ) bounds = generate_pagination_where_clause( direction=direction, From 73dcc6de1c1f04fb7879c857f2cd97f0f11a3011 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 18 Jan 2023 10:00:21 -0500 Subject: [PATCH 2/4] Handle 'live' pagination tokens on the relations endpoint. --- synapse/storage/databases/main/relations.py | 22 ++++++------ synapse/storage/databases/main/stream.py | 40 ++++++++++++--------- 2 files changed, 35 insertions(+), 27 deletions(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index aea96e9d2478..f35b5abc341b 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -40,7 +40,10 @@ LoggingTransaction, make_in_list_sql_clause, ) -from synapse.storage.databases.main.stream import generate_pagination_where_clause +from synapse.storage.databases.main.stream import ( + generate_pagination_bounds, + generate_pagination_where_clause, +) from synapse.storage.engines import PostgresEngine from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken from synapse.util.caches.descriptors import cached, cachedList @@ -207,24 +210,23 @@ async def get_relations_for_event( where_clause.append("type = ?") where_args.append(event_type) + order, from_bound, to_bound = generate_pagination_bounds( + direction, + from_token.room_key if from_token else None, + to_token.room_key if to_token else None, + ) + pagination_clause = generate_pagination_where_clause( direction=direction, column_names=("topological_ordering", "stream_ordering"), - from_token=from_token.room_key.as_historical_tuple() - if from_token - else None, - to_token=to_token.room_key.as_historical_tuple() if to_token else None, + from_token=from_bound, + to_token=to_bound, engine=self.database_engine, ) if pagination_clause: where_clause.append(pagination_clause) - if direction == "b": - order = "DESC" - else: - order = "ASC" - sql = """ SELECT event_id, relation_type, sender, topological_ordering, stream_ordering FROM event_relations diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index c12e16209748..1f799637103d 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -171,14 +171,18 @@ def generate_pagination_where_clause( def generate_pagination_bounds( - direction: str, from_token: RoomStreamToken, to_token: Optional[RoomStreamToken] -) -> Tuple[str, Tuple[Optional[int], int], Optional[Tuple[Optional[int], int]]]: + direction: str, + from_token: Optional[RoomStreamToken], + to_token: Optional[RoomStreamToken], +) -> Tuple[ + str, Optional[Tuple[Optional[int], int]], Optional[Tuple[Optional[int], int]] +]: """ Generate a start and end point for this page of events. Args: direction: Whether pagination is going forwards or backwards. One of "f" or "b". - from_token: The token to start pagination at. + from_token: The token to start pagination at, or None to start at the first value. to_token: The token to end pagination at, or None to not limit the end point. Returns: @@ -187,8 +191,8 @@ def generate_pagination_bounds( ASC or DESC for sorting of the query. The starting position as a tuple of ints representing - (topological position, stream position). The topological position - may be None for live tokens. + (topological position, stream position) or None if no from_token was + provided. The topological position may be None for live tokens. The end position in the same format as the starting position, or None if no to_token was provided. @@ -207,18 +211,20 @@ def generate_pagination_bounds( # by fetching all events between the min stream token and the maximum # stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and # then filtering the results. - if from_token.topological is not None: - from_bound: Tuple[Optional[int], int] = from_token.as_historical_tuple() - elif direction == "b": - from_bound = ( - None, - from_token.get_max_stream_pos(), - ) - else: - from_bound = ( - None, - from_token.stream, - ) + from_bound: Optional[Tuple[Optional[int], int]] = None + if from_token: + if from_token.topological is not None: + from_bound = from_token.as_historical_tuple() + elif direction == "b": + from_bound = ( + None, + from_token.get_max_stream_pos(), + ) + else: + from_bound = ( + None, + from_token.stream, + ) to_bound: Optional[Tuple[Optional[int], int]] = None if to_token: From 87aa0fa185f27b907e5f197e034e309bb2074171 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 18 Jan 2023 10:13:19 -0500 Subject: [PATCH 3/4] Abstract tricky off-by-one logic. --- synapse/storage/databases/main/relations.py | 16 +++------ synapse/storage/databases/main/stream.py | 38 +++++++++++++++------ 2 files changed, 33 insertions(+), 21 deletions(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index f35b5abc341b..3a22a3146bef 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -41,11 +41,12 @@ make_in_list_sql_clause, ) from synapse.storage.databases.main.stream import ( + generate_next_token, generate_pagination_bounds, generate_pagination_where_clause, ) from synapse.storage.engines import PostgresEngine -from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken +from synapse.types import JsonDict, StreamKeyType, StreamToken from synapse.util.caches.descriptors import cached, cachedList if TYPE_CHECKING: @@ -268,16 +269,9 @@ def _get_recent_references_for_event_txn( topo_orderings = topo_orderings[:limit] stream_orderings = stream_orderings[:limit] - topo = topo_orderings[-1] - token = stream_orderings[-1] - if direction == "b": - # Tokens are positions between events. - # This token points *after* the last event in the chunk. - # We need it to point to the event before it in the chunk - # when we are going backwards so we subtract one from the - # stream part. - token -= 1 - next_key = RoomStreamToken(topo, token) + next_key = generate_next_token( + direction, topo_orderings[-1], stream_orderings[-1] + ) if from_token: next_token = from_token.copy_and_replace( diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 1f799637103d..00d5f7b3db64 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -244,6 +244,30 @@ def generate_pagination_bounds( return order, from_bound, to_bound +def generate_next_token( + direction: str, last_topo_ordering: int, last_stream_ordering: int +) -> RoomStreamToken: + """ + Generate the next room stream token based on the currently returned data. + + Args: + direction: Whether pagination is going forwards or backwards. One of "f" or "b". + last_topo_ordering: The last topological ordering being returned. + last_stream_ordering: The last stream ordering being returned. + + Returns: + A new RoomStreamToken to return to the client. + """ + if direction == "b": + # Tokens are positions between events. + # This token points *after* the last event in the chunk. + # We need it to point to the event before it in the chunk + # when we are going backwards so we subtract one from the + # stream part. + last_stream_ordering -= 1 + return RoomStreamToken(last_topo_ordering, last_stream_ordering) + + def _make_generic_sql_bound( bound: str, column_names: Tuple[str, str], @@ -1446,16 +1470,10 @@ def _paginate_room_events_txn( ][:limit] if rows: - topo = rows[-1].topological_ordering - token = rows[-1].stream_ordering - if direction == "b": - # Tokens are positions between events. - # This token points *after* the last event in the chunk. - # We need it to point to the event before it in the chunk - # when we are going backwards so we subtract one from the - # stream part. - token -= 1 - next_token = RoomStreamToken(topo, token) + assert rows[-1].topological_ordering is not None + next_token = generate_next_token( + direction, rows[-1].topological_ordering, rows[-1].stream_ordering + ) else: # TODO (erikj): We should work out what to do here instead. next_token = to_token if to_token else from_token From 0454ae6db3670bf0511f13c243ce98cf13ad22e3 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 18 Jan 2023 10:33:26 -0500 Subject: [PATCH 4/4] Newsfragment --- changelog.d/14866.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14866.bugfix diff --git a/changelog.d/14866.bugfix b/changelog.d/14866.bugfix new file mode 100644 index 000000000000..540f918cbd10 --- /dev/null +++ b/changelog.d/14866.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in Synapse 1.53.0 where `next_batch` tokens from `/sync` could not be used with the `/relations` endpoint.