From 91b1b3606c9fb9eede0a6963bc42dfb70635449f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Feb 2021 02:44:33 -0600 Subject: [PATCH 01/51] Add endpoint to get an event at a given timestamp --- synapse/rest/client/v1/room.py | 25 ++++++++++++ .../storage/databases/main/events_worker.py | 40 +++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index ebf4e3223089..c156d659117c 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -1383,6 +1383,30 @@ def register_txn_path(servlet, regex_string, http_server, with_get=False): servlet.__class__.__name__, ) +class TimestampLookupRestServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns("/rooms/(?P[^/]*)/timestamp_to_event$") + + def __init__(self, hs): + super(TimestampLookupRestServlet, self).__init__(hs) + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_GET(self, request, room_id): + requester = yield self.auth.get_user_by_req(request) + yield self.auth.check_joined_room(room_id, requester..to_string()) + + timestamp = parse_integer(request, "ts") + thread_id = parse_integer(request, "thread_id", 0) + + event_id = yield self.store.get_event_for_timestamp( + room_id, thread_id, timestamp, + ) + + .returnValue((200, { + "event_id": event_id, + })) + + class RoomSpaceSummaryRestServlet(RestServlet): PATTERNS = ( @@ -1458,6 +1482,7 @@ def register_servlets(hs: "HomeServer", http_server, is_worker=False): JoinedRoomsRestServlet(hs).register(http_server) RoomAliasListServlet(hs).register(http_server) SearchRestServlet(hs).register(http_server) + TimestampLookupRestServlet(hs).register(http_server) # Some servlets only get registered for the main process. 
if not is_worker: diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 403a5ddaba3e..022af9a8315a 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1511,3 +1511,43 @@ def _cleanup_old_transaction_ids_txn(txn): "_cleanup_old_transaction_ids", _cleanup_old_transaction_ids_txn, ) + + def get_event_for_timestamp(self, room_id, thread_id, timestamp): + sql_template = """ + SELECT event_id, origin_server_ts FROM + WHERE + origin_server_ts %s ? + AND room_id = ? + AND thread_id = ? + ORDER BY origin_server_ts + LIMIT 1; + """ + + def f(txn): + txn.execute(sql_template % ("<=",), (timestamp, room_id, thread_id)) + row = txn.fetchone() + if row: + event_id_before, ts_before = row + else: + event_id_before, ts_before = None, None + + txn.execute(sql_template % (">=",), (timestamp, room_id, thread_id)) + row = txn.fetchone() + if row: + event_id_after, ts_after = row + else: + event_id_after, ts_after = None, None + + if event_id_before and event_id_after: + # Return the closest + if (timestamp - ts_before) < (ts_after - timestamp): + return event_id_before + else: + return event_id_after + + if event_id_before: + return event_id_before + + return event_id_after + + return self.runInteraction("get_event_for_timestamp", f) From 8e5fa114ed1d851df6cfbe3e6e9ea07f2ea7afc2 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 15 Jul 2021 11:07:27 -0500 Subject: [PATCH 02/51] Update to latest REST servlet standards --- synapse/rest/client/v1/room.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index c156d659117c..5fe48c6a468d 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -1383,29 +1383,33 @@ def register_txn_path(servlet, regex_string, http_server, with_get=False): servlet.__class__.__name__, ) 
-class TimestampLookupRestServlet(ClientV1RestServlet): - PATTERNS = client_path_patterns("/rooms/(?P[^/]*)/timestamp_to_event$") - def __init__(self, hs): - super(TimestampLookupRestServlet, self).__init__(hs) - self.store = hs.get_datastore() +class TimestampLookupRestServlet(RestServlet): + PATTERNS = client_patterns("/rooms/(?P[^/]*)/timestamp_to_event$") - @defer.inlineCallbacks - def on_GET(self, request, room_id): - requester = yield self.auth.get_user_by_req(request) - yield self.auth.check_joined_room(room_id, requester..to_string()) + def __init__(self, hs: "HomeServer"): + super().__init__() + self._auth = hs.get_auth() + self._store = hs.get_datastore() + + async def on_GET( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: + requester = await self._auth.get_user_by_req(request) + await self._auth.check_joined_room(room_id, requester.to_string()) timestamp = parse_integer(request, "ts") thread_id = parse_integer(request, "thread_id", 0) - event_id = yield self.store.get_event_for_timestamp( - room_id, thread_id, timestamp, + event_id = await self._store.get_event_for_timestamp( + room_id, + thread_id, + timestamp, ) - .returnValue((200, { + return 200, { "event_id": event_id, - })) - + } class RoomSpaceSummaryRestServlet(RestServlet): From 668aa4e710e6e6e8a59eeab4cffe8e5d1a526a5d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 26 Jul 2021 19:15:07 -0500 Subject: [PATCH 03/51] Remove thread_id --- synapse/rest/client/v1/room.py | 2 -- synapse/storage/databases/main/events_worker.py | 9 ++++----- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index ac3f25c5f859..c833793ebe2b 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -1404,11 +1404,9 @@ async def on_GET( await self._auth.check_joined_room(room_id, requester.to_string()) timestamp = parse_integer(request, "ts") - thread_id = parse_integer(request, 
"thread_id", 0) event_id = await self._store.get_event_for_timestamp( room_id, - thread_id, timestamp, ) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 6c8cb33997ea..509b6786a8cb 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1512,26 +1512,25 @@ def _cleanup_old_transaction_ids_txn(txn): _cleanup_old_transaction_ids_txn, ) - def get_event_for_timestamp(self, room_id, thread_id, timestamp): + def get_event_for_timestamp(self, room_id, timestamp): sql_template = """ - SELECT event_id, origin_server_ts FROM + SELECT event_id, origin_server_ts FROM events WHERE origin_server_ts %s ? AND room_id = ? - AND thread_id = ? ORDER BY origin_server_ts LIMIT 1; """ def f(txn): - txn.execute(sql_template % ("<=",), (timestamp, room_id, thread_id)) + txn.execute(sql_template % ("<=",), (timestamp, room_id)) row = txn.fetchone() if row: event_id_before, ts_before = row else: event_id_before, ts_before = None, None - txn.execute(sql_template % (">=",), (timestamp, room_id, thread_id)) + txn.execute(sql_template % (">=",), (timestamp, room_id)) row = txn.fetchone() if row: event_id_after, ts_after = row From 5b07487294c58f3046e7e7047b8d63189a77c072 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 26 Jul 2021 22:31:29 -0500 Subject: [PATCH 04/51] Use updated method name check_joined_room -> check_user_in_room See https://github.com/matrix-org/synapse/commit/b58d17e44f9d9ff7e70578e0f4e328bb9113ec7e --- synapse/rest/client/v1/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index c833793ebe2b..c0bdb7d35e2b 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -1401,7 +1401,7 @@ async def on_GET( self, request: SynapseRequest, room_id: str ) -> Tuple[int, JsonDict]: requester = await self._auth.get_user_by_req(request) - 
await self._auth.check_joined_room(room_id, requester.to_string()) + await self._auth.check_user_in_room(room_id, requester.to_string()) timestamp = parse_integer(request, "ts") From f721899c69d8c8810444687b89c2302a4b37b3c8 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 26 Jul 2021 22:33:31 -0500 Subject: [PATCH 05/51] Use correct requester to sender access method --- synapse/rest/client/v1/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index c0bdb7d35e2b..fe04b57fbf41 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -1401,7 +1401,7 @@ async def on_GET( self, request: SynapseRequest, room_id: str ) -> Tuple[int, JsonDict]: requester = await self._auth.get_user_by_req(request) - await self._auth.check_user_in_room(room_id, requester.to_string()) + await self._auth.check_user_in_room(room_id, requester.user.to_string()) timestamp = parse_integer(request, "ts") From af085abbbf3ff5fed609a39be3ce7929c3d9284f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 26 Jul 2021 22:50:10 -0500 Subject: [PATCH 06/51] Use up-to-date db interaction method --- synapse/storage/databases/main/events_worker.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 509b6786a8cb..74463c40c015 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1512,7 +1512,7 @@ def _cleanup_old_transaction_ids_txn(txn): _cleanup_old_transaction_ids_txn, ) - def get_event_for_timestamp(self, room_id, timestamp): + async def get_event_for_timestamp(self, room_id, timestamp): sql_template = """ SELECT event_id, origin_server_ts FROM events WHERE @@ -1522,7 +1522,7 @@ def get_event_for_timestamp(self, room_id, timestamp): LIMIT 1; """ - def f(txn): + def 
get_event_for_timestamp_txn(txn): txn.execute(sql_template % ("<=",), (timestamp, room_id)) row = txn.fetchone() if row: @@ -1549,4 +1549,7 @@ def f(txn): return event_id_after - return self.runInteraction("get_event_for_timestamp", f) + return await self.db_pool.runInteraction( + "get_event_for_timestamp_txn", + get_event_for_timestamp_txn, + ) From 065273b57ed6e30467b97e218e66a37c77def09d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 28 Jul 2021 11:05:15 -0500 Subject: [PATCH 07/51] Add Complement MSC feature flag --- scripts-dev/complement.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index 4df224be6719..3290f0ee78dd 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then fi # Run the tests! -go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests +go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc3030 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests From e321ef7c09ec12c024de5d4d01263d6833be7ec2 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 11 Nov 2021 19:57:51 -0600 Subject: [PATCH 08/51] Fix query sort order so it returns the closest event before/after --- .../storage/databases/main/events_worker.py | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 96275c0170da..248559855f6e 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1633,25 +1633,40 @@ async def get_event_for_timestamp(self, room_id, timestamp): WHERE origin_server_ts %s ? AND room_id = ? 
- ORDER BY origin_server_ts + ORDER BY origin_server_ts %s LIMIT 1; """ def get_event_for_timestamp_txn(txn): - txn.execute(sql_template % ("<=",), (timestamp, room_id)) + # Find closest event before a given timestamp. We use descending + # (which gives values largest to smallest) because we want the + # largest possible timestamp *before* the given timestamp. + txn.execute(sql_template % ("<=", "DESC"), (timestamp, room_id)) row = txn.fetchone() if row: event_id_before, ts_before = row else: event_id_before, ts_before = None, None - txn.execute(sql_template % (">=",), (timestamp, room_id)) + # Find closest event after a given timestamp. We use ascending + # (which gives values smallest to largest) because we want the + # closest possible timestamp *after* the given timestamp. + txn.execute(sql_template % (">=", "ASC"), (timestamp, room_id)) row = txn.fetchone() if row: event_id_after, ts_after = row else: event_id_after, ts_after = None, None + logger.info( + "get_event_for_timestamp: timestamp=%s event_id_before=%s ts_before=%s event_id_after=%s ts_after=%s", + timestamp, + event_id_before, + ts_before, + event_id_after, + ts_after, + ) + if event_id_before and event_id_after: # Return the closest if (timestamp - ts_before) < (ts_after - timestamp): From e21e4b5764f67576b266ca75c4eac55587689a0f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 11 Nov 2021 20:39:38 -0600 Subject: [PATCH 09/51] Add ?dir parameter --- synapse/rest/client/room.py | 4 +- .../storage/databases/main/events_worker.py | 71 +++++++++---------- 2 files changed, 35 insertions(+), 40 deletions(-) diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 2ae4379084ff..313b72f26978 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -1085,10 +1085,10 @@ async def on_GET( await self._auth.check_user_in_room(room_id, requester.user.to_string()) timestamp = parse_integer(request, "ts") + direction = parse_string(request, "dir", default="f", 
allowed_values=["f", "b"]) event_id = await self._store.get_event_for_timestamp( - room_id, - timestamp, + room_id, timestamp, direction ) return 200, { diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 248559855f6e..9689bd392a2c 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -34,7 +34,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes -from synapse.api.errors import NotFoundError, SynapseError +from synapse.api.errors import Codes, NotFoundError, SynapseError from synapse.api.room_versions import ( KNOWN_ROOM_VERSIONS, EventFormatVersions, @@ -1627,9 +1627,11 @@ def _cleanup_old_transaction_ids_txn(txn): _cleanup_old_transaction_ids_txn, ) - async def get_event_for_timestamp(self, room_id, timestamp): + async def get_event_for_timestamp( + self, room_id: str, timestamp: int, direction: str + ) -> str: sql_template = """ - SELECT event_id, origin_server_ts FROM events + SELECT event_id FROM events WHERE origin_server_ts %s ? AND room_id = ? @@ -1637,47 +1639,40 @@ async def get_event_for_timestamp(self, room_id, timestamp): LIMIT 1; """ - def get_event_for_timestamp_txn(txn): - # Find closest event before a given timestamp. We use descending - # (which gives values largest to smallest) because we want the - # largest possible timestamp *before* the given timestamp. - txn.execute(sql_template % ("<=", "DESC"), (timestamp, room_id)) - row = txn.fetchone() - if row: - event_id_before, ts_before = row + def get_event_for_timestamp_txn(txn) -> str: + + if direction == "b": + # Find closest event *before* a given timestamp. We use descending + # (which gives values largest to smallest) because we want the + # largest possible timestamp *before* the given timestamp. 
+ comparison_operator = "<=" + order = "DESC" else: - event_id_before, ts_before = None, None + # Find closest event *after* a given timestamp. We use ascending + # (which gives values smallest to largest) because we want the + # closest possible timestamp *after* the given timestamp. + comparison_operator = ">=" + order = "ASC" - # Find closest event after a given timestamp. We use ascending - # (which gives values smallest to largest) because we want the - # closest possible timestamp *after* the given timestamp. - txn.execute(sql_template % (">=", "ASC"), (timestamp, room_id)) + txn.execute( + sql_template % (comparison_operator, order), (timestamp, room_id) + ) row = txn.fetchone() + logger.info("row %s", row) if row: - event_id_after, ts_after = row - else: - event_id_after, ts_after = None, None - - logger.info( - "get_event_for_timestamp: timestamp=%s event_id_before=%s ts_before=%s event_id_after=%s ts_after=%s", - timestamp, - event_id_before, - ts_before, - event_id_after, - ts_after, - ) - - if event_id_before and event_id_after: - # Return the closest - if (timestamp - ts_before) < (ts_after - timestamp): - return event_id_before - else: - return event_id_after + (event_id,) = row + return event_id - if event_id_before: - return event_id_before + raise SynapseError( + 404, + "Unable to find event from %s in direction %s" % (timestamp, direction), + errcode=Codes.NOT_FOUND, + ) - return event_id_after + if direction not in ("f", "b"): + raise SynapseError( + 400, "Unknown direction: %s" % (direction,), errcode=Codes.INVALID_PARAM + ) return await self.db_pool.runInteraction( "get_event_for_timestamp_txn", From fa15989d9e536b95a8ba4fc84747d737d6a4daae Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 17 Nov 2021 00:56:39 -0600 Subject: [PATCH 10/51] Add federated /timestamp_to_event endpoint and logic to ask --- synapse/federation/federation_server.py | 16 ++++ synapse/federation/transport/client.py | 15 ++++ 
.../federation/transport/server/federation.py | 21 ++++++ synapse/handlers/federation.py | 61 +++++++-------- synapse/handlers/room.py | 75 +++++++++++++++++++ synapse/rest/client/room.py | 5 +- synapse/server.py | 5 ++ .../storage/databases/main/events_worker.py | 65 ++++++++++++++-- 8 files changed, 223 insertions(+), 40 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9a8758e9a6d3..47e05f4723b8 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -200,6 +200,22 @@ async def on_backfill_request( return 200, res + async def on_timestamp_to_event_request( + self, origin: str, room_id: str, timestamp: int, direction: str + ) -> Tuple[int, Dict[str, Any]]: + with (await self._server_linearizer.queue((origin, room_id))): + origin_host, _ = parse_server_name(origin) + await self.check_server_matches_acl(origin_host, room_id) + + # We only try to fetch data from the local database + event_id = await self.store.get_event_for_timestamp( + room_id, timestamp, direction + ) + + return 200, { + "event_id": event_id, + } + async def on_incoming_transaction( self, origin: str, diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 10b5aa5af824..f287eccd4f1f 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -148,6 +148,21 @@ async def backfill( destination, path=path, args=args, try_trailing_slash_on_400=True ) + @log_function + async def timestamp_to_event( + self, destination: str, room_id: str, timestamp: int, direction: str + ) -> Optional[JsonDict]: + """ + TODO + """ + path = _create_v1_path("/timestamp_to_event/%s", room_id) + + args = {"ts": [str(timestamp)], "dir": [direction]} + + return await self.client.get_json( + destination, path=path, args=args, try_trailing_slash_on_400=True + ) + @log_function async def send_transaction( self, diff --git 
a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index 2fdf6cc99e49..32a3601c04ca 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -174,6 +174,26 @@ async def on_GET( return await self.handler.on_backfill_request(origin, room_id, versions, limit) +class FederationTimestampLookupServlet(BaseFederationServerServlet): + PATH = "/timestamp_to_event/(?P[^/]*)/?" + + async def on_GET( + self, + origin: str, + content: Literal[None], + query: Dict[bytes, List[bytes]], + room_id: str, + ) -> Tuple[int, JsonDict]: + timestamp = parse_integer_from_args(query, "ts", None) + direction = parse_string_from_args( + query, "dir", default="f", allowed_values=["f", "b"] + ) + + return await self.handler.on_timestamp_to_event_request( + origin, room_id, timestamp, direction + ) + + class FederationQueryServlet(BaseFederationServerServlet): PATH = "/query/(?P[^/]*)" @@ -680,6 +700,7 @@ async def on_GET( FederationStateV1Servlet, FederationStateIdsServlet, FederationBackfillServlet, + FederationTimestampLookupServlet, FederationQueryServlet, FederationMakeJoinServlet, FederationMakeLeaveServlet, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3112cc88b1cc..1ea837d08211 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -68,6 +68,37 @@ logger = logging.getLogger(__name__) +def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]: + """Get joined domains from state + + Args: + state: State map from type/state key to event. + + Returns: + Returns a list of servers with the lowest depth of their joins. + Sorted by lowest depth first. 
+ """ + joined_users = [ + (state_key, int(event.depth)) + for (e_type, state_key), event in state.items() + if e_type == EventTypes.Member and event.membership == Membership.JOIN + ] + + joined_domains: Dict[str, int] = {} + for u, d in joined_users: + try: + dom = get_domain_from_id(u) + old_d = joined_domains.get(dom) + if old_d: + joined_domains[dom] = min(d, old_d) + else: + joined_domains[dom] = d + except Exception: + pass + + return sorted(joined_domains.items(), key=lambda d: d[1]) + + class FederationHandler: """Handles general incoming federation requests @@ -268,36 +299,6 @@ async def _maybe_backfill_inner( curr_state = await self.state_handler.get_current_state(room_id) - def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]: - """Get joined domains from state - - Args: - state: State map from type/state key to event. - - Returns: - Returns a list of servers with the lowest depth of their joins. - Sorted by lowest depth first. - """ - joined_users = [ - (state_key, int(event.depth)) - for (e_type, state_key), event in state.items() - if e_type == EventTypes.Member and event.membership == Membership.JOIN - ] - - joined_domains: Dict[str, int] = {} - for u, d in joined_users: - try: - dom = get_domain_from_id(u) - old_d = joined_domains.get(dom) - if old_d: - joined_domains[dom] = min(d, old_d) - else: - joined_domains[dom] = d - except Exception: - pass - - return sorted(joined_domains.items(), key=lambda d: d[1]) - curr_domains = get_domains_from_state(curr_state) likely_domains = [ diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 11af30eee7ec..ceab663459aa 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -76,6 +76,7 @@ from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import parse_and_validate_server_name from synapse.visibility import filter_events_for_client +from synapse.handlers.federation import get_domains_from_state if TYPE_CHECKING: 
from synapse.server import HomeServer @@ -1217,6 +1218,80 @@ async def filter_evts(events: List[EventBase]) -> List[EventBase]: return results +class TimestampLookupHandler: + def __init__(self, hs: "HomeServer"): + self.server_name = hs.hostname + self.store = hs.get_datastore() + self.state_handler = hs.get_state_handler() + self.transport_layer = hs.get_federation_transport_client() + + async def get_event_for_timestamp( + self, + requester: Requester, + room_id: str, + timestamp: int, + direction: str, + ) -> Optional[JsonDict]: + event_id = await self.store.get_event_for_timestamp( + room_id, timestamp, direction + ) + + if not event_id: + raise SynapseError( + 404, + "Unable to find event from %s in direction %s" % (timestamp, direction), + errcode=Codes.NOT_FOUND, + ) + + # If we found an extremity, we should probably ask another homeserver + # first about more history in between + is_extremity = await self.store.check_if_event_is_extremity(room_id, event_id) + logger.info( + "get_event_for_timestamp: locally, we found event=%s closest to timestamp=%s and is is_extremity=%s", + event_id, + timestamp, + is_extremity, + ) + if is_extremity: + logger.info( + "get_event_for_timestamp: locally, we found event=%s closest to timestamp=%s is an extremity so we're asking other homeservers first", + event_id, + timestamp, + ) + + curr_state = await self.state_handler.get_current_state(room_id) + curr_domains = get_domains_from_state(curr_state) + likely_domains = [ + domain for domain, depth in curr_domains if domain != self.server_name + ] + + for domain in likely_domains: + try: + remote_response = await self.transport_layer.timestamp_to_event( + domain, room_id, timestamp, direction + ) + logger.info( + "get_event_for_timestamp: response from domain(%s)=%s", + domain, + remote_response, + ) + remote_event_id = remote_response.get("event_id", None) + if remote_event_id: + # TODO: Do we want to persist this as an extremity? 
+ return remote_event_id + + continue + except Exception as e: + logger.exception( + "Failed to fetch /timestamp_to_event from %s because %s", + domain, + e, + ) + continue + + return event_id + + class RoomEventSource(EventSource[RoomStreamToken, EventBase]): def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 313b72f26978..050978ea8457 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -1077,6 +1077,7 @@ def __init__(self, hs: "HomeServer"): super().__init__() self._auth = hs.get_auth() self._store = hs.get_datastore() + self.timestamp_lookup_handler = hs.get_timestamp_lookup_handler() async def on_GET( self, request: SynapseRequest, room_id: str @@ -1087,8 +1088,8 @@ async def on_GET( timestamp = parse_integer(request, "ts") direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"]) - event_id = await self._store.get_event_for_timestamp( - room_id, timestamp, direction + event_id = await self.timestamp_lookup_handler.get_event_for_timestamp( + requester, room_id, timestamp, direction ) return 200, { diff --git a/synapse/server.py b/synapse/server.py index 013a7bacaa4a..5abbd7044514 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -96,6 +96,7 @@ RoomContextHandler, RoomCreationHandler, RoomShutdownHandler, + TimestampLookupHandler, ) from synapse.handlers.room_batch import RoomBatchHandler from synapse.handlers.room_list import RoomListHandler @@ -725,6 +726,10 @@ def get_pagination_handler(self) -> PaginationHandler: def get_room_context_handler(self) -> RoomContextHandler: return RoomContextHandler(self) + @cache_in_self + def get_timestamp_lookup_handler(self) -> TimestampLookupHandler: + return TimestampLookupHandler(self) + @cache_in_self def get_registration_handler(self) -> RegistrationHandler: return RegistrationHandler(self) diff --git a/synapse/storage/databases/main/events_worker.py 
b/synapse/storage/databases/main/events_worker.py index 9689bd392a2c..06cfabe82827 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1627,9 +1627,64 @@ def _cleanup_old_transaction_ids_txn(txn): _cleanup_old_transaction_ids_txn, ) + async def check_if_event_is_extremity(self, room_id: str, event_id: str): + def check_if_event_is_extremity_txn(txn) -> bool: + backward_extremity_query = """ + SELECT event_id FROM event_backward_extremities + WHERE room_id = ? AND event_id = ? + LIMIT 1 + """ + + forward_extremity_query = """ + SELECT event_id FROM event_forward_extremities + WHERE room_id = ? AND event_id = ? + LIMIT 1 + """ + + all_backward_extremity_query = """ + SELECT event_id FROM event_backward_extremities + WHERE room_id = ? + LIMIT 100 + """ + + all_forward_extremity_query = """ + SELECT event_id FROM event_forward_extremities + WHERE room_id = ? + LIMIT 100 + """ + + txn.execute(all_backward_extremity_query, (room_id,)) + all_backward_extremties = txn.fetchall() + txn.execute(all_forward_extremity_query, (room_id,)) + all_forward_extremties = txn.fetchall() + logger.info( + "all_forward_extremties=%s all_backward_extremties=%s", + all_forward_extremties, + all_backward_extremties, + ) + + txn.execute(backward_extremity_query, (room_id, event_id)) + backward_extremties = txn.fetchall() + + if len(backward_extremties): + return True + + txn.execute(forward_extremity_query, (room_id, event_id)) + forward_extremties = txn.fetchall() + + if len(forward_extremties): + return True + + return False + + return await self.db_pool.runInteraction( + "check_if_event_is_extremity_txn", + check_if_event_is_extremity_txn, + ) + async def get_event_for_timestamp( self, room_id: str, timestamp: int, direction: str - ) -> str: + ) -> Optional[str]: sql_template = """ SELECT event_id FROM events WHERE @@ -1640,7 +1695,6 @@ async def get_event_for_timestamp( """ def get_event_for_timestamp_txn(txn) -> str: - if 
direction == "b": # Find closest event *before* a given timestamp. We use descending # (which gives values largest to smallest) because we want the @@ -1658,16 +1712,11 @@ def get_event_for_timestamp_txn(txn) -> str: sql_template % (comparison_operator, order), (timestamp, room_id) ) row = txn.fetchone() - logger.info("row %s", row) if row: (event_id,) = row return event_id - raise SynapseError( - 404, - "Unable to find event from %s in direction %s" % (timestamp, direction), - errcode=Codes.NOT_FOUND, - ) + return None if direction not in ("f", "b"): raise SynapseError( From ec2695d25e19aad12d429821b585603fb0085253 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 17 Nov 2021 01:21:12 -0600 Subject: [PATCH 11/51] Determine forward extremity by edges --- .../storage/databases/main/events_worker.py | 42 +++++++------------ 1 file changed, 14 insertions(+), 28 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 06cfabe82827..d31303363a28 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1629,50 +1629,36 @@ def _cleanup_old_transaction_ids_txn(txn): async def check_if_event_is_extremity(self, room_id: str, event_id: str): def check_if_event_is_extremity_txn(txn) -> bool: + # If the event in question, is listed as a backward extremity, it's + # an extremity. backward_extremity_query = """ SELECT event_id FROM event_backward_extremities WHERE room_id = ? AND event_id = ? LIMIT 1 """ - forward_extremity_query = """ - SELECT event_id FROM event_forward_extremities - WHERE room_id = ? AND event_id = ? + # Check to see whether the event in question is already referenced + # by another event. If we don't see any edges, we're next to a + # forward gap. + # + # We can't just check `event_forward_extremities` here because those + # can only be .... 
(pending discussion on https://github.com/matrix-org/synapse/pull/9445#discussion_r750946180) + forward_edge_query = """ + SELECT event_id FROM event_edges + WHERE room_id = ? AND prev_event_id = ? LIMIT 1 """ - all_backward_extremity_query = """ - SELECT event_id FROM event_backward_extremities - WHERE room_id = ? - LIMIT 100 - """ - - all_forward_extremity_query = """ - SELECT event_id FROM event_forward_extremities - WHERE room_id = ? - LIMIT 100 - """ - - txn.execute(all_backward_extremity_query, (room_id,)) - all_backward_extremties = txn.fetchall() - txn.execute(all_forward_extremity_query, (room_id,)) - all_forward_extremties = txn.fetchall() - logger.info( - "all_forward_extremties=%s all_backward_extremties=%s", - all_forward_extremties, - all_backward_extremties, - ) - txn.execute(backward_extremity_query, (room_id, event_id)) backward_extremties = txn.fetchall() if len(backward_extremties): return True - txn.execute(forward_extremity_query, (room_id, event_id)) - forward_extremties = txn.fetchall() + txn.execute(forward_edge_query, (room_id, event_id)) + forward_edges = txn.fetchall() - if len(forward_extremties): + if not len(forward_edges): return True return False From 22a93c34b684583a8eff52981c27a98282004222 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 17 Nov 2021 01:39:09 -0600 Subject: [PATCH 12/51] Add experimental feature flag for MSC3030 --- synapse/config/experimental.py | 3 +++ synapse/federation/transport/server/__init__.py | 12 +++++++++++- synapse/rest/client/room.py | 3 ++- 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 8b098ad48d56..d78a15097c87 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -46,3 +46,6 @@ def read_config(self, config: JsonDict, **kwargs): # MSC3266 (room summary api) self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False) + + # MSC3030 (Jump to date API endpoint) + 
self.msc3030_enabled: bool = experimental.get("msc3030_enabled", False) diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py index c32539bf5a52..abcb8728f579 100644 --- a/synapse/federation/transport/server/__init__.py +++ b/synapse/federation/transport/server/__init__.py @@ -22,7 +22,10 @@ Authenticator, BaseFederationServlet, ) -from synapse.federation.transport.server.federation import FEDERATION_SERVLET_CLASSES +from synapse.federation.transport.server.federation import ( + FEDERATION_SERVLET_CLASSES, + FederationTimestampLookupServlet, +) from synapse.federation.transport.server.groups_local import GROUP_LOCAL_SERVLET_CLASSES from synapse.federation.transport.server.groups_server import ( GROUP_SERVER_SERVLET_CLASSES, @@ -324,6 +327,13 @@ def register_servlets( ) for servletclass in DEFAULT_SERVLET_GROUPS[servlet_group]: + # Only allow the `/timestamp_to_event` servlet if msc3030 is enabled + if ( + servletclass == FederationTimestampLookupServlet + and not hs.config.experimental.msc3030_enabled + ): + continue + servletclass( hs=hs, authenticator=authenticator, diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 050978ea8457..e6838ce9fee0 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -1266,7 +1266,8 @@ def register_servlets( RoomAliasListServlet(hs).register(http_server) SearchRestServlet(hs).register(http_server) RoomCreateRestServlet(hs).register(http_server) - TimestampLookupRestServlet(hs).register(http_server) + if hs.config.experimental.msc3030_enabled: + TimestampLookupRestServlet(hs).register(http_server) # Some servlets only get registered for the main process. 
if not is_worker: From b31185342192aa8a75fb3767ed9d8d65abb7c818 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 17 Nov 2021 03:05:16 -0600 Subject: [PATCH 13/51] Document and move to unstable prefixes --- synapse/federation/federation_server.py | 15 ++++ synapse/federation/transport/client.py | 21 +++++- .../federation/transport/server/federation.py | 20 +++++ synapse/handlers/room.py | 75 +++++++++++++------ synapse/rest/client/room.py | 27 ++++++- .../storage/databases/main/events_worker.py | 14 +++- 6 files changed, 145 insertions(+), 27 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 47e05f4723b8..49f211d4fac0 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -203,6 +203,21 @@ async def on_backfill_request( async def on_timestamp_to_event_request( self, origin: str, room_id: str, timestamp: int, direction: str ) -> Tuple[int, Dict[str, Any]]: + """When we receive a federated `/timestamp_to_event` request, + handle all of the logic for validating and fetching the event. + + Args: + origin: The server we received the event from + room_id: Room to fetch the event from + timestamp: The point in time we should navigate from in the given + direction to find the closest event. + direction: ["f"|"b"] to indicate whether we should navigate forward + or backward from the given timestamp to find the closest event. + + Returns: + Tuple indicating the response status code and dictionary response + body including `event_id`. 
+ """ with (await self._server_linearizer.queue((origin, room_id))): origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index f287eccd4f1f..d2dd26b75903 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -153,9 +153,26 @@ async def timestamp_to_event( self, destination: str, room_id: str, timestamp: int, direction: str ) -> Optional[JsonDict]: """ - TODO + Calls a remote federating server at `destination` asking for their + closest event to the given timestamp in the given direction. + + Args: + destination: Domain name of the remote homeserver + room_id: Room to fetch the event from + timestamp: The point in time we should navigate from in the given + direction to find the closest event. + direction: ["f"|"b"] to indicate whether we should navigate forward + or backward from the given timestamp to find the closest event. + + Returns: + Results in a dict received from the remote homeserver. + Expected response will include the `event_id` key for the closest event. """ - path = _create_v1_path("/timestamp_to_event/%s", room_id) + path = _create_path( + FEDERATION_UNSTABLE_PREFIX, + "/org.matrix.msc3030/timestamp_to_event/%s", + room_id, + ) args = {"ts": [str(timestamp)], "dir": [direction]} diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index 32a3601c04ca..6ce44fea76ff 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -175,7 +175,27 @@ async def on_GET( class FederationTimestampLookupServlet(BaseFederationServerServlet): + """ + API endpoint to fetch the `event_id` of the closest event to the given + timestamp (`ts` query parameter) in the given direction (`dir` query + parameter). 
+ + Useful for other homeservers when they're unable to find an event locally. + + `ts` is a timestamp in milliseconds where we will find the closest event in + the given direction. + + `dir` can be `f` or `b` to indicate forwards and backwards in time from the + given timestamp. + + GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/?ts=&dir= + { + "event_id": ... + } + """ + PATH = "/timestamp_to_event/(?P[^/]*)/?" + PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3030" async def on_GET( self, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ceab663459aa..c827e1033896 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1231,46 +1231,62 @@ async def get_event_for_timestamp( room_id: str, timestamp: int, direction: str, - ) -> Optional[JsonDict]: - event_id = await self.store.get_event_for_timestamp( + ) -> Optional[str]: + """Find the closest event to the given timestamp in the given direction. + If we can't find an event locally or the event we have locally is next to a gap, + it will ask other federated homeservers for an event. + + Args: + requester: The user making the request according to the access token + room_id: Room to fetch the event from + timestamp: The point in time we should navigate from in the given + direction to find the closest event. + direction: ["f"|"b"] to indicate whether we should navigate forward + or backward from the given timestamp to find the closest event. 
+ + Returns: + `event_id` closest to the given timestamp in the given direction + + Raises: + SynapseError if unable to find any event locally in the given direction + """ + + local_event_id = await self.store.get_event_for_timestamp( room_id, timestamp, direction ) - if not event_id: - raise SynapseError( - 404, - "Unable to find event from %s in direction %s" % (timestamp, direction), - errcode=Codes.NOT_FOUND, - ) + logger.debug( + "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s", + local_event_id, + timestamp, + ) # If we found an extremity, we should probably ask another homeserver # first about more history in between - is_extremity = await self.store.check_if_event_is_extremity(room_id, event_id) - logger.info( - "get_event_for_timestamp: locally, we found event=%s closest to timestamp=%s and is is_extremity=%s", - event_id, - timestamp, - is_extremity, + is_extremity = await self.store.check_if_event_is_extremity( + room_id, local_event_id ) - if is_extremity: - logger.info( - "get_event_for_timestamp: locally, we found event=%s closest to timestamp=%s is an extremity so we're asking other homeservers first", - event_id, + if not local_event_id or is_extremity: + logger.debug( + "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s which is an extremity so we're asking other homeservers first", + local_event_id, timestamp, ) + # Find other homeservers from the given state in the room curr_state = await self.state_handler.get_current_state(room_id) curr_domains = get_domains_from_state(curr_state) likely_domains = [ domain for domain, depth in curr_domains if domain != self.server_name ] + # Loop through each homeserver candidate until we get a succesful response for domain in likely_domains: try: remote_response = await self.transport_layer.timestamp_to_event( domain, room_id, timestamp, direction ) - logger.info( + logger.debug( "get_event_for_timestamp: response from domain(%s)=%s", domain, 
remote_response, @@ -1278,18 +1294,31 @@ async def get_event_for_timestamp( remote_event_id = remote_response.get("event_id", None) if remote_event_id: # TODO: Do we want to persist this as an extremity? + # TODO: I think ideally, we would try to backfill from + # this event and run this whole + # `get_event_for_timestamp` function again to make sure + # they didn't give us an event from their gappy history. return remote_event_id continue - except Exception as e: + except Exception as ex: logger.exception( - "Failed to fetch /timestamp_to_event from %s because %s", + "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s", domain, - e, + type(ex).__name__, + ex, + ex.args, ) continue - return event_id + if not local_event_id: + raise SynapseError( + 404, + "Unable to find event from %s in direction %s" % (timestamp, direction), + errcode=Codes.NOT_FOUND, + ) + + return local_event_id class RoomEventSource(EventSource[RoomStreamToken, EventBase]): diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index e6838ce9fee0..409389ca46cc 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -1071,7 +1071,32 @@ def register_txn_path( class TimestampLookupRestServlet(RestServlet): - PATTERNS = client_patterns("/rooms/(?P[^/]*)/timestamp_to_event$") + """ + API endpoint to fetch the `event_id` of the closest event to the given + timestamp (`ts` query parameter) in the given direction (`dir` query + parameter). + + Useful for cases like jump to date so you can start paginating messages from + a given date in the archive. + + `ts` is a timestamp in milliseconds where we will find the closest event in + the given direction. + + `dir` can be `f` or `b` to indicate forwards and backwards in time from the + given timestamp. + + GET /_matrix/client/unstable/org.matrix.msc3030/rooms//timestamp_to_event?ts=&dir= + { + "event_id": ... 
+ } + """ + + PATTERNS = ( + re.compile( + "^/_matrix/client/unstable/org.matrix.msc3030" + "/rooms/(?P[^/]*)/timestamp_to_event$" + ), + ) def __init__(self, hs: "HomeServer"): super().__init__() diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index d31303363a28..f5bc66ddd349 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1627,7 +1627,19 @@ def _cleanup_old_transaction_ids_txn(txn): _cleanup_old_transaction_ids_txn, ) - async def check_if_event_is_extremity(self, room_id: str, event_id: str): + async def check_if_event_is_extremity(self, room_id: str, event_id: str) -> bool: + """Check if the given `event_id` is a forward or backward extremity. + More specifically, we're looking whether it's next to a gap of missing + events. + + Args: + room_id: room where the event lives + event_id: event to check + + Returns: + Boolean indicating whether it's an extremity + """ + def check_if_event_is_extremity_txn(txn) -> bool: # If the event in question, is listed as a backward extremity, it's # an extremity. From 5638123746ff1d2793183aced9235ca8a14441c1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 17 Nov 2021 03:08:54 -0600 Subject: [PATCH 14/51] Add changelog --- changelog.d/9445.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/9445.feature diff --git a/changelog.d/9445.feature b/changelog.d/9445.feature new file mode 100644 index 000000000000..6d12eea71f6f --- /dev/null +++ b/changelog.d/9445.feature @@ -0,0 +1 @@ +Add [MSC3030](https://github.com/matrix-org/matrix-doc/pull/3030) experimental client and federation API endpoints to get the closest event to a given timestamp. 
From 612b51f76e575d3580ec43b0b631af0d1189baf7 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 17 Nov 2021 03:41:22 -0600 Subject: [PATCH 15/51] Fix lints --- .../federation/transport/server/federation.py | 4 ++-- synapse/handlers/room.py | 13 +++++++++---- synapse/http/servlet.py | 19 +++++++++++++++++++ synapse/rest/client/room.py | 2 +- 4 files changed, 31 insertions(+), 7 deletions(-) diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index 6ce44fea76ff..b13ffaae3da6 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -204,9 +204,9 @@ async def on_GET( query: Dict[bytes, List[bytes]], room_id: str, ) -> Tuple[int, JsonDict]: - timestamp = parse_integer_from_args(query, "ts", None) + timestamp = parse_integer_from_args(query, "ts", required=True) direction = parse_string_from_args( - query, "dir", default="f", allowed_values=["f", "b"] + query, "dir", default="f", allowed_values=["f", "b"], required=True ) return await self.handler.on_timestamp_to_event_request( diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index c827e1033896..3e568d5809d3 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -56,6 +56,7 @@ from synapse.event_auth import validate_event_for_room_version from synapse.events import EventBase from synapse.events.utils import copy_power_levels_contents +from synapse.handlers.federation import get_domains_from_state from synapse.rest.admin._base import assert_user_is_admin from synapse.storage.state import StateFilter from synapse.streams import EventSource @@ -76,7 +77,6 @@ from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import parse_and_validate_server_name from synapse.visibility import filter_events_for_client -from synapse.handlers.federation import get_domains_from_state if TYPE_CHECKING: from synapse.server import HomeServer @@ 
-1263,9 +1263,11 @@ async def get_event_for_timestamp( # If we found an extremity, we should probably ask another homeserver # first about more history in between - is_extremity = await self.store.check_if_event_is_extremity( - room_id, local_event_id - ) + is_extremity = False + if local_event_id: + is_extremity = await self.store.check_if_event_is_extremity( + room_id, local_event_id + ) if not local_event_id or is_extremity: logger.debug( "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s which is an extremity so we're asking other homeservers first", @@ -1291,6 +1293,9 @@ async def get_event_for_timestamp( domain, remote_response, ) + if not remote_response: + continue + remote_event_id = remote_response.get("event_id", None) if remote_event_id: # TODO: Do we want to persist this as an extremity? diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 91ba93372c2c..5335f928ebff 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -79,6 +79,25 @@ def parse_integer( return parse_integer_from_args(args, name, default, required) +@overload +def parse_integer_from_args( + args: Mapping[bytes, Sequence[bytes]], + name: str, + default: Optional[int] = None, +) -> Optional[int]: + ... + + +@overload +def parse_integer_from_args( + args: Mapping[bytes, Sequence[bytes]], + name: str, + *, + required: Literal[True], +) -> int: + ... 
+ + def parse_integer_from_args( args: Mapping[bytes, Sequence[bytes]], name: str, diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 409389ca46cc..4b5aae80e2e2 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -1110,7 +1110,7 @@ async def on_GET( requester = await self._auth.get_user_by_req(request) await self._auth.check_user_in_room(room_id, requester.user.to_string()) - timestamp = parse_integer(request, "ts") + timestamp = parse_integer(request, "ts", required=True) direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"]) event_id = await self.timestamp_lookup_handler.get_event_for_timestamp( From 654d7aeaee7d366d5ea6b4c9297e54f3d477a6df Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 17 Nov 2021 03:49:33 -0600 Subject: [PATCH 16/51] Fix lint --- synapse/http/servlet.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 5335f928ebff..d924aec27815 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -79,6 +79,16 @@ def parse_integer( return parse_integer_from_args(args, name, default, required) +@overload +def parse_integer_from_args( + args: Mapping[bytes, Sequence[bytes]], + name: str, + default: Optional[int] = None, + required: bool = False, +) -> Optional[int]: + ... 
+ + @overload def parse_integer_from_args( args: Mapping[bytes, Sequence[bytes]], From 6280d3632223938e3df4767a2c3752a5a6e34a32 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 17 Nov 2021 04:02:08 -0600 Subject: [PATCH 17/51] Fix lint --- synapse/http/servlet.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index d924aec27815..6dd9b9ad0358 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -84,7 +84,6 @@ def parse_integer_from_args( args: Mapping[bytes, Sequence[bytes]], name: str, default: Optional[int] = None, - required: bool = False, ) -> Optional[int]: ... @@ -93,8 +92,9 @@ def parse_integer_from_args( def parse_integer_from_args( args: Mapping[bytes, Sequence[bytes]], name: str, - default: Optional[int] = None, -) -> Optional[int]: + *, + required: Literal[True], +) -> int: ... @@ -102,9 +102,9 @@ def parse_integer_from_args( def parse_integer_from_args( args: Mapping[bytes, Sequence[bytes]], name: str, - *, - required: Literal[True], -) -> int: + default: Optional[int] = None, + required: bool = False, +) -> Optional[int]: ... 
From 8766b0a233b69e6595e895f781223d0668917467 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Nov 2021 00:39:36 -0600 Subject: [PATCH 18/51] Filter events before handing them over federation --- synapse/federation/federation_server.py | 39 +++++++++++++++++++++++-- synapse/visibility.py | 21 +++++++++++++ 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 49f211d4fac0..d000687f0e2d 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -70,6 +70,7 @@ from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import parse_server_name +from synapse.visibility import filter_events_for_server if TYPE_CHECKING: from synapse.server import HomeServer @@ -110,6 +111,7 @@ def __init__(self, hs: "HomeServer"): super().__init__(hs) self.handler = hs.get_federation_handler() + self.storage = hs.get_storage() self._federation_event_handler = hs.get_federation_event_handler() self.state = hs.get_state_handler() self._event_auth_handler = hs.get_event_auth_handler() @@ -226,10 +228,41 @@ async def on_timestamp_to_event_request( event_id = await self.store.get_event_for_timestamp( room_id, timestamp, direction ) + event = await self.store.get_event( + event_id, allow_none=True, allow_rejected=True + ) + filtered_events = await filter_events_for_server( + self.storage, + origin, + [event], + # TODO: Is returning an event_id considered leaking? It looks + # like we already return redacted copies in other parts of the + # code. 
+ redact=False, + ) + logger.debug( + "FederationServer.on_timestamp_to_event_request: filtered_events origin=%s room_id=%s before=%s after=%s", + origin, + room_id, + event_id, + filtered_events, + ) - return 200, { - "event_id": event_id, - } + # TODO: There is an edge case here where if the closest event isn't + # visible to the origin homeserver, we will return a 404 when there + # could potentially be some visible event in the given direction. + # The query for selecting the events would need to be smarter if we + # wanted to handle this. + if len(filtered_events): + return 200, { + "event_id": filtered_events[0].event_id, + } + + raise SynapseError( + 404, + "Unable to find event from %s in direction %s" % (timestamp, direction), + errcode=Codes.NOT_FOUND, + ) async def on_incoming_transaction( self, diff --git a/synapse/visibility.py b/synapse/visibility.py index 17532059e9f8..6705baf2ac46 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -296,6 +296,13 @@ def check_event_is_visible(event: EventBase, state: StateMap[EventBase]) -> bool visibility = history.content.get( "history_visibility", HistoryVisibility.SHARED ) + logger.debug( + "filter_events_for_server: check_event_is_visible(event=%s, state=%s) for server_name=%s visibility=%s", + event.event_id, + state, + server_name, + visibility, + ) if visibility in [HistoryVisibility.INVITED, HistoryVisibility.JOINED]: # We now loop through all state events looking for # membership states for the requesting server to determine @@ -313,6 +320,15 @@ def check_event_is_visible(event: EventBase, state: StateMap[EventBase]) -> bool continue memtype = ev.membership + logger.debug( + "filter_events_for_server: check_event_is_visible(event=%s) for server_name=%s checking membership: visibility=%s domain=%s memtype=%s state_event_id=%s", + event.event_id, + server_name, + visibility, + domain, + memtype, + ev.event_id, + ) if memtype == Membership.JOIN: return True elif memtype == Membership.INVITE: @@ 
-429,6 +445,11 @@ def include(typ, state_key): for e in events: erased = is_sender_erased(e, erased_senders) visible = check_event_is_visible(e, event_to_state[e.event_id]) + logger.debug( + "filter_events_for_server: check_event_is_visible(event=%s) -> visible=%s", + e.event_id, + visible, + ) if visible and not erased: to_return.append(e) elif redact: From 86a2642a879ab2f2c46917d98276ece4f41758c9 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Nov 2021 00:40:44 -0600 Subject: [PATCH 19/51] Don't spam logs for 404's See https://github.com/matrix-org/synapse/pull/9445#discussion_r756055778 --- synapse/handlers/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3e568d5809d3..1e52294c8cb2 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1307,7 +1307,7 @@ async def get_event_for_timestamp( continue except Exception as ex: - logger.exception( + logger.debug( "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s", domain, type(ex).__name__, From 5a2c99786ca68019e6818024bf3d638299e6b11c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Nov 2021 00:49:41 -0600 Subject: [PATCH 20/51] Allow returning event_ids for hidden events according to history visiblity --- synapse/federation/federation_server.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index d000687f0e2d..08f493df7292 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -235,10 +235,11 @@ async def on_timestamp_to_event_request( self.storage, origin, [event], - # TODO: Is returning an event_id considered leaking? It looks - # like we already return redacted copies in other parts of the - # code. 
- redact=False, + # TODO: Is returning an event_id for an event that's hidden by + # history_visibility considered leaking? It looks like we + # already return redacted copies of hidden events in the + # `/backfill` endpoint. + redact=True, ) logger.debug( "FederationServer.on_timestamp_to_event_request: filtered_events origin=%s room_id=%s before=%s after=%s", From bc3ba38d3543e729173766da536c6312246b9ec6 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Nov 2021 01:24:49 -0600 Subject: [PATCH 21/51] Ignore rejected events and look for gaps instead --- synapse/handlers/room.py | 12 ++--- .../storage/databases/main/events_worker.py | 54 +++++++++++++------ 2 files changed, 43 insertions(+), 23 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 1e52294c8cb2..b19a11a7cba5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1261,16 +1261,16 @@ async def get_event_for_timestamp( timestamp, ) - # If we found an extremity, we should probably ask another homeserver - # first about more history in between - is_extremity = False + # If we found a gap, we should probably ask another homeserver first + # about more history in between if local_event_id: - is_extremity = await self.store.check_if_event_is_extremity( + event_next_to_gap = await self.store.event_next_to_gap( room_id, local_event_id ) - if not local_event_id or is_extremity: + + if not local_event_id or event_next_to_gap: logger.debug( - "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s which is an extremity so we're asking other homeservers first", + "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s which is next to a gap in event history so we're asking other homeservers first", local_event_id, timestamp, ) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index f5bc66ddd349..3a714a581c48 100644 --- 
a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1627,10 +1627,8 @@ def _cleanup_old_transaction_ids_txn(txn): _cleanup_old_transaction_ids_txn, ) - async def check_if_event_is_extremity(self, room_id: str, event_id: str) -> bool: - """Check if the given `event_id` is a forward or backward extremity. - More specifically, we're looking whether it's next to a gap of missing - events. + async def event_next_to_gap(self, room_id: str, event_id: str) -> bool: + """Check if the given `event_id` is next to a gap of missing events. Args: room_id: room where the event lives @@ -1640,12 +1638,17 @@ async def check_if_event_is_extremity(self, room_id: str, event_id: str) -> bool Boolean indicating whether it's an extremity """ - def check_if_event_is_extremity_txn(txn) -> bool: - # If the event in question, is listed as a backward extremity, it's - # an extremity. + def event_next_to_gap_txn(txn) -> bool: + # If the event in question is listed as a backward extremity, it's + # next to a gap. backward_extremity_query = """ - SELECT event_id FROM event_backward_extremities - WHERE room_id = ? AND event_id = ? + SELECT 1 FROM event_backward_extremities + LEFT JOIN rejections USING (event_id) + WHERE + room_id = ? + AND event_id = ? + /* Make sure event is not rejected */ + AND rejections.event_id IS NULL LIMIT 1 """ @@ -1653,18 +1656,32 @@ def check_if_event_is_extremity_txn(txn) -> bool: # by another event. If we don't see any edges, we're next to a # forward gap. # - # We can't just check `event_forward_extremities` here because those - # can only be .... (pending discussion on https://github.com/matrix-org/synapse/pull/9445#discussion_r750946180) + # We can't just check `event_forward_extremities` directly because that + # doesn't include potential backfilled and outlier events. + # + # We consider any event that is an `event_forward_extremities` as + # the latest in the room and not next to a gap. 
forward_edge_query = """ - SELECT event_id FROM event_edges - WHERE room_id = ? AND prev_event_id = ? + SELECT 1 FROM event_edges + LEFT JOIN rejections ON event_edges.prev_event_id == rejections.event_id + LEFT JOIN event_forward_extremities ON event_edges.prev_event_id == event_forward_extremities.event_id + WHERE + event_edges.room_id = ? + AND event_edges.prev_event_id = ? + /* If the event in question is a forward extremity, we will + * just consider the forward gap as not a gap since it's one + * of the latest events in the room. + */ + AND event_forward_extremities.event_id IS NOT NULL + /* Make sure event is not rejected */ + AND rejections.event_id IS NULL LIMIT 1 """ txn.execute(backward_extremity_query, (room_id, event_id)) - backward_extremties = txn.fetchall() + backward_extremities = txn.fetchall() - if len(backward_extremties): + if len(backward_extremities): return True txn.execute(forward_edge_query, (room_id, event_id)) @@ -1676,8 +1693,8 @@ def check_if_event_is_extremity_txn(txn) -> bool: return False return await self.db_pool.runInteraction( - "check_if_event_is_extremity_txn", - check_if_event_is_extremity_txn, + "event_next_to_gap_txn", + event_next_to_gap_txn, ) async def get_event_for_timestamp( @@ -1685,9 +1702,12 @@ async def get_event_for_timestamp( ) -> Optional[str]: sql_template = """ SELECT event_id FROM events + LEFT JOIN rejections USING (event_id) WHERE origin_server_ts %s ? AND room_id = ? 
+ /* Make sure event is not rejected */ + AND rejections.event_id IS NULL ORDER BY origin_server_ts %s LIMIT 1; """ From edac9535ebbde372279d32807e6b5b40313c1222 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Nov 2021 01:28:50 -0600 Subject: [PATCH 22/51] We only return event or raise an exception See https://github.com/matrix-org/synapse/pull/9445#discussion_r756051540 --- synapse/handlers/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b19a11a7cba5..875c89dd5870 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1231,7 +1231,7 @@ async def get_event_for_timestamp( room_id: str, timestamp: int, direction: str, - ) -> Optional[str]: + ) -> str: """Find the closest event to the given timestamp in the given direction. If we can't find an event locally or the event we have locally is next to a gap, it will ask other federated homeservers for an event. From ab800e36dbcac65836a31d13dba8fe0bb9bf9aed Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Nov 2021 01:31:47 -0600 Subject: [PATCH 23/51] Only return ValueError from storage layer See https://github.com/matrix-org/synapse/pull/9445#discussion_r756033337 --- synapse/storage/databases/main/events_worker.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 3a714a581c48..77030e708531 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1737,9 +1737,7 @@ def get_event_for_timestamp_txn(txn) -> str: return None if direction not in ("f", "b"): - raise SynapseError( - 400, "Unknown direction: %s" % (direction,), errcode=Codes.INVALID_PARAM - ) + raise ValueError("Unknown direction: %s" % (direction,)) return await self.db_pool.runInteraction( "get_event_for_timestamp_txn", From 
9800a4bcae49968d6d4be4817ea22f331a449fb5 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Nov 2021 01:35:34 -0600 Subject: [PATCH 24/51] Add docstring See https://github.com/matrix-org/synapse/pull/9445#discussion_r756037489 --- synapse/storage/databases/main/events_worker.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 77030e708531..43b3542b429c 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1700,6 +1700,20 @@ def event_next_to_gap_txn(txn) -> bool: async def get_event_for_timestamp( self, room_id: str, timestamp: int, direction: str ) -> Optional[str]: + """Find the closest event to the given timestamp in the given direction. + + Args: + room_id: Room to fetch the event from + timestamp: The point in time (inclusive) we should navigate from in + the given direction to find the closest event. + direction: ["f"|"b"] to indicate whether we should navigate forward + or backward from the given timestamp to find the closest event. + + Returns: + The closest event_id otherwise None if we can't find any event in + the given direction. 
+ """ + sql_template = """ SELECT event_id FROM events LEFT JOIN rejections USING (event_id) From 8523bf3005e0b8aa3cfc46e4838d2156a8e98547 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Nov 2021 01:41:24 -0600 Subject: [PATCH 25/51] Better description for event_next_to_gap --- synapse/storage/databases/main/events_worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 43b3542b429c..1cb33ddc1e12 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1629,6 +1629,8 @@ def _cleanup_old_transaction_ids_txn(txn): async def event_next_to_gap(self, room_id: str, event_id: str) -> bool: """Check if the given `event_id` is next to a gap of missing events. + Looks at gaps going forwards and backwards. The gap in front of the + latest events is not considered a gap. Args: room_id: room where the event lives From f05c292486969cf66992f4d3ed196d2c727957a4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Nov 2021 01:57:22 -0600 Subject: [PATCH 26/51] Fix lints --- synapse/federation/federation_server.py | 59 ++++++++++--------- .../storage/databases/main/events_worker.py | 2 +- 2 files changed, 31 insertions(+), 30 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 08f493df7292..c6dd9f5bbad9 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -228,36 +228,37 @@ async def on_timestamp_to_event_request( event_id = await self.store.get_event_for_timestamp( room_id, timestamp, direction ) - event = await self.store.get_event( - event_id, allow_none=True, allow_rejected=True - ) - filtered_events = await filter_events_for_server( - self.storage, - origin, - [event], - # TODO: Is returning an event_id for an event that's hidden by - # history_visibility considered leaking? 
It looks like we - # already return redacted copies of hidden events in the - # `/backfill` endpoint. - redact=True, - ) - logger.debug( - "FederationServer.on_timestamp_to_event_request: filtered_events origin=%s room_id=%s before=%s after=%s", - origin, - room_id, - event_id, - filtered_events, - ) + if event_id: + event = await self.store.get_event( + event_id, allow_none=False, allow_rejected=False + ) + filtered_events = await filter_events_for_server( + self.storage, + origin, + [event], + # TODO: Is returning an event_id for an event that's hidden by + # history_visibility considered leaking? It looks like we + # already return redacted copies of hidden events in the + # `/backfill` endpoint. + redact=True, + ) + logger.debug( + "FederationServer.on_timestamp_to_event_request: filtered_events origin=%s room_id=%s before=%s after=%s", + origin, + room_id, + event_id, + filtered_events, + ) - # TODO: There is an edge case here where if the closest event isn't - # visible to the origin homeserver, we will return a 404 when there - # could potentially be some visible event in the given direction. - # The query for selecting the events would need to be smarter if we - # wanted to handle this. - if len(filtered_events): - return 200, { - "event_id": filtered_events[0].event_id, - } + # TODO: There is an edge case here where if the closest event isn't + # visible to the origin homeserver, we will return a 404 when there + # could potentially be some visible event in the given direction. + # The query for selecting the events would need to be smarter if we + # wanted to handle this. 
+ if len(filtered_events): + return 200, { + "event_id": filtered_events[0].event_id, + } raise SynapseError( 404, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 1cb33ddc1e12..bc94ade7f3a8 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -34,7 +34,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes -from synapse.api.errors import Codes, NotFoundError, SynapseError +from synapse.api.errors import NotFoundError, SynapseError from synapse.api.room_versions import ( KNOWN_ROOM_VERSIONS, EventFormatVersions, From dae7e0a0e0c6dc53121510357627785f9d66185f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 29 Nov 2021 16:29:49 -0600 Subject: [PATCH 27/51] Remove filter_events_for_server because always ok to see event_id See https://github.com/matrix-org/synapse/pull/9445#discussion_r756013164 --- synapse/federation/federation_server.py | 30 +++---------------------- synapse/visibility.py | 21 ----------------- 2 files changed, 3 insertions(+), 48 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index c6dd9f5bbad9..edb574ced8b5 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -70,7 +70,6 @@ from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import parse_server_name -from synapse.visibility import filter_events_for_server if TYPE_CHECKING: from synapse.server import HomeServer @@ -232,33 +231,10 @@ async def on_timestamp_to_event_request( event = await self.store.get_event( event_id, allow_none=False, allow_rejected=False ) - filtered_events = await filter_events_for_server( - self.storage, - origin, - [event], - # TODO: Is returning an event_id for an event that's hidden by - # 
history_visibility considered leaking? It looks like we - # already return redacted copies of hidden events in the - # `/backfill` endpoint. - redact=True, - ) - logger.debug( - "FederationServer.on_timestamp_to_event_request: filtered_events origin=%s room_id=%s before=%s after=%s", - origin, - room_id, - event_id, - filtered_events, - ) - # TODO: There is an edge case here where if the closest event isn't - # visible to the origin homeserver, we will return a 404 when there - # could potentially be some visible event in the given direction. - # The query for selecting the events would need to be smarter if we - # wanted to handle this. - if len(filtered_events): - return 200, { - "event_id": filtered_events[0].event_id, - } + return 200, { + "event_id": event.event_id, + } raise SynapseError( 404, diff --git a/synapse/visibility.py b/synapse/visibility.py index 6705baf2ac46..17532059e9f8 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -296,13 +296,6 @@ def check_event_is_visible(event: EventBase, state: StateMap[EventBase]) -> bool visibility = history.content.get( "history_visibility", HistoryVisibility.SHARED ) - logger.debug( - "filter_events_for_server: check_event_is_visible(event=%s, state=%s) for server_name=%s visibility=%s", - event.event_id, - state, - server_name, - visibility, - ) if visibility in [HistoryVisibility.INVITED, HistoryVisibility.JOINED]: # We now loop through all state events looking for # membership states for the requesting server to determine @@ -320,15 +313,6 @@ def check_event_is_visible(event: EventBase, state: StateMap[EventBase]) -> bool continue memtype = ev.membership - logger.debug( - "filter_events_for_server: check_event_is_visible(event=%s) for server_name=%s checking membership: visibility=%s domain=%s memtype=%s state_event_id=%s", - event.event_id, - server_name, - visibility, - domain, - memtype, - ev.event_id, - ) if memtype == Membership.JOIN: return True elif memtype == Membership.INVITE: @@ -445,11 
+429,6 @@ def include(typ, state_key): for e in events: erased = is_sender_erased(e, erased_senders) visible = check_event_is_visible(e, event_to_state[e.event_id]) - logger.debug( - "filter_events_for_server: check_event_is_visible(event=%s) -> visible=%s", - e.event_id, - visible, - ) if visible and not erased: to_return.append(e) elif redact: From 87ac1ed3ed42cd266d220bdc3c8a5a3cef0e03ce Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 29 Nov 2021 16:44:12 -0600 Subject: [PATCH 28/51] Rename to is_event_next_to_gap See https://github.com/matrix-org/synapse/pull/9445#discussion_r757086866 --- synapse/handlers/room.py | 4 ++-- synapse/storage/databases/main/events_worker.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index f85fd885e1aa..7034945c8db9 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1267,11 +1267,11 @@ async def get_event_for_timestamp( # If we found a gap, we should probably ask another homeserver first # about more history in between if local_event_id: - event_next_to_gap = await self.store.event_next_to_gap( + is_event_next_to_gap = await self.store.is_event_next_to_gap( room_id, local_event_id ) - if not local_event_id or event_next_to_gap: + if not local_event_id or is_event_next_to_gap: logger.debug( "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s which is next to a gap in event history so we're asking other homeservers first", local_event_id, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index bc94ade7f3a8..64dd1a6e5fae 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1627,7 +1627,7 @@ def _cleanup_old_transaction_ids_txn(txn): _cleanup_old_transaction_ids_txn, ) - async def event_next_to_gap(self, room_id: str, event_id: str) -> bool: + async def 
is_event_next_to_gap(self, room_id: str, event_id: str) -> bool: """Check if the given `event_id` is next to a gap of missing events. Looks at gaps going forwards and backwards. The gap in front of the latest events is not considered a gap. @@ -1640,7 +1640,7 @@ async def event_next_to_gap(self, room_id: str, event_id: str) -> bool: Boolean indicating whether it's an extremity """ - def event_next_to_gap_txn(txn) -> bool: + def is_event_next_to_gap_txn(txn) -> bool: # If the event in question is listed as a backward extremity, it's # next to a gap. backward_extremity_query = """ @@ -1695,8 +1695,8 @@ def event_next_to_gap_txn(txn) -> bool: return False return await self.db_pool.runInteraction( - "event_next_to_gap_txn", - event_next_to_gap_txn, + "is_event_next_to_gap_txn", + is_event_next_to_gap_txn, ) async def get_event_for_timestamp( From 2a5b622cf0e7a8f39b1806d38cc5e17bc05bdcad Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 29 Nov 2021 16:45:24 -0600 Subject: [PATCH 29/51] Remove extra space typo --- synapse/storage/databases/main/events_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 64dd1a6e5fae..25d5342d13b2 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1628,7 +1628,7 @@ def _cleanup_old_transaction_ids_txn(txn): ) async def is_event_next_to_gap(self, room_id: str, event_id: str) -> bool: - """Check if the given `event_id` is next to a gap of missing events. + """Check if the given `event_id` is next to a gap of missing events. Looks at gaps going forwards and backwards. The gap in front of the latest events is not considered a gap. 
From 0610fac5dfecf5bf59e8b22e754be719fd74fe09 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 29 Nov 2021 16:47:07 -0600 Subject: [PATCH 30/51] Include inclusive comment on all comment docs --- synapse/federation/federation_server.py | 4 ++-- synapse/federation/transport/client.py | 4 ++-- synapse/handlers/room.py | 4 ++-- synapse/storage/databases/main/events_worker.py | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index edb574ced8b5..ae437cc44e78 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -210,8 +210,8 @@ async def on_timestamp_to_event_request( Args: origin: The server we received the event from room_id: Room to fetch the event from - timestamp: The point in time we should navigate from in the given - direction to find the closest event. + timestamp: The point in time (inclusive) we should navigate from in + the given direction to find the closest event. direction: ["f"|"b"] to indicate whether we should navigate forward or backward from the given timestamp to find the closest event. diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index d2dd26b75903..fda3286d864a 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -159,8 +159,8 @@ async def timestamp_to_event( Args: destination: Domain name of the remote homeserver room_id: Room to fetch the event from - timestamp: The point in time we should navigate from in the given - direction to find the closest event. + timestamp: The point in time (inclusive) we should navigate from in + the given direction to find the closest event. direction: ["f"|"b"] to indicate whether we should navigate forward or backward from the given timestamp to find the closest event. 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7034945c8db9..9c1fe29ddad2 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1242,8 +1242,8 @@ async def get_event_for_timestamp( Args: requester: The user making the request according to the access token room_id: Room to fetch the event from - timestamp: The point in time we should navigate from in the given - direction to find the closest event. + timestamp: The point in time (inclusive) we should navigate from in + the given direction to find the closest event. direction: ["f"|"b"] to indicate whether we should navigate forward or backward from the given timestamp to find the closest event. diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 25d5342d13b2..e1202a45f692 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1707,7 +1707,7 @@ async def get_event_for_timestamp( Args: room_id: Room to fetch the event from timestamp: The point in time (inclusive) we should navigate from in - the given direction to find the closest event. + the given direction to find the closest event. direction: ["f"|"b"] to indicate whether we should navigate forward or backward from the given timestamp to find the closest event. From 183e1bfffbacfe8ae841625c40ba6bb8450f373c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 29 Nov 2021 16:49:07 -0600 Subject: [PATCH 31/51] Remove redundant continues --- synapse/handlers/room.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 9c1fe29ddad2..dc39e1f2ab86 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1307,8 +1307,6 @@ async def get_event_for_timestamp( # `get_event_for_timestamp` function again to make sure # they didn't give us an event from their gappy history. 
return remote_event_id - - continue except Exception as ex: logger.debug( "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s", @@ -1317,7 +1315,6 @@ async def get_event_for_timestamp( ex, ex.args, ) - continue if not local_event_id: raise SynapseError( From 5362bd36e4d4f0facf7d17e8c12a6d4cd4d3e13e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 29 Nov 2021 23:07:36 -0600 Subject: [PATCH 32/51] Fix query rejections and edge cases See - https://github.com/matrix-org/synapse/pull/9445#discussion_r757090973 - https://github.com/matrix-org/synapse/pull/9445#discussion_r757092652 This commit does make one of the federation Complement tests fail but I think it's because our backward extremity query is weird and doesn't check the event in question's prev_events. Currently we only check the event itself which won't be a backward extremity because we already have the event and set the prev_events as the new backward extremities. --- .../storage/databases/main/events_worker.py | 56 ++++++++++++++----- 1 file changed, 41 insertions(+), 15 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index e1202a45f692..0e0b3f11d54f 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1645,12 +1645,26 @@ def is_event_next_to_gap_txn(txn) -> bool: # next to a gap. backward_extremity_query = """ SELECT 1 FROM event_backward_extremities - LEFT JOIN rejections USING (event_id) WHERE room_id = ? AND event_id = ? - /* Make sure event is not rejected */ - AND rejections.event_id IS NULL + LIMIT 1 + """ + + # If the event in question is a forward extremity, we will just + # consider any potential forward gap as not a gap since it's one of + # the latest events in the room.
+ # + # We can't combine this query with the `forward_edge_query` below + # because if the event in question has no forward edges(isn't + # referenced by any other event's prev_events) but is in + # `event_forward_extremities`, we don't want to return 0 rows and + # say it's next to a gap. + forward_extremity_query = """ + SELECT 1 FROM event_forward_extremities + WHERE + room_id = ? + AND event_id = ? LIMIT 1 """ @@ -1658,24 +1672,18 @@ def is_event_next_to_gap_txn(txn) -> bool: # by another event. If we don't see any edges, we're next to a # forward gap. # - # We can't just check `event_forward_extremities` directly because that - # doesn't include potential backfilled and outlier events. - # - # We consider any event that is an `event_forward_extremities` as - # the latest in the room and not next to a gap. + # We can't just only check `event_forward_extremities` directly + # because that doesn't include backfilled and outlier events. forward_edge_query = """ SELECT 1 FROM event_edges - LEFT JOIN rejections ON event_edges.prev_event_id == rejections.event_id - LEFT JOIN event_forward_extremities ON event_edges.prev_event_id == event_forward_extremities.event_id + /* Check to make sure the event referencing our event in question is not rejected */ + LEFT JOIN rejections ON event_edges.event_id == rejections.event_id WHERE event_edges.room_id = ? AND event_edges.prev_event_id = ? - /* If the event in question is a forward extremity, we will - * just consider the forward gap as not a gap since it's one - * of the latest events in the room. + /* It's not a valid edge if the event referencing our event in + * question is rejected. 
*/ - AND event_forward_extremities.event_id IS NOT NULL - /* Make sure event is not rejected */ AND rejections.event_id IS NULL LIMIT 1 """ @@ -1683,12 +1691,30 @@ def is_event_next_to_gap_txn(txn) -> bool: txn.execute(backward_extremity_query, (room_id, event_id)) backward_extremities = txn.fetchall() + # We consider any backward extremity as a backwards gap if len(backward_extremities): return True + txn.execute(forward_extremity_query, (room_id, event_id)) + forward_extremities = txn.fetchall() + + # We consider any forward extremity as the latest in the room and + # not a gap. + # + # To expand, even though there is technically a gap at the front of + # the room where the forward extremities are, we consider those the + # latest messages in the room so asking other homeservers for more + # is useless. The new latest messages will just be federated as + # usual. + if len(forward_extremities): + return False + txn.execute(forward_edge_query, (room_id, event_id)) forward_edges = txn.fetchall() + # If there are no forward edges to the event in question (another + # event hasn't referenced this event in their prev_events), then we + # assume there is a gap in the history. 
if not len(forward_edges): return True From 76ac526ae191a88e6cd8100bc21eab106a4d3b35 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 29 Nov 2021 23:11:02 -0600 Subject: [PATCH 33/51] Update function name to portray what it accepts, event_id --- synapse/handlers/room.py | 4 ++-- synapse/storage/databases/main/events_worker.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index dc39e1f2ab86..96f849c16a52 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1267,11 +1267,11 @@ async def get_event_for_timestamp( # If we found a gap, we should probably ask another homeserver first # about more history in between if local_event_id: - is_event_next_to_gap = await self.store.is_event_next_to_gap( + is_event_id_next_to_gap = await self.store.is_event_id_next_to_gap( room_id, local_event_id ) - if not local_event_id or is_event_next_to_gap: + if not local_event_id or is_event_id_next_to_gap: logger.debug( "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s which is next to a gap in event history so we're asking other homeservers first", local_event_id, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 0e0b3f11d54f..a583537290b5 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1627,7 +1627,7 @@ def _cleanup_old_transaction_ids_txn(txn): _cleanup_old_transaction_ids_txn, ) - async def is_event_next_to_gap(self, room_id: str, event_id: str) -> bool: + async def is_event_id_next_to_gap(self, room_id: str, event_id: str) -> bool: """Check if the given `event_id` is next to a gap of missing events. Looks at gaps going forwards and backwards. The gap in front of the latest events is not considered a gap. 
@@ -1640,7 +1640,7 @@ async def is_event_next_to_gap(self, room_id: str, event_id: str) -> bool: Boolean indicating whether it's an extremity """ - def is_event_next_to_gap_txn(txn) -> bool: + def is_event_id_next_to_gap_txn(txn) -> bool: # If the event in question is listed as a backward extremity, it's # next to a gap. backward_extremity_query = """ @@ -1721,8 +1721,8 @@ def is_event_next_to_gap_txn(txn) -> bool: return False return await self.db_pool.runInteraction( - "is_event_next_to_gap_txn", - is_event_next_to_gap_txn, + "is_event_id_next_to_gap_txn", + is_event_id_next_to_gap_txn, ) async def get_event_for_timestamp( From 58d67f2a5a0a3f74b7ad72be8aa611e1fc2cc21a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 29 Nov 2021 23:51:15 -0600 Subject: [PATCH 34/51] Fix backward gap detection There was a flaw in my logic before where it checked the event itself as a backward extremity instead of the event's prev_events. --- synapse/federation/federation_server.py | 8 +-- synapse/handlers/room.py | 10 ++-- .../storage/databases/main/events_worker.py | 59 +++++++++++-------- 3 files changed, 42 insertions(+), 35 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index ae437cc44e78..7c035c8f5e78 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -224,16 +224,12 @@ async def on_timestamp_to_event_request( await self.check_server_matches_acl(origin_host, room_id) # We only try to fetch data from the local database - event_id = await self.store.get_event_for_timestamp( + event_id = await self.store.get_event_id_for_timestamp( room_id, timestamp, direction ) if event_id: - event = await self.store.get_event( - event_id, allow_none=False, allow_rejected=False - ) - return 200, { - "event_id": event.event_id, + "event_id": event_id, } raise SynapseError( diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 96f849c16a52..810a32581499 100644 ---
a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1254,10 +1254,9 @@ async def get_event_for_timestamp( SynapseError if unable to find any event locally in the given direction """ - local_event_id = await self.store.get_event_for_timestamp( + local_event_id = await self.store.get_event_id_for_timestamp( room_id, timestamp, direction ) - logger.debug( "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s", local_event_id, @@ -1267,11 +1266,12 @@ async def get_event_for_timestamp( # If we found a gap, we should probably ask another homeserver first # about more history in between if local_event_id: - is_event_id_next_to_gap = await self.store.is_event_id_next_to_gap( - room_id, local_event_id + local_event = await self.store.get_event( + local_event_id, allow_none=False, allow_rejected=False ) + is_event_next_to_gap = await self.store.is_event_next_to_gap(local_event) - if not local_event_id or is_event_id_next_to_gap: + if not local_event_id or is_event_next_to_gap: logger.debug( "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s which is next to a gap in event history so we're asking other homeservers first", local_event_id, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index a583537290b5..cafdc4244844 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1627,8 +1627,8 @@ def _cleanup_old_transaction_ids_txn(txn): _cleanup_old_transaction_ids_txn, ) - async def is_event_id_next_to_gap(self, room_id: str, event_id: str) -> bool: - """Check if the given `event_id` is next to a gap of missing events. + async def is_event_next_to_gap(self, event: EventBase) -> bool: + """Check if the given event is next to a gap of missing events. Looks at gaps going forwards and backwards. The gap in front of the latest events is not considered a gap. 
@@ -1640,9 +1640,15 @@ async def is_event_id_next_to_gap(self, room_id: str, event_id: str) -> bool: Boolean indicating whether it's an extremity """ - def is_event_id_next_to_gap_txn(txn) -> bool: - # If the event in question is listed as a backward extremity, it's - # next to a gap. + def is_event_next_to_gap_txn(txn) -> bool: + # If the event in question has its prev_events listed as a backward + # extremity, it's next to a gap. + # + # We can't just check the backward edges in `event_edges` because + # when we persist event, we will also record the prev_events as + # edges to the event in question regardless of if we have those + # prev_events yet. We need to check whether those prev_events are + # backward extremity also known as gaps that need to be backfilled. backward_extremity_query = """ SELECT 1 FROM event_backward_extremities WHERE @@ -1688,44 +1694,49 @@ def is_event_id_next_to_gap_txn(txn) -> bool: LIMIT 1 """ - txn.execute(backward_extremity_query, (room_id, event_id)) - backward_extremities = txn.fetchall() - - # We consider any backward extremity as a backwards gap - if len(backward_extremities): - return True + # If the event in question has its prev_events listed as a backward + # extremity, it's next to a backwards gap. + # + # We need to check this before the forward edges below as the event + # in question could be a forward extremity while still having a + # backwards gap. + for prev_event_id in event.prev_event_ids(): + txn.execute(backward_extremity_query, (event.room_id, prev_event_id)) + backward_extremities = txn.fetchall() - txn.execute(forward_extremity_query, (room_id, event_id)) - forward_extremities = txn.fetchall() + # We consider any backward extremity as a backwards gap + if len(backward_extremities): + return True # We consider any forward extremity as the latest in the room and - # not a gap. + # not a forward gap. 
# # To expand, even though there is technically a gap at the front of # the room where the forward extremities are, we consider those the # latest messages in the room so asking other homeservers for more # is useless. The new latest messages will just be federated as # usual. + txn.execute(forward_extremity_query, (event.room_id, event.event_id)) + forward_extremities = txn.fetchall() if len(forward_extremities): return False - txn.execute(forward_edge_query, (room_id, event_id)) - forward_edges = txn.fetchall() - # If there are no forward edges to the event in question (another # event hasn't referenced this event in their prev_events), then we - # assume there is a gap in the history. + # assume there is a forward gap in the history. + txn.execute(forward_edge_query, (event.room_id, event.event_id)) + forward_edges = txn.fetchall() if not len(forward_edges): return True return False return await self.db_pool.runInteraction( - "is_event_id_next_to_gap_txn", - is_event_id_next_to_gap_txn, + "is_event_next_to_gap_txn", + is_event_next_to_gap_txn, ) - async def get_event_for_timestamp( + async def get_event_id_for_timestamp( self, room_id: str, timestamp: int, direction: str ) -> Optional[str]: """Find the closest event to the given timestamp in the given direction. @@ -1754,7 +1765,7 @@ async def get_event_for_timestamp( LIMIT 1; """ - def get_event_for_timestamp_txn(txn) -> str: + def get_event_id_for_timestamp_txn(txn) -> str: if direction == "b": # Find closest event *before* a given timestamp. 
We use descending # (which gives values largest to smallest) because we want the @@ -1782,6 +1793,6 @@ def get_event_for_timestamp_txn(txn) -> str: raise ValueError("Unknown direction: %s" % (direction,)) return await self.db_pool.runInteraction( - "get_event_for_timestamp_txn", - get_event_for_timestamp_txn, + "get_event_id_for_timestamp_txn", + get_event_id_for_timestamp_txn, ) From 63d61fc8db739c71fdc7d3327035823024ecefc6 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Nov 2021 00:47:04 -0600 Subject: [PATCH 35/51] Type the transport client for /timestamp_to_event requests See https://github.com/matrix-org/synapse/pull/9445#discussion_r756048845 --- synapse/federation/transport/client.py | 19 +++++++++++++++++-- synapse/handlers/room.py | 19 ++++++++----------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index fda3286d864a..a775af0dbab8 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -27,10 +27,12 @@ Optional, Tuple, Union, + cast, ) import attr import ijson +from typing_extensions import TypedDict from synapse.api.constants import Membership from synapse.api.errors import Codes, HttpResponseException, SynapseError @@ -54,6 +56,12 @@ MAX_RESPONSE_SIZE_SEND_JOIN = 500 * 1024 * 1024 +class TimestampToEventResponse(TypedDict): + """Typed response dictionary for the federation /timestamp_to_event endpoint""" + + event_id: str + + class TransportLayerClient: """Sends federation HTTP requests to other servers""" @@ -151,7 +159,7 @@ async def backfill( @log_function async def timestamp_to_event( self, destination: str, room_id: str, timestamp: int, direction: str - ) -> Optional[JsonDict]: + ) -> TimestampToEventResponse: """ Calls a remote federating server at `destination` asking for their closest event to the given timestamp in the given direction. 
@@ -167,6 +175,9 @@ async def timestamp_to_event( Returns: Results in a dict received from the remote homeserver. Expected response will include the `event_id` key for the closest event. + + Raises: + Various exceptions when the request fails """ path = _create_path( FEDERATION_UNSTABLE_PREFIX, @@ -176,9 +187,13 @@ async def timestamp_to_event( args = {"ts": [str(timestamp)], "dir": [direction]} - return await self.client.get_json( + remote_response = await self.client.get_json( destination, path=path, args=args, try_trailing_slash_on_400=True ) + assert isinstance(remote_response, dict) + assert remote_response.get("event_id", None) + + return cast(TimestampToEventResponse, remote_response) @log_function async def send_transaction( diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 810a32581499..34b47cd7cc36 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1296,17 +1296,14 @@ async def get_event_for_timestamp( domain, remote_response, ) - if not remote_response: - continue - - remote_event_id = remote_response.get("event_id", None) - if remote_event_id: - # TODO: Do we want to persist this as an extremity? - # TODO: I think ideally, we would try to backfill from - # this event and run this whole - # `get_event_for_timestamp` function again to make sure - # they didn't give us an event from their gappy history. - return remote_event_id + + # TODO: Do we want to persist this as an extremity? + # TODO: I think ideally, we would try to backfill from + # this event and run this whole + # `get_event_for_timestamp` function again to make sure + # they didn't give us an event from their gappy history. 
+ remote_event_id = remote_response["event_id"] + return remote_event_id except Exception as ex: logger.debug( "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s", From 70420e5079ed12cc703ccac7dd8c9029e15a6786 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Nov 2021 01:24:29 -0600 Subject: [PATCH 36/51] Optimize when we ask other federated homeservers See https://github.com/matrix-org/synapse/pull/9445#discussion_r756063000 --- synapse/handlers/room.py | 22 +++++++- .../storage/databases/main/events_worker.py | 56 ++++++++++++------- 2 files changed, 56 insertions(+), 22 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 34b47cd7cc36..d1070a190818 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1265,13 +1265,31 @@ async def get_event_for_timestamp( # If we found a gap, we should probably ask another homeserver first # about more history in between + is_event_next_to_backward_gap = False + is_event_next_to_forward_gap = False if local_event_id: local_event = await self.store.get_event( local_event_id, allow_none=False, allow_rejected=False ) - is_event_next_to_gap = await self.store.is_event_next_to_gap(local_event) - if not local_event_id or is_event_next_to_gap: + if direction == "f": + # We only need to check for a backward gap if we're looking forwards + # to ensure there is nothing in between. 
+ is_event_next_to_backward_gap = ( + await self.store.is_event_next_to_backward_gap(local_event) + ) + elif direction == "b": + # We only need to check for a forward gap if we're looking backwards + # to ensure there is nothing in between + is_event_next_to_forward_gap = ( + await self.store.is_event_next_to_forward_gap(local_event) + ) + + if ( + not local_event_id + or is_event_next_to_backward_gap + or is_event_next_to_forward_gap + ): logger.debug( "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s which is next to a gap in event history so we're asking other homeservers first", local_event_id, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index cafdc4244844..8996fa6fddda 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1627,10 +1627,9 @@ def _cleanup_old_transaction_ids_txn(txn): _cleanup_old_transaction_ids_txn, ) - async def is_event_next_to_gap(self, event: EventBase) -> bool: - """Check if the given event is next to a gap of missing events. - Looks at gaps going forwards and backwards. The gap in front of the - latest events is not considered a gap. + async def is_event_next_to_backward_gap(self, event: EventBase) -> bool: + """Check if the given event is next to a backward gap of missing events. + A(False)--->B(False)--->C(True)---> Args: room_id: room where the event lives @@ -1640,7 +1639,7 @@ async def is_event_next_to_gap(self, event: EventBase) -> bool: Boolean indicating whether it's an extremity """ - def is_event_next_to_gap_txn(txn) -> bool: + def is_event_next_to_backward_gap_txn(txn) -> bool: # If the event in question has its prev_events listed as a backward # extremity, it's next to a gap. # @@ -1657,6 +1656,38 @@ def is_event_next_to_gap_txn(txn) -> bool: LIMIT 1 """ + # If the event in question has its prev_events listed as a backward + # extremity, it's next to a backwards gap. 
+ for prev_event_id in event.prev_event_ids(): + txn.execute(backward_extremity_query, (event.room_id, prev_event_id)) + backward_extremities = txn.fetchall() + + # We consider any backward extremity as a backwards gap + if len(backward_extremities): + return True + + return False + + return await self.db_pool.runInteraction( + "is_event_next_to_backward_gap_txn", + is_event_next_to_backward_gap_txn, + ) + + async def is_event_next_to_forward_gap(self, event: EventBase) -> bool: + """Check if the given event is next to a forward gap of missing events. + The gap in front of the latest events is not considered a gap. + A(False)--->B(False)--->C(False)---> + A(False)--->B(False)---> --->D(True)--->E(False) + + Args: + room_id: room where the event lives + event_id: event to check + + Returns: + Boolean indicating whether it's an extremity + """ + + def is_event_next_to_gap_txn(txn) -> bool: # If the event in question is a forward extremity, we will just # consider any potential forward gap as not a gap since it's one of # the latest events in the room. @@ -1693,21 +1724,6 @@ def is_event_next_to_gap_txn(txn) -> bool: AND rejections.event_id IS NULL LIMIT 1 """ - - # If the event in question has its prev_events listed as a backward - # extremity, it's next to a backwards gap. - # - # We need to check this before the forward edges below as the event - # in question could be a forward extremity while still having a - # backwards gap. - for prev_event_id in event.prev_event_ids(): - txn.execute(backward_extremity_query, (event.room_id, prev_event_id)) - backward_extremities = txn.fetchall() - - # We consider any backward extremity as a backwards gap - if len(backward_extremities): - return True - # We consider any forward extremity as the latest in the room and # not a forward gap. 
# From a8644b998b5ec19d88e67c8877015cfbd1670124 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Nov 2021 01:49:00 -0600 Subject: [PATCH 37/51] Only return remote event when closer See https://github.com/matrix-org/synapse/pull/9445#discussion_r757098009 --- synapse/federation/federation_server.py | 5 +++++ synapse/federation/transport/client.py | 1 + synapse/handlers/room.py | 10 +++++++++- 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 7c035c8f5e78..02c5b75c7083 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -228,8 +228,13 @@ async def on_timestamp_to_event_request( room_id, timestamp, direction ) if event_id: + event = await self.store.get_event( + event_id, allow_none=False, allow_rejected=False + ) + return 200, { "event_id": event_id, + "origin_server_ts": event.origin_server_ts, } raise SynapseError( diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index a775af0dbab8..d91f94f89e18 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -60,6 +60,7 @@ class TimestampToEventResponse(TypedDict): """Typed response dictionary for the federation /timestamp_to_event endpoint""" event_id: str + origin_server_ts: int class TransportLayerClient: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index d1070a190818..f337bb047ec2 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1321,7 +1321,15 @@ async def get_event_for_timestamp( # `get_event_for_timestamp` function again to make sure # they didn't give us an event from their gappy history. 
remote_event_id = remote_response["event_id"] - return remote_event_id + origin_server_ts = remote_response["origin_server_ts"] + + # Only return the remote event if it's closer than the local event + if not local_event or ( + local_event + and abs(origin_server_ts - timestamp) + < abs(local_event.origin_server_ts - timestamp) + ): + return remote_event_id except Exception as ex: logger.debug( "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s", From c38984cf7656358748637224a3b74c7b18261b5a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Nov 2021 01:59:05 -0600 Subject: [PATCH 38/51] Add origin_server_ts to client endpoint Added it to the client endpoint as well because it could be useful to make sure that a requested event is not 24 hours older than the timestamp requested for day-by-day archive for example. --- synapse/handlers/room.py | 9 +++++---- synapse/rest/client/room.py | 6 +++++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index f337bb047ec2..4f7bd6bf4ed0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1234,7 +1234,7 @@ async def get_event_for_timestamp( room_id: str, timestamp: int, direction: str, - ) -> str: + ) -> Tuple[str, int]: """Find the closest event to the given timestamp in the given direction. If we can't find an event locally or the event we have locally is next to a gap, it will ask other federated homeservers for an event. @@ -1248,7 +1248,8 @@ async def get_event_for_timestamp( or backward from the given timestamp to find the closest event. Returns: - `event_id` closest to the given timestamp in the given direction + A tuple containing the `event_id` closest to the given timestamp in + the given direction and the `origin_server_ts`. 
Raises: SynapseError if unable to find any event locally in the given direction @@ -1329,7 +1330,7 @@ async def get_event_for_timestamp( and abs(origin_server_ts - timestamp) < abs(local_event.origin_server_ts - timestamp) ): - return remote_event_id + return remote_event_id, origin_server_ts except Exception as ex: logger.debug( "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s", @@ -1346,7 +1347,7 @@ async def get_event_for_timestamp( errcode=Codes.NOT_FOUND, ) - return local_event_id + return local_event_id, local_event.origin_server_ts class RoomEventSource(EventSource[RoomStreamToken, EventBase]): diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index f82197d59ced..af6108ded1e2 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -1113,12 +1113,16 @@ async def on_GET( timestamp = parse_integer(request, "ts", required=True) direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"]) - event_id = await self.timestamp_lookup_handler.get_event_for_timestamp( + ( + event_id, + origin_server_ts, + ) = await self.timestamp_lookup_handler.get_event_for_timestamp( requester, room_id, timestamp, direction ) return 200, { "event_id": event_id, + "origin_server_ts": origin_server_ts, } From ed1360a6f3bf7cfcb310d620f82c87179cad3270 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Nov 2021 02:06:09 -0600 Subject: [PATCH 39/51] Make asserts more obvious --- synapse/federation/transport/client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index d91f94f89e18..351aaee97bf7 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -192,7 +192,8 @@ async def timestamp_to_event( destination, path=path, args=args, try_trailing_slash_on_400=True ) assert isinstance(remote_response, dict) - assert remote_response.get("event_id", 
None) + assert remote_response.get("event_id", None) is not None + assert remote_response.get("origin_server_ts", None) is not None return cast(TimestampToEventResponse, remote_response) From 13371a6ed3812432ca67afe067915bee83c9bf26 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Nov 2021 02:08:38 -0600 Subject: [PATCH 40/51] Better comments --- synapse/handlers/room.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 4f7bd6bf4ed0..e6b189629f0a 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1264,8 +1264,8 @@ async def get_event_for_timestamp( timestamp, ) - # If we found a gap, we should probably ask another homeserver first - # about more history in between + # Check for gaps in the history where events could be hiding in between + # the timestamp given and the event we were able to find localy is_event_next_to_backward_gap = False is_event_next_to_forward_gap = False if local_event_id: @@ -1286,6 +1286,8 @@ async def get_event_for_timestamp( await self.store.is_event_next_to_forward_gap(local_event) ) + # If we found a gap, we should probably ask another homeserver first + # about more history in between if ( not local_event_id or is_event_next_to_backward_gap From d137292c6c3cf5b47e58272dac52a7a9dc68c020 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Nov 2021 14:50:36 -0600 Subject: [PATCH 41/51] Also check for the event itself as a backward extremity See https://github.com/matrix-org/synapse/pull/9445#discussion_r759642569 This additional check for the event itself as a backward extremity is necessary because it's probably possible to have an outlier event with no prev_events as backward extremities. 
--- .../storage/databases/main/events_worker.py | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 8996fa6fddda..1fac222e8831 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1640,14 +1640,15 @@ async def is_event_next_to_backward_gap(self, event: EventBase) -> bool: """ def is_event_next_to_backward_gap_txn(txn) -> bool: - # If the event in question has its prev_events listed as a backward - # extremity, it's next to a gap. + # If the event in question has any of its prev_events listed as a + # backward extremity, it's next to a gap. # # We can't just check the backward edges in `event_edges` because - # when we persist event, we will also record the prev_events as - # edges to the event in question regardless of if we have those + # when we persist events, we will also record the prev_events as + # edges to the event in question regardless of whether we have those # prev_events yet. We need to check whether those prev_events are - # backward extremity also known as gaps that need to be backfilled. + # backward extremities, also known as gaps, that need to be + # backfilled. backward_extremity_query = """ SELECT 1 FROM event_backward_extremities WHERE @@ -1656,13 +1657,14 @@ def is_event_next_to_backward_gap_txn(txn) -> bool: LIMIT 1 """ - # If the event in question has its prev_events listed as a backward - # extremity, it's next to a backwards gap. - for prev_event_id in event.prev_event_ids(): + # If the event in question is a backward extremity or has any of its + # prev_events listed as a backward extremity, it's next to a + # backward gap. 
+ for prev_event_id in [event.event_id] + event.prev_event_ids(): txn.execute(backward_extremity_query, (event.room_id, prev_event_id)) backward_extremities = txn.fetchall() - # We consider any backward extremity as a backwards gap + # We consider any backward extremity as a backward gap if len(backward_extremities): return True From 662366a354d793d971fb914bb1ec3a51425599f0 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Nov 2021 14:55:14 -0600 Subject: [PATCH 42/51] Remove redundant NPE check See https://github.com/matrix-org/synapse/pull/9445#discussion_r759322954 --- synapse/handlers/room.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index e6b189629f0a..b91e7e19b8d1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1328,8 +1328,7 @@ async def get_event_for_timestamp( # Only return the remote event if it's closer than the local event if not local_event or ( - local_event - and abs(origin_server_ts - timestamp) + abs(origin_server_ts - timestamp) < abs(local_event.origin_server_ts - timestamp) ): return remote_event_id, origin_server_ts From dd7e689a143c6ebe21725cb57c56b2955fd0092a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Nov 2021 15:00:05 -0600 Subject: [PATCH 43/51] Make more comment clear on why not and what we can use it to rely on See https://github.com/matrix-org/synapse/pull/9445#discussion_r759412698 --- synapse/storage/databases/main/events_worker.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 1fac222e8831..affc367281ef 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1694,8 +1694,12 @@ def is_event_next_to_gap_txn(txn) -> bool: # consider any potential forward gap as not a gap since it's one of # the latest events in the 
room. # + # `event_forward_extremities` does not include backfilled or outlier + # events so we can't rely on it to find forward gaps. We can only + # use it to determine whether a message is the latest in the room. + # # We can't combine this query with the `forward_edge_query` below - # because if the event in question has no forward edges(isn't + # because if the event in question has no forward edges (isn't # referenced by any other event's prev_events) but is in # `event_forward_extremities`, we don't want to return 0 rows and # say it's next to a gap. @@ -1710,9 +1714,6 @@ def is_event_next_to_gap_txn(txn) -> bool: # Check to see whether the event in question is already referenced # by another event. If we don't see any edges, we're next to a # forward gap. - # - # We can't just only check `event_forward_extremities` directly - # because that doesn't include backfilled and outlier events. forward_edge_query = """ SELECT 1 FROM event_edges /* Check to make sure the event referencing our event in question is not rejected */ @@ -1726,6 +1727,7 @@ def is_event_next_to_gap_txn(txn) -> bool: AND rejections.event_id IS NULL LIMIT 1 """ + # We consider any forward extremity as the latest in the room and # not a forward gap. 
# From e7d212052823e8d4ce3332d148e1b047fc098398 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Nov 2021 15:01:00 -0600 Subject: [PATCH 44/51] Fix typo --- synapse/handlers/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b91e7e19b8d1..6b6929bc08e6 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1265,7 +1265,7 @@ async def get_event_for_timestamp( ) # Check for gaps in the history where events could be hiding in between - # the timestamp given and the event we were able to find localy + # the timestamp given and the event we were able to find locally is_event_next_to_backward_gap = False is_event_next_to_forward_gap = False if local_event_id: From 5660fde74426ad60dfcaf1c8460286bcba4ab94b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Nov 2021 17:40:32 -0600 Subject: [PATCH 45/51] Use federation_client and better validation See https://github.com/matrix-org/synapse/pull/9445#discussion_r759316539 --- synapse/federation/federation_client.py | 77 +++++++++++++++++++++++++ synapse/federation/transport/client.py | 18 +----- synapse/handlers/room.py | 8 +-- 3 files changed, 84 insertions(+), 19 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 3b85b135e0d3..2721cd1ee7a5 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1496,6 +1496,83 @@ async def send_request( self._get_room_hierarchy_cache[(room_id, suggested_only)] = result return result + async def timestamp_to_event( + self, destination: str, room_id: str, timestamp: int, direction: str + ) -> "TimestampToEventResponse": + """ + Calls a remote federating server at `destination` asking for their + closest event to the given timestamp in the given direction. Also + validates the response to always return the expected keys or raises an + error. 
+ + Args: + destination: Domain name of the remote homeserver + room_id: Room to fetch the event from + timestamp: The point in time (inclusive) we should navigate from in + the given direction to find the closest event. + direction: ["f"|"b"] to indicate whether we should navigate forward + or backward from the given timestamp to find the closest event. + + Returns: + A parsed TimestampToEventResponse including the closest event_id + and origin_server_ts + + Raises: + Various exceptions when the request fails + InvalidResponseError when the response does not have the correct + keys or wrong types + """ + remote_response = await self.transport_layer.timestamp_to_event( + destination, room_id, timestamp, direction + ) + + if not isinstance(remote_response, dict): + raise InvalidResponseError( + "Response must be a JSON dictionary but received %s" % remote_response + ) + + try: + return TimestampToEventResponse.from_json_dict(remote_response) + except ValueError as e: + raise InvalidResponseError(str(e)) + + +@attr.s(frozen=True, slots=True, auto_attribs=True) +class TimestampToEventResponse: + """Typed response dictionary for the federation /timestamp_to_event endpoint""" + + event_id: str + origin_server_ts: int + + # the raw data, including the above keys + data: JsonDict + + @classmethod + def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryEventResult": + """Parsed response from the federation /timestamp_to_event endpoint + + Args: + d: JSON object response to be parsed + + Raises: + ValueError if d does not have the correct keys or they are the wrong types + """ + + event_id = d.get("event_id") + if not isinstance(event_id, str): + raise ValueError( + "Invalid response: 'event_id' must be a str but received %s" % event_id + ) + + origin_server_ts = d.get("origin_server_ts") + if not isinstance(origin_server_ts, int): + raise ValueError( + "Invalid response: 'origin_server_ts' must be a int but received %s" + % origin_server_ts + ) + + return cls(event_id,
origin_server_ts, d) + @attr.s(frozen=True, slots=True, auto_attribs=True) class FederationSpaceSummaryEventResult: diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 351aaee97bf7..bfa2f0bd68b3 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -32,7 +32,6 @@ import attr import ijson -from typing_extensions import TypedDict from synapse.api.constants import Membership from synapse.api.errors import Codes, HttpResponseException, SynapseError @@ -56,13 +55,6 @@ MAX_RESPONSE_SIZE_SEND_JOIN = 500 * 1024 * 1024 -class TimestampToEventResponse(TypedDict): - """Typed response dictionary for the federation /timestamp_to_event endpoint""" - - event_id: str - origin_server_ts: int - - class TransportLayerClient: """Sends federation HTTP requests to other servers""" @@ -160,7 +152,7 @@ async def backfill( @log_function async def timestamp_to_event( self, destination: str, room_id: str, timestamp: int, direction: str - ) -> TimestampToEventResponse: + ) -> Union[JsonDict, List]: """ Calls a remote federating server at `destination` asking for their closest event to the given timestamp in the given direction. @@ -174,8 +166,7 @@ async def timestamp_to_event( or backward from the given timestamp to find the closest event. Returns: - Results in a dict received from the remote homeserver. - Expected response will include the `event_id` key for the closest event. + Response dict received from the remote homeserver. 
Raises: Various exceptions when the request fails @@ -191,11 +182,8 @@ async def timestamp_to_event( remote_response = await self.client.get_json( destination, path=path, args=args, try_trailing_slash_on_400=True ) - assert isinstance(remote_response, dict) - assert remote_response.get("event_id", None) is not None - assert remote_response.get("origin_server_ts", None) is not None - return cast(TimestampToEventResponse, remote_response) + return remote_response @log_function async def send_transaction( diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6b6929bc08e6..9219d6fb492f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1226,7 +1226,7 @@ def __init__(self, hs: "HomeServer"): self.server_name = hs.hostname self.store = hs.get_datastore() self.state_handler = hs.get_state_handler() - self.transport_layer = hs.get_federation_transport_client() + self.federation_client = hs.get_federation_client() async def get_event_for_timestamp( self, @@ -1309,7 +1309,7 @@ async def get_event_for_timestamp( # Loop through each homeserver candidate until we get a succesful response for domain in likely_domains: try: - remote_response = await self.transport_layer.timestamp_to_event( + remote_response = await self.federation_client.timestamp_to_event( domain, room_id, timestamp, direction ) logger.debug( @@ -1323,8 +1323,8 @@ async def get_event_for_timestamp( # this event and run this whole # `get_event_for_timestamp` function again to make sure # they didn't give us an event from their gappy history. 
- remote_event_id = remote_response["event_id"] - origin_server_ts = remote_response["origin_server_ts"] + remote_event_id = remote_response.event_id + origin_server_ts = remote_response.origin_server_ts # Only return the remote event if it's closer than the local event if not local_event or ( From c3c404baa30317d036f25ab3e330da4c44bb83de Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Nov 2021 20:19:20 -0600 Subject: [PATCH 46/51] Louder warnings about real problems --- synapse/handlers/room.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 9219d6fb492f..2bcdf32dcce1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -46,6 +46,7 @@ from synapse.api.errors import ( AuthError, Codes, + HttpResponseException, LimitExceededError, NotFoundError, StoreError, @@ -56,6 +57,7 @@ from synapse.event_auth import validate_event_for_room_version from synapse.events import EventBase from synapse.events.utils import copy_power_levels_contents +from synapse.federation.federation_client import InvalidResponseError from synapse.handlers.federation import get_domains_from_state from synapse.rest.admin._base import assert_user_is_admin from synapse.storage.state import StateFilter @@ -1332,7 +1334,9 @@ async def get_event_for_timestamp( < abs(local_event.origin_server_ts - timestamp) ): return remote_event_id, origin_server_ts - except Exception as ex: + except (HttpResponseException, InvalidResponseError) as ex: + # Let's not put a high priority on some other homeserver + # failing to respond or giving a random response logger.debug( "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s", domain, @@ -1340,6 +1344,15 @@ async def get_event_for_timestamp( ex, ex.args, ) + except Exception as ex: + # But we do want to see some exceptions in our code + logger.warning( + "Failed to fetch /timestamp_to_event from %s because of exception(%s) 
%s args=%s", + domain, + type(ex).__name__, + ex, + ex.args, + ) if not local_event_id: raise SynapseError( From 1ff2db43ae6b7839a49e0ea19e7d397a75471f7c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Nov 2021 21:24:45 -0600 Subject: [PATCH 47/51] Optimize query to a single IN clause See https://github.com/matrix-org/synapse/pull/9445#discussion_r759356403 --- .../storage/databases/main/events_worker.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index affc367281ef..04f7f034c141 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1653,20 +1653,25 @@ def is_event_next_to_backward_gap_txn(txn) -> bool: SELECT 1 FROM event_backward_extremities WHERE room_id = ? - AND event_id = ? + AND %s LIMIT 1 """ # If the event in question is a backward extremity or has any of its # prev_events listed as a backward extremity, it's next to a # backward gap. 
- for prev_event_id in [event.event_id] + event.prev_event_ids(): - txn.execute(backward_extremity_query, (event.room_id, prev_event_id)) - backward_extremities = txn.fetchall() + clause, args = make_in_list_sql_clause( + self.database_engine, + "event_id", + [event.event_id] + event.prev_event_ids(), + ) - # We consider any backward extremity as a backward gap - if len(backward_extremities): - return True + txn.execute(backward_extremity_query % (clause,), [event.room_id] + args) + backward_extremities = txn.fetchall() + + # We consider any backward extremity as a backward gap + if len(backward_extremities): + return True return False From 5888eba757445db2cac71821c50a43f11c63d4ba Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Nov 2021 21:33:32 -0600 Subject: [PATCH 48/51] Fix lint --- synapse/federation/transport/client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 1722c1eedf2e..d1f4be641de6 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -27,7 +27,6 @@ Optional, Tuple, Union, - cast, ) import attr From 68704f482946f5e2e035cb4311d00a9277c48b9e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Nov 2021 22:37:03 -0600 Subject: [PATCH 49/51] Fix mypy lints --- synapse/federation/federation_client.py | 2 +- synapse/storage/databases/main/events_worker.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index e21cb59089a9..767a8b79193e 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1569,7 +1569,7 @@ class TimestampToEventResponse: data: JsonDict @classmethod - def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryEventResult": + def from_json_dict(cls, d: JsonDict) -> "TimestampToEventResponse": """Parsed response from the federation 
/timestamp_to_event endpoint Args: diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 6f2f3f302455..fd19674f9385 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1775,7 +1775,7 @@ async def is_event_next_to_backward_gap(self, event: EventBase) -> bool: Boolean indicating whether it's an extremity """ - def is_event_next_to_backward_gap_txn(txn) -> bool: + def is_event_next_to_backward_gap_txn(txn: LoggingTransaction) -> bool: # If the event in question has any of its prev_events listed as a # backward extremity, it's next to a gap. # @@ -1799,7 +1799,7 @@ def is_event_next_to_backward_gap_txn(txn) -> bool: clause, args = make_in_list_sql_clause( self.database_engine, "event_id", - [event.event_id] + event.prev_event_ids(), + [event.event_id] + list(event.prev_event_ids()), ) txn.execute(backward_extremity_query % (clause,), [event.room_id] + args) @@ -1830,7 +1830,7 @@ async def is_event_next_to_forward_gap(self, event: EventBase) -> bool: Boolean indicating whether it's an extremity """ - def is_event_next_to_gap_txn(txn) -> bool: + def is_event_next_to_gap_txn(txn: LoggingTransaction) -> bool: # If the event in question is a forward extremity, we will just # consider any potential forward gap as not a gap since it's one of # the latest events in the room. @@ -1926,7 +1926,7 @@ async def get_event_id_for_timestamp( LIMIT 1; """ - def get_event_id_for_timestamp_txn(txn) -> str: + def get_event_id_for_timestamp_txn(txn: LoggingTransaction) -> Optional[str]: if direction == "b": # Find closest event *before* a given timestamp. 
We use descending # (which gives values largest to smallest) because we want the From 3ee5d0c6031837e6500458122c8f71803e307ccb Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Dec 2021 23:22:44 -0600 Subject: [PATCH 50/51] Use %r to protect from ascii junk See https://github.com/matrix-org/synapse/pull/9445#discussion_r760191269 This will print `\n` as `'\n'` instead of an actual newline for example --- synapse/federation/federation_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 767a8b79193e..be1423da248f 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1549,7 +1549,7 @@ async def timestamp_to_event( if not isinstance(remote_response, dict): raise InvalidResponseError( - "Response must be a JSON dictionary but received %s" % remote_response + "Response must be a JSON dictionary but received %r" % remote_response ) try: @@ -1582,13 +1582,13 @@ def from_json_dict(cls, d: JsonDict) -> "TimestampToEventResponse": event_id = d.get("event_id") if not isinstance(event_id, str): raise ValueError( - "Invalid response: 'event_id' must be a str but received %s" % event_id + "Invalid response: 'event_id' must be a str but received %r" % event_id ) origin_server_ts = d.get("origin_server_ts") if not isinstance(origin_server_ts, int): raise ValueError( - "Invalid response: 'origin_server_ts' must be a int but received %s" + "Invalid response: 'origin_server_ts' must be a int but received %r" % origin_server_ts ) From 2621e5da9a60d4931857c6e5c3b4bb8b63fa1f9b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Dec 2021 23:27:10 -0600 Subject: [PATCH 51/51] Remove MSC3030 complement test flag because unknown state of tests at time of merge Complement tests are in https://github.com/matrix-org/complement/pull/178 but haven't had any review and not sure when they will merge. 
--- scripts-dev/complement.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index 35f90375e9d0..53295b58fca9 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then fi # Run the tests! -go test -v -tags synapse_blacklist,msc2403,msc3030 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/... +go test -v -tags synapse_blacklist,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...