From 9e15a7ff5459b7522bdaf51d3e7bcd27aea570f2 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Sat, 4 Feb 2023 14:43:11 +0100 Subject: [PATCH 01/13] Add `event_stream_ordering` column to membership state tables Specifically this adds the column to `current_state_events`, `local_current_membership` and `room_memberships`. Each of these tables is regularly joined with the `events` table to get the stream ordering and denormalising this into each table will yield significant query performance improvements once used. --- synapse/storage/databases/main/events.py | 23 ++++++++++++++----- ...embership_tables_event_stream_ordering.sql | 18 +++++++++++++++ 2 files changed, 35 insertions(+), 6 deletions(-) create mode 100644 synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 7996cbb557d0..415924c5ed86 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1127,11 +1127,15 @@ def _update_current_state_txn( # been inserted into room_memberships. txn.execute_batch( """INSERT INTO current_state_events - (room_id, type, state_key, event_id, membership) - VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?)) + (room_id, type, state_key, event_id, membership, event_stream_ordering) + VALUES ( + ?, ?, ?, ?, + (SELECT membership FROM room_memberships WHERE event_id = ?), + (SELECT stream_ordering FROM events WHERE event_id = ?) 
+ ) """, [ - (room_id, key[0], key[1], ev_id, ev_id) + (room_id, key[0], key[1], ev_id, ev_id, ev_id) for key, ev_id in to_insert.items() ], ) @@ -1158,11 +1162,15 @@ def _update_current_state_txn( if to_insert: txn.execute_batch( """INSERT INTO local_current_membership - (room_id, user_id, event_id, membership) - VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?)) + (room_id, user_id, event_id, membership, event_stream_ordering) + VALUES ( + ?, ?, ?, + (SELECT membership FROM room_memberships WHERE event_id = ?), + (SELECT stream_ordering FROM events WHERE event_id = ?) + ) """, [ - (room_id, key[1], ev_id, ev_id) + (room_id, key[1], ev_id, ev_id, ev_id) for key, ev_id in to_insert.items() if key[0] == EventTypes.Member and self.is_mine_id(key[1]) ], @@ -1770,6 +1778,7 @@ def _store_room_members_txn( table="room_memberships", keys=( "event_id", + "event_stream_ordering", "user_id", "sender", "room_id", @@ -1780,6 +1789,7 @@ def _store_room_members_txn( values=[ ( event.event_id, + event.internal_metadata.stream_ordering, event.state_key, event.user_id, event.room_id, @@ -1812,6 +1822,7 @@ def _store_room_members_txn( keyvalues={"room_id": event.room_id, "user_id": event.state_key}, values={ "event_id": event.event_id, + "event_stream_ordering": event.internal_metadata.stream_ordering, "membership": event.membership, }, ) diff --git a/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql b/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql new file mode 100644 index 000000000000..2de56447ad76 --- /dev/null +++ b/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql @@ -0,0 +1,18 @@ +/* Copyright 2022 Beeper + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT; +ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT; +ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT; From da71761d47c00879b562635f0e2a5e48d3d30bbf Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Sat, 4 Feb 2023 14:50:35 +0100 Subject: [PATCH 02/13] Add background job to populate `event_stream_ordering` on membership tables --- .../databases/main/events_bg_updates.py | 104 +++++++++++++++++- ...embership_tables_event_stream_ordering.sql | 3 + 2 files changed, 106 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 584536111daa..9c194a79a6cc 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -17,7 +17,7 @@ import attr -from synapse.api.constants import EventContentFields, RelationTypes +from synapse.api.constants import EventContentFields, EventTypes, RelationTypes from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import make_event_from_dict from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause @@ -71,6 +71,10 @@ class _BackgroundUpdates: EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index" + POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING = ( + "populate_membership_event_stream_ordering" + ) + @attr.s(slots=True, frozen=True, auto_attribs=True) class _CalculateChainCover: @@ -99,6 +103,10 @@ 
def __init__( ): super().__init__(database, db_conn, hs) + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING, + self._populate_membership_event_stream_ordering, + ) self.db_pool.updates.register_background_update_handler( _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts, @@ -1498,3 +1506,97 @@ def _populate_txn(txn: LoggingTransaction) -> bool: ) return batch_size + + async def _populate_membership_event_stream_ordering( + self, progress: JsonDict, batch_size: int + ) -> int: + def _populate_membership_event_stream_ordering( + txn: LoggingTransaction, + ) -> bool: + + if "max_stream_ordering" in progress: + max_stream_ordering = progress["max_stream_ordering"] + else: + txn.execute("SELECT max(stream_ordering) FROM events") + res = txn.fetchone() + if res is None or res[0] is None: + return True + else: + max_stream_ordering = res[0] + + start = progress.get("stream_ordering", 0) + stop = start + batch_size + + sql = f""" + SELECT room_id, event_id, stream_ordering + FROM events + WHERE + type = '{EventTypes.Member}' + AND stream_ordering >= ? + AND stream_ordering < ? 
+ """ + txn.execute(sql, (start, stop)) + + rows: List[Tuple[str, str, int]] = cast( + List[Tuple[str, str, int]], txn.fetchall() + ) + + event_ids: List[Tuple[str]] = [] + event_stream_orderings: List[Tuple[int]] = [] + + for _, event_id, event_stream_ordering in rows: + event_ids.append((event_id,)) + event_stream_orderings.append((event_stream_ordering,)) + + self.db_pool.simple_update_many_txn( + txn, + table="current_state_events", + key_names=("event_id",), + key_values=event_ids, + value_names=("event_stream_ordering",), + value_values=event_stream_orderings, + ) + + self.db_pool.simple_update_many_txn( + txn, + table="room_memberships", + key_names=("event_id",), + key_values=event_ids, + value_names=("event_stream_ordering",), + value_values=event_stream_orderings, + ) + + # NOTE: local_current_membership has no index on event_id, so only + # the room ID here will reduce the query rows read. + for room_id, event_id, event_stream_ordering in rows: + txn.execute( + """ + UPDATE local_current_membership + SET event_stream_ordering = ? + WHERE room_id = ? AND event_id = ? 
+ """, + (event_stream_ordering, room_id, event_id), + ) + + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING, + { + "stream_ordering": stop, + "max_stream_ordering": max_stream_ordering, + }, + ) + + return stop > max_stream_ordering + + finished = await self.db_pool.runInteraction( + "_populate_membership_event_stream_ordering", + _populate_membership_event_stream_ordering, + ) + + if finished: + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING + ) + + return batch_size diff --git a/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql b/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql index 2de56447ad76..d313a1f311a4 100644 --- a/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql +++ b/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql @@ -16,3 +16,6 @@ ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT; ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT; ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT; + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7401, 'populate_membership_event_stream_ordering', '{}'); From eec814d96c82d7c7f86496aa1a2aa5c930a695de Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 21 Feb 2023 20:39:37 +0000 Subject: [PATCH 03/13] Changelog file --- changelog.d/15128.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15128.misc diff --git a/changelog.d/15128.misc b/changelog.d/15128.misc new file mode 100644 index 000000000000..c09911e48d2f --- /dev/null +++ b/changelog.d/15128.misc @@ -0,0 +1 @@ +Add denormalised event stream ordering column to membership state tables for future use. Contributed by Nick @ Beeper (@fizzadar). 
From 6901682dd1664839782c9a12c060bf0f78e25b5f Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Mon, 27 Feb 2023 14:14:18 +0000 Subject: [PATCH 04/13] Bump `SCHEMA_COMPAT_VERSION` --- synapse/storage/schema/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index d3103a6c7a05..881fd15e1051 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -95,9 +95,9 @@ SCHEMA_COMPAT_VERSION = ( - # The threads_id column must exist for event_push_actions, event_push_summary, - # receipts_linearized, and receipts_graph. - 73 + # Queries against `event_stream_ordering` columns in membership tables must + # be disambiguated. + 74 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat From bc1a1a9ffcc348f35b81b1056d47cedff221b27b Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Mon, 27 Feb 2023 15:32:00 +0000 Subject: [PATCH 05/13] Fix formatting Co-authored-by: David Robertson --- synapse/storage/databases/main/events_bg_updates.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 9c194a79a6cc..084562b199d8 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1513,7 +1513,6 @@ async def _populate_membership_event_stream_ordering( def _populate_membership_event_stream_ordering( txn: LoggingTransaction, ) -> bool: - if "max_stream_ordering" in progress: max_stream_ordering = progress["max_stream_ordering"] else: From 5af045370368b9b34b4d5e830d5ee788bd5a6cdb Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Thu, 16 Mar 2023 18:44:28 +0000 Subject: [PATCH 06/13] Use cast for clearer typing Co-authored-by: David Robertson --- synapse/storage/databases/main/events_bg_updates.py | 4 +--- 1 file changed, 1 
insertion(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 084562b199d8..1c487f50a925 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1536,9 +1536,7 @@ def _populate_membership_event_stream_ordering( """ txn.execute(sql, (start, stop)) - rows: List[Tuple[str, str, int]] = cast( - List[Tuple[str, str, int]], txn.fetchall() - ) + rows = cast(List[Tuple[str, str, int]], txn.fetchall()) event_ids: List[Tuple[str]] = [] event_stream_orderings: List[Tuple[int]] = [] From 845c9b90ff669a2a7394cbeb93d4b0f338a78af1 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Thu, 16 Mar 2023 18:47:01 +0000 Subject: [PATCH 07/13] Revert "Add background job to populate `event_stream_ordering` on membership tables" This reverts commit da71761d47c00879b562635f0e2a5e48d3d30bbf. --- .../databases/main/events_bg_updates.py | 101 +----------------- ...embership_tables_event_stream_ordering.sql | 3 - 2 files changed, 1 insertion(+), 103 deletions(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 1c487f50a925..584536111daa 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -17,7 +17,7 @@ import attr -from synapse.api.constants import EventContentFields, EventTypes, RelationTypes +from synapse.api.constants import EventContentFields, RelationTypes from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import make_event_from_dict from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause @@ -71,10 +71,6 @@ class _BackgroundUpdates: EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index" - POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING = ( - "populate_membership_event_stream_ordering" - ) - @attr.s(slots=True, frozen=True, 
auto_attribs=True) class _CalculateChainCover: @@ -103,10 +99,6 @@ def __init__( ): super().__init__(database, db_conn, hs) - self.db_pool.updates.register_background_update_handler( - _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING, - self._populate_membership_event_stream_ordering, - ) self.db_pool.updates.register_background_update_handler( _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts, @@ -1506,94 +1498,3 @@ def _populate_txn(txn: LoggingTransaction) -> bool: ) return batch_size - - async def _populate_membership_event_stream_ordering( - self, progress: JsonDict, batch_size: int - ) -> int: - def _populate_membership_event_stream_ordering( - txn: LoggingTransaction, - ) -> bool: - if "max_stream_ordering" in progress: - max_stream_ordering = progress["max_stream_ordering"] - else: - txn.execute("SELECT max(stream_ordering) FROM events") - res = txn.fetchone() - if res is None or res[0] is None: - return True - else: - max_stream_ordering = res[0] - - start = progress.get("stream_ordering", 0) - stop = start + batch_size - - sql = f""" - SELECT room_id, event_id, stream_ordering - FROM events - WHERE - type = '{EventTypes.Member}' - AND stream_ordering >= ? - AND stream_ordering < ? 
- """ - txn.execute(sql, (start, stop)) - - rows = cast(List[Tuple[str, str, int]], txn.fetchall()) - - event_ids: List[Tuple[str]] = [] - event_stream_orderings: List[Tuple[int]] = [] - - for _, event_id, event_stream_ordering in rows: - event_ids.append((event_id,)) - event_stream_orderings.append((event_stream_ordering,)) - - self.db_pool.simple_update_many_txn( - txn, - table="current_state_events", - key_names=("event_id",), - key_values=event_ids, - value_names=("event_stream_ordering",), - value_values=event_stream_orderings, - ) - - self.db_pool.simple_update_many_txn( - txn, - table="room_memberships", - key_names=("event_id",), - key_values=event_ids, - value_names=("event_stream_ordering",), - value_values=event_stream_orderings, - ) - - # NOTE: local_current_membership has no index on event_id, so only - # the room ID here will reduce the query rows read. - for room_id, event_id, event_stream_ordering in rows: - txn.execute( - """ - UPDATE local_current_membership - SET event_stream_ordering = ? - WHERE room_id = ? AND event_id = ? 
- """, - (event_stream_ordering, room_id, event_id), - ) - - self.db_pool.updates._background_update_progress_txn( - txn, - _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING, - { - "stream_ordering": stop, - "max_stream_ordering": max_stream_ordering, - }, - ) - - return stop > max_stream_ordering - - finished = await self.db_pool.runInteraction( - "_populate_membership_event_stream_ordering", - _populate_membership_event_stream_ordering, - ) - - if finished: - await self.db_pool.updates._end_background_update( - _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING - ) - - return batch_size diff --git a/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql b/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql index d313a1f311a4..2de56447ad76 100644 --- a/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql +++ b/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql @@ -16,6 +16,3 @@ ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT; ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT; ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT; - -INSERT INTO background_updates (ordering, update_name, progress_json) VALUES - (7401, 'populate_membership_event_stream_ordering', '{}'); From 8979abc928fd0410af0f03b3f95e818846eac432 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Thu, 16 Mar 2023 18:51:47 +0000 Subject: [PATCH 08/13] Make denormalised `event_stream_ordering` columns foreign keys --- .../delta/74/01membership_tables_event_stream_ordering.sql | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql b/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql index 2de56447ad76..bb19ed27b76b 100644 --- 
a/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql +++ b/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql @@ -13,6 +13,6 @@ * limitations under the License. */ -ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT; -ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT; -ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT; +ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering); +ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering); +ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering); From bc3845c945f697f06e951577a89971f11bd505ec Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Thu, 16 Mar 2023 18:55:16 +0000 Subject: [PATCH 09/13] Add comment in schema file explaining new denormalised columns --- .../main/delta/74/01membership_tables_event_stream_ordering.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql b/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql index bb19ed27b76b..e2608f3a2e83 100644 --- a/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql +++ b/synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql @@ -13,6 +13,8 @@ * limitations under the License. */ +-- Each of these is a denormalised copy of `stream_ordering` from the corresponding row in `events` which +-- we use to improve database performance by reducing JOINs. 
ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering); ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering); ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering); From 36533a992ac4f636703ee0617b27481b244f9533 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Thu, 16 Mar 2023 19:10:39 +0000 Subject: [PATCH 10/13] Add triggers to enforce consistency of `event_stream_ordering` columns --- ...p_tables_event_stream_ordering_triggers.py | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 synapse/storage/schema/main/delta/74/02membership_tables_event_stream_ordering_triggers.py diff --git a/synapse/storage/schema/main/delta/74/02membership_tables_event_stream_ordering_triggers.py b/synapse/storage/schema/main/delta/74/02membership_tables_event_stream_ordering_triggers.py new file mode 100644 index 000000000000..e32e9083b359 --- /dev/null +++ b/synapse/storage/schema/main/delta/74/02membership_tables_event_stream_ordering_triggers.py @@ -0,0 +1,79 @@ +# Copyright 2022 Beeper +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +This migration adds triggers to the room membership tables to enforce consistency. +Triggers cannot be expressed in .sql files, so we have to use a separate file. 
+""" +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine +from synapse.storage.types import Cursor + + +def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs): + # Complain if the `event_stream_ordering` in membership tables doesn't match + # the `stream_ordering` row with the same `event_id` in `events`. + if isinstance(database_engine, Sqlite3Engine): + for table in ( + "current_state_events", + "local_current_membership", + "room_memberships", + ): + cur.execute( + f""" + CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering + BEFORE INSERT ON {table} + FOR EACH ROW + BEGIN + SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}') + WHERE EXISTS ( + SELECT 1 FROM events + WHERE events.event_id = NEW.event_id + AND events.stream_ordering != NEW.event_stream_ordering + ); + END; + """ + ) + elif isinstance(database_engine, PostgresEngine): + cur.execute( + """ + CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$ + BEGIN + IF EXISTS ( + SELECT 1 FROM events + WHERE events.event_id = NEW.event_id + AND events.stream_ordering != NEW.event_stream_ordering + ) THEN + RAISE EXCEPTION 'Incorrect event_stream_ordering'; + END IF; + RETURN NEW; + END; + $BODY$ LANGUAGE plpgsql; + """ + ) + + for table in ( + "current_state_events", + "local_current_membership", + "room_memberships", + ): + cur.execute( + f""" + CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table} + FOR EACH ROW + EXECUTE PROCEDURE check_event_stream_ordering() + """ + ) + else: + raise NotImplementedError("Unknown database engine") From b3e9bbc5b96349cbbf77eaafe574e7c7d09ab561 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Thu, 16 Mar 2023 19:31:26 +0000 Subject: [PATCH 11/13] Re-order purge room tables to account for foreign keys --- synapse/storage/databases/main/purge_events.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git 
a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 9c41d01e1317..d20458a95e6f 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -425,14 +425,16 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]: "partial_state_events", "partial_state_rooms_servers", "partial_state_rooms", + # Note: the _membership(s) tables have foreign keys to the `events` table + # so must be deleted first. + "local_current_membership", + "room_memberships", "events", "federation_inbound_events_staging", - "local_current_membership", "receipts_graph", "receipts_linearized", "room_aliases", "room_depth", - "room_memberships", "room_stats_state", "room_stats_current", "room_stats_earliest_token", From 27b469fb2ca1f2912c35783d50a2ef41af4a08e0 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Wed, 22 Mar 2023 14:25:00 +0000 Subject: [PATCH 12/13] Bump schema version to 75 --- synapse/storage/schema/__init__.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 881fd15e1051..7ce73284e769 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 74 # remember to update the list below when updating +SCHEMA_VERSION = 75 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -91,6 +91,12 @@ - A query on `event_stream_ordering` column has now been disambiguated (i.e. the codebase can handle the `current_state_events`, `local_current_memberships` and `room_memberships` tables having an `event_stream_ordering` column). 
+ +Changes in SCHEMA_VERSION = 75: + - The `event_stream_ordering` column in membership tables (`current_state_events`, + `local_current_membership` & `room_memberships`) is now being populated for new + rows. When this the background job to populate historical rows lands this will + become the compat schema version. """ From f55d1630b1c7178c5ce404b85069f9206d6ed1ee Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Wed, 22 Mar 2023 15:31:59 +0000 Subject: [PATCH 13/13] Correct typo Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/storage/schema/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 7ce73284e769..a28f2b997cef 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -95,7 +95,7 @@ Changes in SCHEMA_VERSION = 75: - The `event_stream_ordering` column in membership tables (`current_state_events`, `local_current_membership` & `room_memberships`) is now being populated for new - rows. When this the background job to populate historical rows lands this will + rows. When the background job to populate historical rows lands this will become the compat schema version. """