Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reintroduce membership tables event stream ordering #15128

Merged
1 change: 1 addition & 0 deletions changelog.d/15128.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add denormalised event stream ordering column to membership state tables for future use. Contributed by Nick @ Beeper (@fizzadar).
23 changes: 17 additions & 6 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1127,11 +1127,15 @@ def _update_current_state_txn(
# been inserted into room_memberships.
txn.execute_batch(
"""INSERT INTO current_state_events
(room_id, type, state_key, event_id, membership)
VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
(room_id, type, state_key, event_id, membership, event_stream_ordering)
VALUES (
?, ?, ?, ?,
(SELECT membership FROM room_memberships WHERE event_id = ?),
(SELECT stream_ordering FROM events WHERE event_id = ?)
)
richvdh marked this conversation as resolved.
Show resolved Hide resolved
""",
[
(room_id, key[0], key[1], ev_id, ev_id)
(room_id, key[0], key[1], ev_id, ev_id, ev_id)
for key, ev_id in to_insert.items()
],
)
Expand All @@ -1158,11 +1162,15 @@ def _update_current_state_txn(
if to_insert:
txn.execute_batch(
"""INSERT INTO local_current_membership
(room_id, user_id, event_id, membership)
VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
(room_id, user_id, event_id, membership, event_stream_ordering)
VALUES (
?, ?, ?,
(SELECT membership FROM room_memberships WHERE event_id = ?),
(SELECT stream_ordering FROM events WHERE event_id = ?)
)
""",
[
(room_id, key[1], ev_id, ev_id)
(room_id, key[1], ev_id, ev_id, ev_id)
for key, ev_id in to_insert.items()
if key[0] == EventTypes.Member and self.is_mine_id(key[1])
],
Expand Down Expand Up @@ -1770,6 +1778,7 @@ def _store_room_members_txn(
table="room_memberships",
keys=(
"event_id",
"event_stream_ordering",
"user_id",
"sender",
"room_id",
Expand All @@ -1780,6 +1789,7 @@ def _store_room_members_txn(
values=[
(
event.event_id,
event.internal_metadata.stream_ordering,
event.state_key,
event.user_id,
event.room_id,
Expand Down Expand Up @@ -1812,6 +1822,7 @@ def _store_room_members_txn(
keyvalues={"room_id": event.room_id, "user_id": event.state_key},
values={
"event_id": event.event_id,
"event_stream_ordering": event.internal_metadata.stream_ordering,
"membership": event.membership,
},
)
Expand Down
6 changes: 4 additions & 2 deletions synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,14 +425,16 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
"partial_state_events",
"partial_state_rooms_servers",
"partial_state_rooms",
# Note: the _membership(s) tables have foreign keys to the `events` table
# so must be deleted first.
"local_current_membership",
"room_memberships",
"events",
"federation_inbound_events_staging",
"local_current_membership",
"receipts_graph",
"receipts_linearized",
"room_aliases",
"room_depth",
"room_memberships",
"room_stats_state",
"room_stats_current",
"room_stats_earliest_token",
Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@


SCHEMA_COMPAT_VERSION = (
# The threads_id column must exist for event_push_actions, event_push_summary,
# receipts_linearized, and receipts_graph.
73
# Queries against `event_stream_ordering` columns in membership tables must
# be disambiguated.
74
)
"""Limit on how far the synapse codebase can be rolled back without breaking db compat

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/* Copyright 2022 Beeper
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Each of these are denormalised copies of `stream_ordering` from the corresponding row in` events` which
-- we use to improve database performance by reduring JOINs.
ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2022 Beeper
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
This migration adds triggers to the room membership tables to enforce consistency.
Triggers cannot be expressed in .sql files, so we have to use a separate file.
"""
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.types import Cursor


def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
# Complain if the `event_stream_ordering` in membership tables doesn't match
# the `stream_ordering` row with the same `event_id` in `events`.
if isinstance(database_engine, Sqlite3Engine):
for table in (
"current_state_events",
"local_current_membership",
"room_memberships",
):
cur.execute(
f"""
CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering
BEFORE INSERT ON {table}
FOR EACH ROW
BEGIN
SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}')
WHERE EXISTS (
SELECT 1 FROM events
WHERE events.event_id = NEW.event_id
AND events.stream_ordering != NEW.event_stream_ordering
);
END;
"""
)
elif isinstance(database_engine, PostgresEngine):
cur.execute(
"""
CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$
BEGIN
IF EXISTS (
SELECT 1 FROM events
WHERE events.event_id = NEW.event_id
AND events.stream_ordering != NEW.event_stream_ordering
) THEN
RAISE EXCEPTION 'Incorrect event_stream_ordering';
END IF;
RETURN NEW;
END;
$BODY$ LANGUAGE plpgsql;
"""
)

for table in (
"current_state_events",
"local_current_membership",
"room_memberships",
):
cur.execute(
f"""
CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table}
FOR EACH ROW
EXECUTE PROCEDURE check_event_stream_ordering()
"""
)
else:
raise NotImplementedError("Unknown database engine")