From 74a1a293b68424e139abcfc0eafdf8731de9d5d5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 14 Dec 2020 14:04:54 +0000 Subject: [PATCH 1/8] Always persist state_events --- synapse/storage/databases/main/events.py | 46 ++++++++++++------------ 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 90fb1a1f004e..9e81d8779271 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -871,6 +871,29 @@ def event_dict(event): updatevalues={"have_censored": False}, ) + state_events_and_contexts = [ + ec for ec in events_and_contexts if ec[0].is_state() + ] + + state_values = [] + for event, context in state_events_and_contexts: + vals = { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + + # TODO: How does this work with backfilling? + if hasattr(event, "replaces_state"): + vals["prev_state"] = event.replaces_state + + state_values.append(vals) + + self.db_pool.simple_insert_many_txn( + txn, table="state_events", values=state_values + ) + def _store_rejected_events_txn(self, txn, events_and_contexts): """Add rows to the 'rejections' table for received events which were rejected @@ -987,29 +1010,6 @@ def _update_metadata_tables_txn( txn, [event for event, _ in events_and_contexts] ) - state_events_and_contexts = [ - ec for ec in events_and_contexts if ec[0].is_state() - ] - - state_values = [] - for event, context in state_events_and_contexts: - vals = { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - } - - # TODO: How does this work with backfilling? - if hasattr(event, "replaces_state"): - vals["prev_state"] = event.replaces_state - - state_values.append(vals) - - self.db_pool.simple_insert_many_txn( - txn, table="state_events", values=state_values - ) - # Prefill the event cache self._add_to_cache(txn, events_and_contexts) From 083b39fdfe324711a1f76cdb789bce02d6490e5c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 4 Jan 2021 17:15:41 +0000 Subject: [PATCH 2/8] Add background update to add rejected events to some metadata tables --- .../databases/main/events_bg_updates.py | 115 ++++++++++++++++++ .../delta/58/28rejected_events_metadata.sql | 17 +++ 2 files changed, 132 insertions(+) create mode 100644 synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 97b675484628..92d1ed906568 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -14,10 +14,15 @@ # limitations under the License. import logging +from typing import List, Tuple from synapse.api.constants import EventContentFields +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.events import make_event_from_dict from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool +from synapse.storage.types import Cursor +from synapse.types import JsonDict logger = logging.getLogger(__name__) @@ -99,6 +104,10 @@ def __init__(self, database: DatabasePool, db_conn, hs): columns=["user_id", "created_ts"], ) + self.db_pool.updates.register_background_update_handler( + "rejected_events_metadata", self._rejected_events_metadata, + ) + async def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] @@ -582,3 +591,109 @@ def _event_store_labels_txn(txn): await self.db_pool.updates._end_background_update("event_store_labels") return num_rows + + async def _rejected_events_metadata(self, progress: dict, batch_size: int) -> int: + """Adds rejected events to the `state_events` and `event_auth` metadata + tables. + """ + + last_event_id = progress.get("last_event_id", "") + + def get_rejected_events( + txn: Cursor, + ) -> List[Tuple[str, str, JsonDict, bool, bool]]: + # Fetch rejected event json, their room version and whether we have + # inserted them into the state_events or auth_events tables. + # + # Note we can assume that events that don't have a corresponding + # room version are V1 rooms. + sql = """ + SELECT DISTINCT + event_id, + COALESCE(room_version, '1'), + json, + state_events.event_id IS NOT NULL, + event_auth.event_id IS NOT NULL + FROM rejections + INNER JOIN event_json USING (event_id) + LEFT JOIN rooms USING (room_id) + LEFT JOIN state_events USING (event_id) + LEFT JOIN event_auth USING (event_id) + WHERE event_id > ? + ORDER BY event_id + LIMIT ? + """ + + txn.execute(sql, (last_event_id, batch_size,)) + + return [(row[0], row[1], db_to_json(row[2]), row[3], row[4]) for row in txn] # type: ignore + + results = await self.db_pool.runInteraction( + desc="_rejected_events_metadata_get", func=get_rejected_events + ) + + if not results: + await self.db_pool.updates._end_background_update( + "rejected_events_metadata" + ) + return 0 + + state_events = [] + auth_events = [] + for event_id, room_version, event_json, has_state, has_event_auth in results: + last_event_id = event_id + + if has_state and has_event_auth: + continue + + room_version_obj = KNOWN_ROOM_VERSIONS[room_version] + + event = make_event_from_dict(event_json, room_version_obj) + + if not event.is_state(): + continue + + if not has_state: + state_events.append( + { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + ) + + if not has_event_auth: + for auth_id in event.auth_event_ids(): + auth_events.append( + { + "room_id": event.room_id, + "event_id": event.event_id, + "auth_id": auth_id, + } + ) + + if state_events: + await self.db_pool.simple_insert_many( + table="state_events", + values=state_events, + desc="_rejected_events_metadata_state_events", + ) + + if auth_events: + await self.db_pool.simple_insert_many( + table="event_auth", + values=auth_events, + desc="_rejected_events_metadata_event_auth", + ) + + await self.db_pool.updates._background_update_progress( + "rejected_events_metadata", {"last_event_id": last_event_id} + ) + + if len(results) < batch_size: + await self.db_pool.updates._end_background_update( + "rejected_events_metadata" + ) + + return len(results) diff --git a/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql b/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql new file mode 100644 index 000000000000..41c1a4954757 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql @@ -0,0 +1,17 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (5826, 'rejected_events_metadata', '{}'); From 1e6d85567cae9c5758bff5e98b80a1c7e5b6b1ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 4 Jan 2021 17:18:04 +0000 Subject: [PATCH 3/8] Newsfile --- changelog.d/9016.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/9016.misc diff --git a/changelog.d/9016.misc b/changelog.d/9016.misc new file mode 100644 index 000000000000..0d455b17dba7 --- /dev/null +++ b/changelog.d/9016.misc @@ -0,0 +1 @@ +Ensure rejected events get added to some metadata tables. From fbbdf38e13ccb75f6a8d54176394b589e84d46af Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Jan 2021 13:03:34 +0000 Subject: [PATCH 4/8] Fix bg update ordering value --- .../main/schema/delta/58/28rejected_events_metadata.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql b/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql index 41c1a4954757..9c95646281bd 100644 --- a/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql +++ b/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql @@ -14,4 +14,4 @@ */ INSERT INTO background_updates (ordering, update_name, progress_json) VALUES - (5826, 'rejected_events_metadata', '{}'); + (5828, 'rejected_events_metadata', '{}'); From 08ce4eb507a03b8ac68a12b21f0b5c1b155e20c3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Jan 2021 14:09:50 +0000 Subject: [PATCH 5/8] Handle unknown room versions --- synapse/storage/databases/main/events_bg_updates.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 92d1ed906568..fce64acf4402 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -646,7 +646,11 @@ def get_rejected_events( if has_state and has_event_auth: continue - room_version_obj = KNOWN_ROOM_VERSIONS[room_version] + room_version_obj = KNOWN_ROOM_VERSIONS.get(room_version) + if not room_version_obj: + # We no longer suppport this room version, so we just ignore the + # events entirely. + continue event = make_event_from_dict(event_json, room_version_obj) From d93259a45e3d33bf40706fa5a6321c63294c91a4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Jan 2021 14:19:04 +0000 Subject: [PATCH 6/8] Log we're ignoring event --- synapse/storage/databases/main/events_bg_updates.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index fce64acf4402..118e7bd07496 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -650,6 +650,11 @@ def get_rejected_events( if not room_version_obj: # We no longer suppport this room version, so we just ignore the # events entirely. + logger.info( + "Ignoring event with unknown room version %r: %r", + room_version, + event_id, + ) continue event = make_event_from_dict(event_json, room_version_obj) From bb6165f84e3ce7ab58d560df6f79db8618a50d36 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Jan 2021 14:19:41 +0000 Subject: [PATCH 7/8] Update synapse/storage/databases/main/events_bg_updates.py Co-authored-by: Patrick Cloke --- synapse/storage/databases/main/events_bg_updates.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 118e7bd07496..7e4b175d0861 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -648,7 +648,7 @@ def get_rejected_events( room_version_obj = KNOWN_ROOM_VERSIONS.get(room_version) if not room_version_obj: - # We no longer suppport this room version, so we just ignore the + # We no longer support this room version, so we just ignore the # events entirely. logger.info( "Ignoring event with unknown room version %r: %r", From 65a62506b5267f359b9738e463ed688a76c53320 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 11 Jan 2021 13:33:17 +0000 Subject: [PATCH 8/8] Update docstring --- synapse/storage/databases/main/events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 9e81d8779271..5e7753e09b4a 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -799,7 +799,8 @@ def _update_outliers_txn(self, txn, events_and_contexts): return [ec for ec in events_and_contexts if ec[0] not in to_remove] def _store_event_txn(self, txn, events_and_contexts): - """Insert new events into the event and event_json tables + """Insert new events into the event, event_json, redaction and + state_events tables. Args: txn (twisted.enterprise.adbapi.Connection): db connection