Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix chain cover background update to work with split out event persisters #9115

Merged
merged 2 commits into from
Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9115.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve efficiency of large state resolutions.
11 changes: 11 additions & 0 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.types import Connection, Cursor
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import Collection

# python 3 does not have a maximum int value
Expand Down Expand Up @@ -412,6 +413,16 @@ def __init__(
self._check_safe_to_upsert,
)

# We define this sequence here so that it can be referenced from both
# the DataStore and PersistEventStore.
Comment on lines +416 to +417
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels awfully dirty to put this here. Should we have a PersistEventWorkerStore that contains just this?

Copy link
Member Author

@erikjohnston erikjohnston Jan 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is dirty. I don't think having a shared base class works, as the PersistEventStore isn't part of the mixin, and so you'd end up with two instances of the sequence (which is fine for Postgres, but not fine for SQLite, which tracks updates in memory).

def get_chain_id_txn(txn):
txn.execute("SELECT COALESCE(max(chain_id), 0) FROM event_auth_chains")
return txn.fetchone()[0]

self.event_chain_id_gen = build_sequence_generator(
engine, get_chain_id_txn, "event_auth_chain_id"
)

def is_running(self) -> bool:
"""Is the database pool currently running
"""
Expand Down
36 changes: 13 additions & 23 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.search import SearchEntry
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import StateMap, get_domain_from_id
from synapse.util import json_encoder
from synapse.util.iterutils import batch_iter, sorted_topologically
Expand Down Expand Up @@ -100,14 +99,6 @@ def __init__(
self._clock = hs.get_clock()
self._instance_name = hs.get_instance_name()

def get_chain_id_txn(txn):
txn.execute("SELECT COALESCE(max(chain_id), 0) FROM event_auth_chains")
return txn.fetchone()[0]

self._event_chain_id_gen = build_sequence_generator(
db.engine, get_chain_id_txn, "event_auth_chain_id"
)

self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
self.is_mine_id = hs.is_mine_id

Expand Down Expand Up @@ -479,12 +470,13 @@ def _persist_event_auth_chain_txn(
event_to_room_id = {e.event_id: e.room_id for e in state_events.values()}

self._add_chain_cover_index(
txn, event_to_room_id, event_to_types, event_to_auth_chain
txn, self.db_pool, event_to_room_id, event_to_types, event_to_auth_chain,
)

@staticmethod
def _add_chain_cover_index(
self,
txn,
db_pool: DatabasePool,
event_to_room_id: Dict[str, str],
event_to_types: Dict[str, Tuple[str, str]],
event_to_auth_chain: Dict[str, List[str]],
Expand All @@ -507,7 +499,7 @@ def _add_chain_cover_index(
# We check if there are any events that need to be handled in the rooms
# we're looking at. These should just be out of band memberships, where
# we didn't have the auth chain when we first persisted.
rows = self.db_pool.simple_select_many_txn(
rows = db_pool.simple_select_many_txn(
txn,
table="event_auth_chain_to_calculate",
keyvalues={},
Expand All @@ -523,7 +515,7 @@ def _add_chain_cover_index(
# (We could pull out the auth events for all rows at once using
# simple_select_many, but this case happens rarely and almost always
# with a single row.)
auth_events = self.db_pool.simple_select_onecol_txn(
auth_events = db_pool.simple_select_onecol_txn(
txn, "event_auth", keyvalues={"event_id": event_id}, retcol="auth_id",
)

Expand Down Expand Up @@ -572,9 +564,7 @@ def _add_chain_cover_index(

events_to_calc_chain_id_for.add(auth_id)

event_to_auth_chain[
auth_id
] = self.db_pool.simple_select_onecol_txn(
event_to_auth_chain[auth_id] = db_pool.simple_select_onecol_txn(
txn,
"event_auth",
keyvalues={"event_id": auth_id},
Expand Down Expand Up @@ -606,7 +596,7 @@ def _add_chain_cover_index(
room_id = event_to_room_id.get(event_id)
if room_id:
e_type, state_key = event_to_types[event_id]
self.db_pool.simple_insert_txn(
db_pool.simple_insert_txn(
txn,
table="event_auth_chain_to_calculate",
values={
Expand Down Expand Up @@ -651,7 +641,7 @@ def _add_chain_cover_index(
proposed_new_id = existing_chain_id[0]
proposed_new_seq = existing_chain_id[1] + 1
if (proposed_new_id, proposed_new_seq) not in chains_tuples_allocated:
already_allocated = self.db_pool.simple_select_one_onecol_txn(
already_allocated = db_pool.simple_select_one_onecol_txn(
txn,
table="event_auth_chains",
keyvalues={
Expand All @@ -672,14 +662,14 @@ def _add_chain_cover_index(
)

if not new_chain_tuple:
new_chain_tuple = (self._event_chain_id_gen.get_next_id_txn(txn), 1)
new_chain_tuple = (db_pool.event_chain_id_gen.get_next_id_txn(txn), 1)

chains_tuples_allocated.add(new_chain_tuple)

chain_map[event_id] = new_chain_tuple
new_chain_tuples[event_id] = new_chain_tuple

self.db_pool.simple_insert_many_txn(
db_pool.simple_insert_many_txn(
txn,
table="event_auth_chains",
values=[
Expand All @@ -688,7 +678,7 @@ def _add_chain_cover_index(
],
)

self.db_pool.simple_delete_many_txn(
db_pool.simple_delete_many_txn(
txn,
table="event_auth_chain_to_calculate",
keyvalues={},
Expand Down Expand Up @@ -721,7 +711,7 @@ def _add_chain_cover_index(
# Step 1, fetch all existing links from all the chains we've seen
# referenced.
chain_links = _LinkMap()
rows = self.db_pool.simple_select_many_txn(
rows = db_pool.simple_select_many_txn(
txn,
table="event_auth_chain_links",
column="origin_chain_id",
Expand Down Expand Up @@ -785,7 +775,7 @@ def _add_chain_cover_index(
(chain_id, sequence_number), (target_id, target_seq)
)

self.db_pool.simple_insert_many_txn(
db_pool.simple_insert_many_txn(
txn,
table="event_auth_chain_links",
values=[
Expand Down
9 changes: 7 additions & 2 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from synapse.events import make_event_from_dict
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, make_tuple_comparison_clause
from synapse.storage.databases.main.events import PersistEventsStore
from synapse.storage.types import Cursor
from synapse.types import JsonDict

Expand Down Expand Up @@ -833,8 +834,12 @@ def _calculate_auth_chain(
#
# Annoyingly we need to gut wrench into the persist event store so that
# we can reuse the function to calculate the chain cover for rooms.
self.hs.get_datastores().persist_events._add_chain_cover_index(
txn, event_to_room_id, event_to_types, event_to_auth_chain,
PersistEventsStore._add_chain_cover_index(
txn,
self.db_pool,
event_to_room_id,
event_to_types,
event_to_auth_chain,
)

return new_last_depth, new_last_stream, count
Expand Down
10 changes: 6 additions & 4 deletions synapse/storage/util/sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@
import abc
import logging
import threading
from typing import Callable, List, Optional
from typing import TYPE_CHECKING, Callable, List, Optional

from synapse.storage.database import LoggingDatabaseConnection
from synapse.storage.engines import (
BaseDatabaseEngine,
IncorrectDatabaseSetup,
PostgresEngine,
)
from synapse.storage.types import Connection, Cursor

if TYPE_CHECKING:
from synapse.storage.database import LoggingDatabaseConnection

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -55,7 +57,7 @@ def get_next_id_txn(self, txn: Cursor) -> int:
@abc.abstractmethod
def check_consistency(
self,
db_conn: LoggingDatabaseConnection,
db_conn: "LoggingDatabaseConnection",
table: str,
id_column: str,
positive: bool = True,
Expand Down Expand Up @@ -88,7 +90,7 @@ def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]:

def check_consistency(
self,
db_conn: LoggingDatabaseConnection,
db_conn: "LoggingDatabaseConnection",
table: str,
id_column: str,
positive: bool = True,
Expand Down