From 22078fb47bb3f687531c99b05dabaef0795ba3f0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 25 Jan 2023 13:15:03 -0500 Subject: [PATCH 1/7] Attempt to delete more duplicate rows in receipts_linearized table. --- synapse/storage/databases/main/receipts.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 3468f354e60f..336b5f896aad 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -941,24 +941,29 @@ async def _background_receipts_linearized_unique_index( receipts.""" def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: + if isinstance(self.database_engine, PostgresEngine): + ROW_ID_NAME = "ctid" + else: + ROW_ID_NAME = "rowid" + # Identify any duplicate receipts arising from # https://github.com/matrix-org/synapse/issues/14406. # We expect the following query to use the per-thread receipt index and take # less than a minute. sql = """ - SELECT MAX(stream_id), room_id, receipt_type, user_id + SELECT MAX(stream_id), ?, room_id, receipt_type, user_id FROM receipts_linearized WHERE thread_id IS NULL GROUP BY room_id, receipt_type, user_id HAVING COUNT(*) > 1 """ - txn.execute(sql) + txn.execute(sql, (ROW_ID_NAME,)) duplicate_keys = cast(List[Tuple[int, str, str, str]], list(txn)) # Then remove duplicate receipts, keeping the one with the highest - # `stream_id`. There should only be a single receipt with any given - # `stream_id`. - for max_stream_id, room_id, receipt_type, user_id in duplicate_keys: + # `stream_id`. Since there might be duplicate rows with the same + # `stream_id`, we delete by the rowid instead. + for _, row_id, room_id, receipt_type, user_id in duplicate_keys: sql = """ DELETE FROM receipts_linearized WHERE @@ -966,9 +971,9 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: receipt_type = ? AND user_id = ? AND thread_id IS NULL AND - stream_id < ? + ? != ? """ - txn.execute(sql, (room_id, receipt_type, user_id, max_stream_id)) + txn.execute(sql, (room_id, receipt_type, user_id, ROW_ID_NAME, row_id)) await self.db_pool.runInteraction( self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME, From cadbf14afa46b24a19cfce751866a46a42bc01d7 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 25 Jan 2023 13:17:02 -0500 Subject: [PATCH 2/7] Newsfragment --- changelog.d/14915.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14915.bugfix diff --git a/changelog.d/14915.bugfix b/changelog.d/14915.bugfix new file mode 100644 index 000000000000..4969e5450c3f --- /dev/null +++ b/changelog.d/14915.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in Synapse 1.70.0 where the background updates to add non-thread unique indexes on receipts could fail when upgrading from 1.67.0 or earlier. 
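The `?` placeholders in PATCH 1/7 cannot work as written: DB-API parameters bind values, not identifiers, so the SELECT returns the literal string "rowid"/"ctid" in that column, and the DELETE's `? != ?` compares two bound literals rather than filtering on the row id. PATCH 4/7 below ("Fix syntax.") corrects this by interpolating the column name into the query text. A minimal sketch of the difference, using a throwaway in-memory SQLite table (illustrative only, not the real Synapse schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE receipts_linearized (stream_id INTEGER)")
conn.execute("INSERT INTO receipts_linearized VALUES (7)")

# A `?` placeholder binds a *value*: this selects the literal string
# 'rowid', not the hidden rowid column.
print(conn.execute(
    "SELECT ?, stream_id FROM receipts_linearized", ("rowid",)
).fetchone())  # -> ('rowid', 7)

# Interpolating the identifier into the SQL text selects the actual row id.
print(conn.execute(
    "SELECT rowid, stream_id FROM receipts_linearized"
).fetchone())  # -> (1, 7)
```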
From 17cadfba2de92ab8a4358319475fb16fa35add1b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 25 Jan 2023 13:32:17 -0500 Subject: [PATCH 3/7] Lint --- synapse/storage/databases/main/receipts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 336b5f896aad..ccc220185966 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -958,7 +958,7 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: HAVING COUNT(*) > 1 """ txn.execute(sql, (ROW_ID_NAME,)) - duplicate_keys = cast(List[Tuple[int, str, str, str]], list(txn)) + duplicate_keys = cast(List[Tuple[int, int, str, str, str]], list(txn)) # Then remove duplicate receipts, keeping the one with the highest # `stream_id`. Since there might be duplicate rows with the same From 7b217a38d043de8625c5704b6d8bda18d4a34808 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 25 Jan 2023 15:08:14 -0500 Subject: [PATCH 4/7] Fix syntax. --- synapse/storage/databases/main/receipts.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index ccc220185966..8b22dc241f0a 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -950,30 +950,30 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: # https://github.com/matrix-org/synapse/issues/14406. # We expect the following query to use the per-thread receipt index and take # less than a minute. - sql = """ - SELECT MAX(stream_id), ?, room_id, receipt_type, user_id + sql = f""" + SELECT MAX(stream_id), {ROW_ID_NAME}, room_id, receipt_type, user_id FROM receipts_linearized WHERE thread_id IS NULL GROUP BY room_id, receipt_type, user_id HAVING COUNT(*) > 1 """ - txn.execute(sql, (ROW_ID_NAME,)) + txn.execute(sql) duplicate_keys = cast(List[Tuple[int, int, str, str, str]], list(txn)) # Then remove duplicate receipts, keeping the one with the highest # `stream_id`. Since there might be duplicate rows with the same # `stream_id`, we delete by the rowid instead. for _, row_id, room_id, receipt_type, user_id in duplicate_keys: - sql = """ + sql = f""" DELETE FROM receipts_linearized WHERE room_id = ? AND receipt_type = ? AND user_id = ? AND thread_id IS NULL AND - ? != ? + {ROW_ID_NAME} != ? """ - txn.execute(sql, (room_id, receipt_type, user_id, ROW_ID_NAME, row_id)) + txn.execute(sql, (room_id, receipt_type, user_id, row_id)) await self.db_pool.runInteraction( self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME, From 20d3d328221163eaa30191e1f5b83d1d55e33c6f Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 26 Jan 2023 08:00:01 -0500 Subject: [PATCH 5/7] Fix postgres support. --- synapse/storage/databases/main/receipts.py | 2 +- tests/storage/databases/main/test_receipts.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 8b22dc241f0a..a9b7c346dafc 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -951,7 +951,7 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: # We expect the following query to use the per-thread receipt index and take # less than a minute. 
sql = f""" - SELECT MAX(stream_id), {ROW_ID_NAME}, room_id, receipt_type, user_id + SELECT MAX(stream_id), MAX({ROW_ID_NAME}), room_id, receipt_type, user_id FROM receipts_linearized WHERE thread_id IS NULL GROUP BY room_id, receipt_type, user_id diff --git a/tests/storage/databases/main/test_receipts.py b/tests/storage/databases/main/test_receipts.py index 68026e283046..ac77aec003b1 100644 --- a/tests/storage/databases/main/test_receipts.py +++ b/tests/storage/databases/main/test_receipts.py @@ -168,7 +168,9 @@ def test_background_receipts_linearized_unique_index(self) -> None: {"stream_id": 6, "event_id": "$some_event"}, ], (self.other_room_id, "m.read", self.user_id): [ - {"stream_id": 7, "event_id": "$some_event"} + # It is possible for stream IDs to be duplicated. + {"stream_id": 7, "event_id": "$some_event"}, + {"stream_id": 7, "event_id": "$some_event"}, ], }, expected_unique_receipts={ From 11311f7c76f5d7b8fafaf8a8d183325285df0be6 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 27 Jan 2023 10:20:57 -0500 Subject: [PATCH 6/7] Clarify comment. Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> --- synapse/storage/databases/main/receipts.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index a9b7c346dafc..e009c44710b3 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -948,8 +948,7 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: # Identify any duplicate receipts arising from # https://github.com/matrix-org/synapse/issues/14406. - # We expect the following query to use the per-thread receipt index and take - # less than a minute. + # The following query takes less than a minute on matrix.org. sql = f""" SELECT MAX(stream_id), MAX({ROW_ID_NAME}), room_id, receipt_type, user_id FROM receipts_linearized From 7faca568f8b7872431c61009e89cd6f959f0bcec Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 1 Feb 2023 09:47:12 -0500 Subject: [PATCH 7/7] Separate check for stream ID & ctid. --- synapse/storage/databases/main/receipts.py | 24 +++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index e009c44710b3..29972d520413 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -949,20 +949,34 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: # Identify any duplicate receipts arising from # https://github.com/matrix-org/synapse/issues/14406. # The following query takes less than a minute on matrix.org. - sql = f""" - SELECT MAX(stream_id), MAX({ROW_ID_NAME}), room_id, receipt_type, user_id + sql = """ + SELECT MAX(stream_id), room_id, receipt_type, user_id FROM receipts_linearized WHERE thread_id IS NULL GROUP BY room_id, receipt_type, user_id HAVING COUNT(*) > 1 """ txn.execute(sql) - duplicate_keys = cast(List[Tuple[int, int, str, str, str]], list(txn)) + duplicate_keys = cast(List[Tuple[int, str, str, str]], list(txn)) # Then remove duplicate receipts, keeping the one with the highest # `stream_id`. Since there might be duplicate rows with the same - # `stream_id`, we delete by the rowid instead. - for _, row_id, room_id, receipt_type, user_id in duplicate_keys: + # `stream_id`, we delete by the ctid instead. 
+ for stream_id, room_id, receipt_type, user_id in duplicate_keys: + sql = f""" + SELECT {ROW_ID_NAME} + FROM receipts_linearized + WHERE + room_id = ? AND + receipt_type = ? AND + user_id = ? AND + thread_id IS NULL AND + stream_id = ? + LIMIT 1 + """ + txn.execute(sql, (room_id, receipt_type, user_id, stream_id)) + row_id = cast(Tuple[str], txn.fetchone())[0] + sql = f""" DELETE FROM receipts_linearized WHERE
                     room_id = ? AND
                     receipt_type = ? AND
                     user_id = ? AND
                     thread_id IS NULL AND
                     {ROW_ID_NAME} != ?
                 """
                 txn.execute(sql, (room_id, receipt_type, user_id, row_id))
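Why the extra lookup in PATCH 7/7: SQL aggregates within a group are evaluated independently, so the `MAX({ROW_ID_NAME})` introduced in PATCH 5/7 need not come from the same row as `MAX(stream_id)` — deleting every row with a different row id could keep a stale receipt (and on PostgreSQL there is no built-in `max` aggregate for `ctid`'s `tid` type in any case). A small sketch of the mismatch, again on a throwaway SQLite table:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE receipts_linearized (stream_id INTEGER)")
# The newest receipt (stream_id 7) lands at a *lower* rowid than a stale
# duplicate (stream_id 3).
conn.execute("INSERT INTO receipts_linearized VALUES (7)")  # rowid 1
conn.execute("INSERT INTO receipts_linearized VALUES (3)")  # rowid 2

# Independent aggregates pick values from different rows.
print(conn.execute(
    "SELECT MAX(stream_id), MAX(rowid) FROM receipts_linearized"
).fetchone())  # -> (7, 2)

# Deleting "rowid != 2" would drop rowid 1 -- the receipt with the highest
# stream_id -- so PATCH 7/7 instead first looks up a rowid whose stream_id
# equals the maximum.
```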
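Putting the final version of PATCH 7/7 together: a self-contained sketch of the two-step cleanup, with `receipts_linearized` reduced to the columns the queries touch (an assumption for illustration, not the real Synapse schema):

```python
import sqlite3

ROW_ID_NAME = "rowid"  # "ctid" on PostgreSQL, per the engine check in PATCH 1/7

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE receipts_linearized (
        stream_id INTEGER, room_id TEXT, receipt_type TEXT,
        user_id TEXT, thread_id TEXT
    )
    """
)
# Duplicate unthreaded receipts for one key, including two rows that share
# the same stream_id (the case PATCH 5/7 adds to the tests).
conn.executemany(
    "INSERT INTO receipts_linearized VALUES (?, ?, ?, ?, ?)",
    [
        (5, "!room:server", "m.read", "@user:server", None),
        (7, "!room:server", "m.read", "@user:server", None),
        (7, "!room:server", "m.read", "@user:server", None),
    ],
)

# Step 1: find keys with more than one unthreaded receipt.
duplicate_keys = conn.execute(
    """
    SELECT MAX(stream_id), room_id, receipt_type, user_id
    FROM receipts_linearized
    WHERE thread_id IS NULL
    GROUP BY room_id, receipt_type, user_id
    HAVING COUNT(*) > 1
    """
).fetchall()

# Step 2: per key, pick one surviving row id at the maximum stream_id, then
# delete every other row for that key.
for stream_id, room_id, receipt_type, user_id in duplicate_keys:
    row_id = conn.execute(
        f"""
        SELECT {ROW_ID_NAME} FROM receipts_linearized
        WHERE room_id = ? AND receipt_type = ? AND user_id = ?
            AND thread_id IS NULL AND stream_id = ?
        LIMIT 1
        """,
        (room_id, receipt_type, user_id, stream_id),
    ).fetchone()[0]
    conn.execute(
        f"""
        DELETE FROM receipts_linearized
        WHERE room_id = ? AND receipt_type = ? AND user_id = ?
            AND thread_id IS NULL AND {ROW_ID_NAME} != ?
        """,
        (room_id, receipt_type, user_id, row_id),
    )

# Exactly one receipt per key survives, at the highest stream_id.
print(conn.execute("SELECT * FROM receipts_linearized").fetchall())
# -> [(7, '!room:server', 'm.read', '@user:server', None)]
```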