Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add type hints to schema deltas #15497

Merged
merged 12 commits into from
Apr 27, 2023
1 change: 1 addition & 0 deletions changelog.d/15496.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve type hints.
1 change: 1 addition & 0 deletions changelog.d/15497.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve type hints.
13 changes: 0 additions & 13 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,7 @@ files =
tests/,
build_rust.py

# Note: Better exclusion syntax coming in mypy > 0.910
# https://github.com/python/mypy/pull/11329
#
# For now, set the (?x) flag enable "verbose" regexes
# https://docs.python.org/3/library/re.html#re.X
exclude = (?x)
^(
|synapse/storage/databases/__init__.py
|synapse/storage/databases/main/cache.py
|synapse/storage/schema/
)$

[mypy-synapse.metrics._reactor_metrics]
disallow_untyped_defs = False
# This module imports select.epoll. That exists on Linux, but doesn't on macOS.
# See https://github.com/matrix-org/synapse/pull/11771.
warn_unused_ignores = False
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def __init__(self, main_store_class: Type[DataStoreT], hs: "HomeServer"):
# If we're on a process that can persist events also
# instantiate a `PersistEventsStore`
if hs.get_instance_name() in hs.config.worker.writers.events:
persist_events = PersistEventsStore(hs, database, main, db_conn)
persist_events = PersistEventsStore(hs, database, main, db_conn) # type: ignore[arg-type]

if "state" in database_config.databases:
logger.info(
Expand Down Expand Up @@ -133,6 +133,6 @@ def __init__(self, main_store_class: Type[DataStoreT], hs: "HomeServer"):

# We use local variables here to ensure that the databases do not have
# optional types.
self.main = main
self.main = main # type: ignore[assignment]
self.state = state
self.persist_events = persist_events
16 changes: 9 additions & 7 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,13 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
)
elif row.type == EventsStreamCurrentStateRow.TypeId:
assert isinstance(data, EventsStreamCurrentStateRow)
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token)
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]

if data.type == EventTypes.Member:
self.get_rooms_for_user_with_stream_ordering.invalidate(
self.get_rooms_for_user_with_stream_ordering.invalidate( # type: ignore[attr-defined]
(data.state_key,)
)
self.get_rooms_for_user.invalidate((data.state_key,))
self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined]
else:
raise Exception("Unknown events stream row type %s" % (row.type,))

Expand All @@ -229,7 +229,7 @@ def _invalidate_caches_for_event(
# This invalidates any local in-memory cached event objects, the original
# process triggering the invalidation is responsible for clearing any external
# cached objects.
self._invalidate_local_get_event_cache(event_id)
self._invalidate_local_get_event_cache(event_id) # type: ignore[attr-defined]

self._attempt_to_invalidate_cache("have_seen_event", (room_id, event_id))
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
Expand All @@ -242,10 +242,10 @@ def _invalidate_caches_for_event(
self._attempt_to_invalidate_cache("_get_membership_from_event_id", (event_id,))

if not backfilled:
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
self._events_stream_cache.entity_has_changed(room_id, stream_ordering) # type: ignore[attr-defined]

if redacts:
self._invalidate_local_get_event_cache(redacts)
self._invalidate_local_get_event_cache(redacts) # type: ignore[attr-defined]
# Caches which might leak edits must be invalidated for the event being
# redacted.
self._attempt_to_invalidate_cache("get_relations_for_event", (redacts,))
Expand All @@ -254,7 +254,7 @@ def _invalidate_caches_for_event(
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", (redacts,))

if etype == EventTypes.Member:
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering) # type: ignore[attr-defined]
self._attempt_to_invalidate_cache(
"get_invited_rooms_for_local_user", (state_key,)
)
Expand Down Expand Up @@ -378,6 +378,8 @@ def _send_invalidation_to_replication(
)

if isinstance(self.database_engine, PostgresEngine):
assert self._cache_id_gen is not None

# get_next() returns a context manager which is designed to wrap
# the transaction. However, we want to only get an ID when we want
# to use it, here, so we need to call __enter__ manually, and have
Expand Down
8 changes: 5 additions & 3 deletions synapse/storage/prepare_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from typing_extensions import Counter as CounterType

from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import LoggingDatabaseConnection
from synapse.storage.database import LoggingDatabaseConnection, LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION
from synapse.storage.types import Cursor
Expand Down Expand Up @@ -168,7 +168,9 @@ def prepare_database(


def _setup_new_database(
cur: Cursor, database_engine: BaseDatabaseEngine, databases: Collection[str]
cur: LoggingTransaction,
database_engine: BaseDatabaseEngine,
databases: Collection[str],
) -> None:
"""Sets up the physical database by finding a base set of "full schemas" and
then applying any necessary deltas, including schemas from the given data
Expand Down Expand Up @@ -289,7 +291,7 @@ def _setup_new_database(


def _upgrade_existing_database(
cur: Cursor,
cur: LoggingTransaction,
current_schema_state: _SchemaState,
database_engine: BaseDatabaseEngine,
config: Optional[HomeServerConfig],
Expand Down
13 changes: 6 additions & 7 deletions synapse/storage/schema/main/delta/20/pushers.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@

import logging

from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine

logger = logging.getLogger(__name__)


def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
logger.info("Porting pushers table...")
cur.execute(
"""
Expand Down Expand Up @@ -61,8 +64,8 @@ def run_create(cur, database_engine, *args, **kwargs):
"""
)
count = 0
for row in cur.fetchall():
row = list(row)
for tuple_row in cur.fetchall():
row = list(tuple_row)
row[8] = bytes(row[8]).decode("utf-8")
row[11] = bytes(row[11]).decode("utf-8")
cur.execute(
Expand All @@ -81,7 +84,3 @@ def run_create(cur, database_engine, *args, **kwargs):
cur.execute("DROP TABLE pushers")
cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
logger.info("Moved %d pushers to new table", count)


def run_upgrade(*args, **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is safe to remove, because we do a hasattr check:

if not is_empty and hasattr(module, "run_upgrade"):

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should have called that out, sorry. 👍

pass
9 changes: 3 additions & 6 deletions synapse/storage/schema/main/delta/25/fts.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import json
import logging

from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.prepare_database import get_statements

logger = logging.getLogger(__name__)
Expand All @@ -41,7 +42,7 @@
)


def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
if isinstance(database_engine, PostgresEngine):
for statement in get_statements(POSTGRES_TABLE.splitlines()):
cur.execute(statement)
Expand Down Expand Up @@ -72,7 +73,3 @@ def run_create(cur, database_engine, *args, **kwargs):
)

cur.execute(sql, ("event_search", progress_json))


def run_upgrade(*args, **kwargs):
pass
8 changes: 3 additions & 5 deletions synapse/storage/schema/main/delta/27/ts.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import json
import logging

from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.prepare_database import get_statements

logger = logging.getLogger(__name__)
Expand All @@ -25,7 +27,7 @@
)


def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
for statement in get_statements(ALTER_TABLE.splitlines()):
cur.execute(statement)

Expand All @@ -51,7 +53,3 @@ def run_create(cur, database_engine, *args, **kwargs):
)

cur.execute(sql, ("event_origin_server_ts", progress_json))


def run_upgrade(*args, **kwargs):
pass
16 changes: 12 additions & 4 deletions synapse/storage/schema/main/delta/30/as_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Dict, Iterable, List, Tuple, cast

from synapse.config.appservice import load_appservices
from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine

logger = logging.getLogger(__name__)


def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
# NULL indicates user was not registered by an appservice.
try:
cur.execute("ALTER TABLE users ADD COLUMN appservice_id TEXT")
Expand All @@ -27,9 +31,13 @@ def run_create(cur, database_engine, *args, **kwargs):
pass


def run_upgrade(cur, database_engine, config, *args, **kwargs):
def run_upgrade(
cur: LoggingTransaction,
database_engine: BaseDatabaseEngine,
config: HomeServerConfig,
) -> None:
cur.execute("SELECT name FROM users")
rows = cur.fetchall()
rows = cast(Iterable[Tuple[str]], cur.fetchall())

config_files = []
try:
Expand All @@ -39,7 +47,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):

appservices = load_appservices(config.server.server_name, config_files)

owned = {}
owned: Dict[str, List[str]] = {}

for row in rows:
user_id = row[0]
Expand Down
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of the way mypy assigns python files to modules (it essentially searches the directory tree until it finds a __init__.py) we need all Python schema deltas to be unique.

This was the only conflict; I played with some of the options for changing this behavior (namespace packages, etc.) but it seemed easiest to just rename the single offender.

This is from a really really old schema so I think renaming it won't break anything, but I'm not 100% confident on that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only breakage I can see: someone who is still on schema 31 might end up trying to apply the renamed migration after it was already run, if they upgraded directly to a Synapse version with this change.

This looks to be around 7 years old now. I think we should be blasé about breaking setups which are that old and suffer from known security problems.

Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@

import logging

from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine

logger = logging.getLogger(__name__)


def token_to_stream_ordering(token):
def token_to_stream_ordering(token: str) -> int:
return int(token[1:].split("_")[0])


def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
logger.info("Porting pushers table, delta 31...")
cur.execute(
"""
Expand Down Expand Up @@ -61,8 +64,8 @@ def run_create(cur, database_engine, *args, **kwargs):
"""
)
count = 0
for row in cur.fetchall():
row = list(row)
for tuple_row in cur.fetchall():
row = list(tuple_row)
row[12] = token_to_stream_ordering(row[12])
cur.execute(
"""
Expand All @@ -80,7 +83,3 @@ def run_create(cur, database_engine, *args, **kwargs):
cur.execute("DROP TABLE pushers")
cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
logger.info("Moved %d pushers to new table", count)


def run_upgrade(cur, database_engine, *args, **kwargs):
pass
9 changes: 3 additions & 6 deletions synapse/storage/schema/main/delta/31/search_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import json
import logging

from synapse.storage.engines import PostgresEngine
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.storage.prepare_database import get_statements

logger = logging.getLogger(__name__)
Expand All @@ -26,7 +27,7 @@
"""


def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
if not isinstance(database_engine, PostgresEngine):
return

Expand Down Expand Up @@ -56,7 +57,3 @@ def run_create(cur, database_engine, *args, **kwargs):
)

cur.execute(sql, ("event_search_order", progress_json))


def run_upgrade(cur, database_engine, *args, **kwargs):
pass
8 changes: 3 additions & 5 deletions synapse/storage/schema/main/delta/33/event_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import json
import logging

from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.prepare_database import get_statements

logger = logging.getLogger(__name__)
Expand All @@ -25,7 +27,7 @@
"""


def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
for statement in get_statements(ALTER_TABLE.splitlines()):
cur.execute(statement)

Expand All @@ -51,7 +53,3 @@ def run_create(cur, database_engine, *args, **kwargs):
)

cur.execute(sql, ("event_fields_sender_url", progress_json))


def run_upgrade(cur, database_engine, *args, **kwargs):
pass
12 changes: 10 additions & 2 deletions synapse/storage/schema/main/delta/33/remote_media_ts.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,22 @@

import time

from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine

ALTER_TABLE = "ALTER TABLE remote_media_cache ADD COLUMN last_access_ts BIGINT"


def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
cur.execute(ALTER_TABLE)


def run_upgrade(cur, database_engine, *args, **kwargs):
def run_upgrade(
cur: LoggingTransaction,
database_engine: BaseDatabaseEngine,
config: HomeServerConfig,
) -> None:
cur.execute(
"UPDATE remote_media_cache SET last_access_ts = ?",
(int(time.time() * 1000),),
Expand Down
Loading