From c2c364f27f61bece85dc7fd17cdedc4b60b9f7af Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 12 Jul 2021 17:22:54 +0100 Subject: [PATCH] Replace `room_depth.min_depth` with a BIGINT (#10289) while I'm dealing with INTEGERs and BIGINTs, let's replace room_depth.min_depth with a BIGINT. --- changelog.d/10289.misc | 1 + synapse/storage/databases/main/room.py | 104 ++++++++++++++++-- .../61/02drop_redundant_room_depth_index.sql | 18 +++ .../main/delta/61/03recreate_min_depth.py | 70 ++++++++++++ 4 files changed, 186 insertions(+), 7 deletions(-) create mode 100644 changelog.d/10289.misc create mode 100644 synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql create mode 100644 synapse/storage/schema/main/delta/61/03recreate_min_depth.py diff --git a/changelog.d/10289.misc b/changelog.d/10289.misc new file mode 100644 index 000000000000..2df30e7a7a59 --- /dev/null +++ b/changelog.d/10289.misc @@ -0,0 +1 @@ +Convert `room_depth.min_depth` column to a `BIGINT`. diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 9f0d64a32542..6ddafe543412 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -25,6 +25,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.search import SearchStore +from synapse.storage.types import Cursor from synapse.types import JsonDict, ThirdPartyInstanceID from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -1022,10 +1023,22 @@ def get_rooms_for_retention_period_in_range_txn(txn): ) -class RoomBackgroundUpdateStore(SQLBaseStore): +class _BackgroundUpdates: REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory" ADD_ROOMS_ROOM_VERSION_COLUMN = "add_rooms_room_version_column" + POPULATE_ROOM_DEPTH_MIN_DEPTH2 = "populate_room_depth_min_depth2" + REPLACE_ROOM_DEPTH_MIN_DEPTH = "replace_room_depth_min_depth" + + +_REPLACE_ROOM_DEPTH_SQL_COMMANDS = ( + "DROP TRIGGER populate_min_depth2_trigger ON room_depth", + "DROP FUNCTION populate_min_depth2()", + "ALTER TABLE room_depth DROP COLUMN min_depth", + "ALTER TABLE room_depth RENAME COLUMN min_depth2 TO min_depth", +) + +class RoomBackgroundUpdateStore(SQLBaseStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) @@ -1037,15 +1050,25 @@ def __init__(self, database: DatabasePool, db_conn, hs): ) self.db_pool.updates.register_background_update_handler( - self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, + _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, self._remove_tombstoned_rooms_from_directory, ) self.db_pool.updates.register_background_update_handler( - self.ADD_ROOMS_ROOM_VERSION_COLUMN, + _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN, self._background_add_rooms_room_version_column, ) + # BG updates to change the type of room_depth.min_depth + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2, + self._background_populate_room_depth_min_depth2, + ) + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.REPLACE_ROOM_DEPTH_MIN_DEPTH, + self._background_replace_room_depth_min_depth, + ) + async def _background_insert_retention(self, progress, batch_size): """Retrieves a list of all rooms within a range and inserts an entry for each of them into the room_retention table. @@ -1164,7 +1187,9 @@ def _background_add_rooms_room_version_column_txn(txn: LoggingTransaction): new_last_room_id = room_id self.db_pool.updates._background_update_progress_txn( - txn, self.ADD_ROOMS_ROOM_VERSION_COLUMN, {"room_id": new_last_room_id} + txn, + _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN, + {"room_id": new_last_room_id}, ) return False @@ -1176,7 +1201,7 @@ def _background_add_rooms_room_version_column_txn(txn: LoggingTransaction): if end: await self.db_pool.updates._end_background_update( - self.ADD_ROOMS_ROOM_VERSION_COLUMN + _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN ) return batch_size @@ -1215,7 +1240,7 @@ def _get_rooms(txn): if not rooms: await self.db_pool.updates._end_background_update( - self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE + _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE ) return 0 @@ -1224,7 +1249,7 @@ def _get_rooms(txn): await self.set_room_is_public(room_id, False) await self.db_pool.updates._background_update_progress( - self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]} + _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]} ) return len(rooms) @@ -1268,6 +1293,71 @@ async def has_auth_chain_index(self, room_id: str) -> bool: return max_ordering is None + async def _background_populate_room_depth_min_depth2( + self, progress: JsonDict, batch_size: int + ) -> int: + """Populate room_depth.min_depth2 + + This is to deal with the fact that min_depth was initially created as a + 32-bit integer field. + """ + + def process(txn: Cursor) -> int: + last_room = progress.get("last_room", "") + txn.execute( + """ + UPDATE room_depth SET min_depth2=min_depth + WHERE room_id IN ( + SELECT room_id FROM room_depth WHERE room_id > ? + ORDER BY room_id LIMIT ? + ) + RETURNING room_id; + """, + (last_room, batch_size), + ) + row_count = txn.rowcount + if row_count == 0: + return 0 + last_room = max(row[0] for row in txn) + logger.info("populated room_depth up to %s", last_room) + + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2, + {"last_room": last_room}, + ) + return row_count + + result = await self.db_pool.runInteraction( + "_background_populate_min_depth2", process + ) + + if result != 0: + return result + + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2 + ) + return 0 + + async def _background_replace_room_depth_min_depth( + self, progress: JsonDict, batch_size: int + ) -> int: + """Drop the old 'min_depth' column and rename 'min_depth2' into its place.""" + + def process(txn: Cursor) -> None: + for sql in _REPLACE_ROOM_DEPTH_SQL_COMMANDS: + logger.info("completing room_depth migration: %s", sql) + txn.execute(sql) + + await self.db_pool.runInteraction("_background_replace_room_depth", process) + + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.REPLACE_ROOM_DEPTH_MIN_DEPTH, + ) + + return 0 + class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): def __init__(self, database: DatabasePool, db_conn, hs): diff --git a/synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql b/synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql new file mode 100644 index 000000000000..35ca7a40c026 --- /dev/null +++ b/synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql @@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- this index is redundant; there is another UNIQUE index on this table. +DROP INDEX IF EXISTS room_depth_room; + diff --git a/synapse/storage/schema/main/delta/61/03recreate_min_depth.py b/synapse/storage/schema/main/delta/61/03recreate_min_depth.py new file mode 100644 index 000000000000..f8d7db9f2ef3 --- /dev/null +++ b/synapse/storage/schema/main/delta/61/03recreate_min_depth.py @@ -0,0 +1,70 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This migration handles the process of changing the type of `room_depth.min_depth` to +a BIGINT. +""" +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine +from synapse.storage.types import Cursor + + +def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs): + if not isinstance(database_engine, PostgresEngine): + # this only applies to postgres - sqlite does not distinguish between big and + # little ints. + return + + # First add a new column to contain the bigger min_depth + cur.execute("ALTER TABLE room_depth ADD COLUMN min_depth2 BIGINT") + + # Create a trigger which will keep it populated. + cur.execute( + """ + CREATE OR REPLACE FUNCTION populate_min_depth2() RETURNS trigger AS $BODY$ + BEGIN + new.min_depth2 := new.min_depth; + RETURN NEW; + END; + $BODY$ LANGUAGE plpgsql + """ + ) + + cur.execute( + """ + CREATE TRIGGER populate_min_depth2_trigger BEFORE INSERT OR UPDATE ON room_depth + FOR EACH ROW + EXECUTE PROCEDURE populate_min_depth2() + """ + ) + + # Start a bg process to populate it for old rooms + cur.execute( + """ + INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (6103, 'populate_room_depth_min_depth2', '{}') + """ + ) + + # and another to switch them over once it completes. + cur.execute( + """ + INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES + (6103, 'replace_room_depth_min_depth', '{}', 'populate_room_depth2') + """ + ) + + +def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs): + pass