Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix background updates failing to add unique indexes on receipts #14453

Merged
merged 7 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14453.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.70.0 where the background updates to add non-thread unique indexes on receipts could fail when upgrading from 1.67.0 or earlier.
115 changes: 91 additions & 24 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,24 +113,6 @@ def __init__(
prefilled_cache=receipts_stream_prefill,
)

self.db_pool.updates.register_background_index_update(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why move these to the non-worker store?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved them to the *BackgroundUpdateStore. I thought that's where we usually put the background updates?
Is there a motivation for having these on the worker store that I've completely missed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no motivation besides it being consistent with other examples I saw. 🤷

"receipts_linearized_unique_index",
index_name="receipts_linearized_unique_index",
table="receipts_linearized",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
)

self.db_pool.updates.register_background_index_update(
"receipts_graph_unique_index",
index_name="receipts_graph_unique_index",
table="receipts_graph",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
)

def get_max_receipt_stream_id(self) -> int:
    """Return the current maximum stream ID for the receipts stream."""
    current_token = self._receipts_id_gen.get_current_token()
    return current_token
Expand Down Expand Up @@ -702,9 +684,6 @@ def _insert_linearized_receipt_txn(
"data": json_encoder.encode(data),
},
where_clause=where_clause,
# receipts_linearized has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
lock=False,
Comment on lines -705 to -707
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To double check: is the table correctly deemed safe to upsert into when the relevant background updates have run? (Wasn't sure how the second commit would affect this, if at all)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, once the unique index has been added by the background update, we will be able to rely on native upserts again (and the value of lock won't matter).

)

return rx_ts
Expand Down Expand Up @@ -862,14 +841,13 @@ def _insert_graph_receipt_txn(
"data": json_encoder.encode(data),
},
where_clause=where_clause,
# receipts_graph has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
lock=False,
)


class ReceiptsBackgroundUpdateStore(SQLBaseStore):
POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME = "receipts_linearized_unique_index"
RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME = "receipts_graph_unique_index"

def __init__(
self,
Expand All @@ -883,6 +861,14 @@ def __init__(
self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
self._populate_receipt_event_stream_ordering,
)
self.db_pool.updates.register_background_update_handler(
self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME,
self._background_receipts_linearized_unique_index,
)
self.db_pool.updates.register_background_update_handler(
self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME,
self._background_receipts_graph_unique_index,
)

async def _populate_receipt_event_stream_ordering(
self, progress: JsonDict, batch_size: int
Expand Down Expand Up @@ -938,6 +924,87 @@ def _populate_receipt_event_stream_ordering_txn(

return batch_size

async def _background_receipts_unique_index(
    self, update_name: str, index_name: str, table: str
) -> int:
    """Adds a unique index on `(room_id, receipt_type, user_id)` to the given
    receipts table, for non-thread receipts.

    Any rows that would violate the new index — duplicate non-thread receipts
    arising from https://github.com/matrix-org/synapse/issues/14406 — are
    deleted first, since the index cannot be created while they exist.

    Args:
        update_name: The name of the background update being run.
        index_name: The name of the unique index to create.
        table: The receipts table to index (`receipts_linearized` or
            `receipts_graph`).

    Returns:
        1, since the whole update completes in a single batch.
    """

    def _receipts_unique_index_txn(txn: LoggingTransaction) -> None:
        # Identify any duplicate receipts arising from
        # https://github.com/matrix-org/synapse/issues/14406.
        # We expect the following query to use the per-thread receipt index and take
        # less than a minute.
        sql = f"""
            SELECT room_id, receipt_type, user_id FROM {table}
            WHERE thread_id IS NULL
            GROUP BY room_id, receipt_type, user_id
            HAVING COUNT(*) > 1
        """
        txn.execute(sql)
        duplicate_keys = cast(List[Tuple[str, str, str]], list(txn))

        # Then remove all duplicate receipts.
        # We could be clever and try to keep the latest receipt out of every set of
        # duplicates, but it's far simpler to remove them all.
        for room_id, receipt_type, user_id in duplicate_keys:
            sql = f"""
                DELETE FROM {table}
                WHERE
                    room_id = ? AND
                    receipt_type = ? AND
                    user_id = ? AND
                    thread_id IS NULL
            """
            txn.execute(sql, (room_id, receipt_type, user_id))

        # Now that the duplicates are gone, we can create the index.
        # NOTE(review): PostgreSQL rejects `CREATE INDEX CONCURRENTLY` inside a
        # transaction block — confirm that this interaction runs with
        # autocommit on that engine, otherwise the index creation will fail.
        concurrently = (
            "CONCURRENTLY"
            if isinstance(self.database_engine, PostgresEngine)
            else ""
        )
        sql = f"""
            CREATE UNIQUE INDEX {concurrently} {index_name}
            ON {table}(room_id, receipt_type, user_id)
            WHERE thread_id IS NULL
        """
        txn.execute(sql)

    await self.db_pool.runInteraction(
        update_name,
        _receipts_unique_index_txn,
    )

    await self.db_pool.updates._end_background_update(update_name)

    return 1

async def _background_receipts_linearized_unique_index(
    self, progress: JsonDict, batch_size: int
) -> int:
    """Adds a unique index on `(room_id, receipt_type, user_id)` to
    `receipts_linearized`, for non-thread receipts.

    Duplicate non-thread receipts are deleted before the index is created,
    so running this update may remove rows from `receipts_linearized`.
    `progress` and `batch_size` are ignored: the update runs to completion
    in a single step.
    """
    return await self._background_receipts_unique_index(
        self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME,
        "receipts_linearized_unique_index",
        "receipts_linearized",
    )

async def _background_receipts_graph_unique_index(
    self, progress: JsonDict, batch_size: int
) -> int:
    """Adds a unique index on `(room_id, receipt_type, user_id)` to
    `receipts_graph`, for non-thread receipts.

    Duplicate non-thread receipts are deleted before the index is created,
    so running this update may remove rows from `receipts_graph`.
    `progress` and `batch_size` are ignored: the update runs to completion
    in a single step.
    """
    return await self._background_receipts_unique_index(
        self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME,
        "receipts_graph_unique_index",
        "receipts_graph",
    )


class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore):
    """The full receipts store: combines the worker read/write paths with the
    background-update registrations. Adds no behaviour of its own."""

    pass
169 changes: 169 additions & 0 deletions tests/storage/databases/main/test_receipts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict

from twisted.test.proto_helpers import MemoryReactor

from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.storage.database import LoggingTransaction
from synapse.util import Clock

from tests.unittest import HomeserverTestCase


class ReceiptsBackgroundUpdateStoreTestCase(HomeserverTestCase):
    """Tests for the background updates that deduplicate non-thread receipts
    and add unique indexes to the receipts tables."""

    servlets = [
        admin.register_servlets,
        room.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        """Register a user and create two rooms to hold the test receipts."""
        self.store = hs.get_datastores().main
        self.user_id = self.register_user("foo", "pass")
        self.token = self.login("foo", "pass")
        self.room_id = self.helper.create_room_as(self.user_id, tok=self.token)
        self.other_room_id = self.helper.create_room_as(self.user_id, tok=self.token)

    def _test_background_receipts_unique_index(
        self,
        update_name: str,
        index_name: str,
        table: str,
        values: Dict[str, Any],
    ) -> None:
        """Test that the background update to uniqueify non-thread receipts in
        the given receipts table works properly.

        Args:
            update_name: The name of the background update under test.
            index_name: The unique index created by the update; it is dropped
                first so the update can be exercised from a clean slate.
            table: The receipts table the update operates on.
            values: Extra column values required by `table` for inserted rows
                (e.g. `stream_id`/`event_id` for `receipts_linearized`).
        """
        # First, undo the background update.
        def drop_receipts_unique_index(txn: LoggingTransaction) -> None:
            txn.execute(f"DROP INDEX IF EXISTS {index_name}")

        self.get_success(
            self.store.db_pool.runInteraction(
                "drop_receipts_unique_index",
                drop_receipts_unique_index,
            )
        )

        # Add duplicate receipts for `room_id`.
        for _ in range(2):
            self.get_success(
                self.store.db_pool.simple_insert(
                    table,
                    {
                        "room_id": self.room_id,
                        "receipt_type": "m.read",
                        "user_id": self.user_id,
                        "thread_id": None,
                        "data": "{}",
                        **values,
                    },
                )
            )

        # Add a unique receipt for `other_room_id`.
        self.get_success(
            self.store.db_pool.simple_insert(
                table,
                {
                    "room_id": self.other_room_id,
                    "receipt_type": "m.read",
                    "user_id": self.user_id,
                    "thread_id": None,
                    "data": "{}",
                    **values,
                },
            )
        )

        # Insert and run the background update.
        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                {
                    "update_name": update_name,
                    "progress_json": "{}",
                },
            )
        )

        self.store.db_pool.updates._all_done = False

        self.wait_for_background_updates()

        # Check that the background task deleted the duplicate receipts.
        # (The update removes every copy of a duplicated receipt, hence 0 rows.)
        res = self.get_success(
            self.store.db_pool.simple_select_onecol(
                table=table,
                keyvalues={
                    "room_id": self.room_id,
                    "receipt_type": "m.read",
                    "user_id": self.user_id,
                    # `simple_select_onecol` does not support NULL filters,
                    # so skip the filter on `thread_id`.
                },
                retcol="room_id",
                desc="get_receipt",
            )
        )
        self.assertEqual(0, len(res))

        # Check that the background task did not delete the unique receipts.
        res = self.get_success(
            self.store.db_pool.simple_select_onecol(
                table=table,
                keyvalues={
                    "room_id": self.other_room_id,
                    "receipt_type": "m.read",
                    "user_id": self.user_id,
                    # `simple_select_onecol` does not support NULL filters,
                    # so skip the filter on `thread_id`.
                },
                retcol="room_id",
                desc="get_receipt",
            )
        )
        self.assertEqual(1, len(res))

    def test_background_receipts_linearized_unique_index(self) -> None:
        """Test that the background update to uniqueify non-thread receipts in
        `receipts_linearized` works properly.
        """
        self._test_background_receipts_unique_index(
            "receipts_linearized_unique_index",
            "receipts_linearized_unique_index",
            "receipts_linearized",
            {
                "stream_id": 5,
                "event_id": "$some_event",
            },
        )

    def test_background_receipts_graph_unique_index(self) -> None:
        """Test that the background update to uniqueify non-thread receipts in
        `receipts_graph` works properly.
        """
        self._test_background_receipts_unique_index(
            "receipts_graph_unique_index",
            "receipts_graph_unique_index",
            "receipts_graph",
            {
                "event_ids": '["$some_event"]',
            },
        )