1 change: 1 addition & 0 deletions changelog.d/18926.bugfix
@@ -0,0 +1 @@
Fix a performance regression related to the experimental Delayed Events ([MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140)) feature.
106 changes: 96 additions & 10 deletions synapse/handlers/delayed_events.py
@@ -18,7 +18,7 @@
from twisted.internet.interfaces import IDelayedCall

from synapse.api.constants import EventTypes
from synapse.api.errors import ShadowBanError
from synapse.api.errors import ShadowBanError, SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
from synapse.logging.opentracing import set_tag
@@ -45,6 +45,7 @@
)
from synapse.util.events import generate_fake_event_id
from synapse.util.metrics import Measure
from synapse.util.sentinel import Sentinel

if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -146,10 +147,37 @@ async def process() -> None:
)

async def _unsafe_process_new_event(self) -> None:
# We purposefully fetch the current max room stream ordering before
# doing anything else, as it could increment during processing of state
# deltas. We want to avoid updating `delayed_events_stream_pos` past
# the stream ordering of the state deltas we've processed. Otherwise
# we'll leave gaps in our processing.
room_max_stream_ordering = self._store.get_room_max_stream_ordering()

# Check that there are actually any delayed events to process. If not, bail early.
delayed_events_count = await self._store.get_count_of_delayed_events()
if delayed_events_count == 0:
# There are no delayed events to process. Update the
# `delayed_events_stream_pos` to the latest `events` stream pos and
# exit early.
self._event_pos = room_max_stream_ordering

logger.debug(
"No delayed events to process. Updating `delayed_events_stream_pos` to max stream ordering (%s)",
room_max_stream_ordering,
)

await self._store.update_delayed_events_stream_pos(room_max_stream_ordering)

event_processing_positions.labels(
name="delayed_events", **{SERVER_NAME_LABEL: self.server_name}
).set(room_max_stream_ordering)

return

# If self._event_pos is None, it means we haven't fetched it from the DB yet
if self._event_pos is None:
self._event_pos = await self._store.get_delayed_events_stream_pos()
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos > room_max_stream_ordering:
# apparently, we've processed more events than exist in the database!
# this can happen if events are removed with history purge or similar.
@@ -167,7 +195,7 @@ async def _unsafe_process_new_event(self) -> None:
self._clock, name="delayed_events_delta", server_name=self.server_name
):
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
if self._event_pos >= room_max_stream_ordering:
return

logger.debug(
@@ -202,23 +230,81 @@ async def _handle_state_deltas(self, deltas: List[StateDelta]) -> None:
Process current state deltas to cancel other users' pending delayed events
that target the same state.
"""
# Get the senders of each delta's state event (as sender information is
# not currently stored in the `current_state_deltas` table).
event_id_and_sender_dict = await self._store.get_senders_for_event_ids(
[delta.event_id for delta in deltas if delta.event_id is not None]
)

# Note: No need to batch as `get_current_state_deltas` will only ever
# return 100 rows at a time.
for delta in deltas:
logger.debug(
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
)

# `delta.event_id` and `delta.sender` can be `None` in a few valid
# cases (see the docstring of
# `get_current_state_delta_membership_changes_for_user` for details).
if delta.event_id is None:
logger.debug(
"Not handling delta for deleted state: %r %r",
delta.event_type,
delta.state_key,
)
# TODO: Differentiate between this being caused by a state reset
# which removed a user from a room, or the homeserver
# purposefully having left the room. We can do so by checking
# whether there are any local memberships still left in the
# room. If so, then this is the result of a state reset.
#
# If it is a state reset, we should avoid cancelling new,
# delayed state events due to old state resurfacing. So we
# should skip and log a warning in this case.
#
# If the homeserver has left the room, then we should cancel all
# delayed state events intended for this room, as there is no
# need to try and send a delayed event into a room we've left.
logger.warning(
"Skipping state delta (%r, %r) without corresponding event ID. "
"This can happen if the homeserver has left the room (in which "
"case this can be ignored), or if there has been a state reset "
"which has caused the sender to be kicked out of the room",
delta.event_type,
delta.state_key,
)
continue

logger.debug(
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
)
sender_str = event_id_and_sender_dict.get(
delta.event_id, Sentinel.UNSET_SENTINEL
)
if sender_str is None:
# An event exists, but the `sender` field was "null" and Synapse
# incorrectly accepted the event. This is not expected.
logger.error(
"Skipping state delta with event ID '%s' as 'sender' was None. "
"This is unexpected - please report it as a bug!",
delta.event_id,
)
continue
if sender_str is Sentinel.UNSET_SENTINEL:
# We have an event ID, but the event was not found in the
# datastore. This can happen if a room, or its history, is
# purged. State deltas related to the room are left behind, but
# the event no longer exists.
#
# As we cannot get the sender of this event, we can't calculate
# whether to cancel delayed events related to this one. So we skip.
logger.debug(
"Skipping state delta with event ID '%s' - the room, or its history, may have been purged",
delta.event_id,
)
continue

event = await self._store.get_event(delta.event_id, allow_none=True)
if not event:
continue
sender = UserID.from_string(event.sender)

try:
sender = UserID.from_string(sender_str)
except SynapseError as e:
logger.error(
"Skipping state delta with Matrix User ID '%s' that failed to parse: %s",
sender_str,
e,
)
continue

next_send_ts = await self._store.cancel_delayed_state_events(
room_id=delta.room_id,
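The early-return path added above is the core of the performance fix: when no delayed events are pending, the handler skips fetching and iterating state deltas entirely, while still advancing its stream position so the skipped range is never re-scanned. A condensed sketch of that control flow, as a minimal reading of the diff above rather than the full handler (metrics and logging elided):

    async def _unsafe_process_new_event(self) -> None:
        # Snapshot the max stream ordering *before* doing anything else, so we
        # never advance `delayed_events_stream_pos` past deltas we haven't
        # actually processed.
        room_max_stream_ordering = self._store.get_room_max_stream_ordering()

        if await self._store.get_count_of_delayed_events() == 0:
            # Nothing to cancel: fast-forward the stream position and return
            # without touching `current_state_deltas` at all.
            self._event_pos = room_max_stream_ordering
            await self._store.update_delayed_events_stream_pos(room_max_stream_ordering)
            return

        # ...otherwise fall through to the usual state-delta processing loop...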
2 changes: 1 addition & 1 deletion synapse/handlers/presence.py
@@ -1548,7 +1548,7 @@ async def _unsafe_process(self) -> None:
self.clock, name="presence_delta", server_name=self.server_name
):
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
if self._event_pos >= room_max_stream_ordering:
return

logger.debug(
8 changes: 1 addition & 7 deletions synapse/handlers/sliding_sync/room_lists.py
@@ -13,7 +13,6 @@
#


import enum
import logging
from itertools import chain
from typing import (
@@ -75,6 +74,7 @@
)
from synapse.types.state import StateFilter
from synapse.util import MutableOverlayMapping
from synapse.util.sentinel import Sentinel

if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -83,12 +83,6 @@
logger = logging.getLogger(__name__)


class Sentinel(enum.Enum):
# defining a sentinel in this way allows mypy to correctly handle the
# type of a dictionary lookup and subsequent type narrowing.
UNSET_SENTINEL = object()


# Helper definition for the types that we might return. We do this to avoid
# copying data between types (which can be expensive for many rooms).
RoomsForUserType = Union[RoomsForUserStateReset, RoomsForUser, RoomsForUserSlidingSync]
2 changes: 2 additions & 0 deletions synapse/storage/controllers/state.py
@@ -682,6 +682,8 @@ async def get_current_state_deltas(
- the stream id which these results go up to
- list of current_state_delta_stream rows. If it is empty, we are
up to date.

A maximum of 100 rows will be returned.
"""
# FIXME(faster_joins): what do we do here?
# https://github.com/matrix-org/synapse/issues/13008
15 changes: 15 additions & 0 deletions synapse/storage/databases/main/delayed_events.py
@@ -182,6 +182,21 @@ def restart_delayed_event_txn(
"restart_delayed_event", restart_delayed_event_txn
)

async def get_count_of_delayed_events(self) -> int:
"""Returns the number of pending delayed events in the DB."""

def _get_count_of_delayed_events(txn: LoggingTransaction) -> int:
sql = "SELECT count(*) FROM delayed_events"

txn.execute(sql)
resp = txn.fetchone()
return resp[0] if resp is not None else 0

return await self.db_pool.runInteraction(
"get_count_of_delayed_events",
_get_count_of_delayed_events,
)

async def get_all_delayed_events_for_user(
self,
user_localpart: str,
33 changes: 33 additions & 0 deletions synapse/storage/databases/main/events_worker.py
@@ -2135,6 +2135,39 @@ def get_deltas_for_stream_id_txn(

return rows, to_token, True

async def get_senders_for_event_ids(
self, event_ids: Collection[str]
) -> Dict[str, Optional[str]]:
"""
Given a collection of event IDs, return the sender associated with each.

Args:
event_ids: A collection of event IDs as strings.

Returns:
A dict of event ID -> sender of the event.

If a given event ID does not exist in the `events` table, then no entry
for that event ID will be returned.
"""

def _get_senders_for_event_ids(
txn: LoggingTransaction,
) -> Dict[str, Optional[str]]:
rows = self.db_pool.simple_select_many_txn(
txn=txn,
table="events",
column="event_id",
iterable=event_ids,
keyvalues={},
retcols=["event_id", "sender"],
)
return dict(rows)

return await self.db_pool.runInteraction(
"get_senders_for_event_ids", _get_senders_for_event_ids
)

@cached(max_entries=5000)
async def get_event_ordering(self, event_id: str, room_id: str) -> Tuple[int, int]:
res = await self.db_pool.simple_select_one(
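Since missing rows are omitted from the returned dict, callers must distinguish "no row for this event ID" from "row present but `sender` is null". A short usage sketch of the resulting three-way handling (the store variable and event IDs are illustrative; the `delayed_events.py` handler above follows this exact shape):

    senders = await store.get_senders_for_event_ids(["$known", "$purged"])
    sender = senders.get("$known", Sentinel.UNSET_SENTINEL)
    if sender is Sentinel.UNSET_SENTINEL:
        ...  # no `events` row: the event, or its room, may have been purged
    elif sender is None:
        ...  # row exists but `sender` was null: unexpected, log and skip
    else:
        ...  # normal case: `sender` is the event sender's Matrix user ID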
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/state_deltas.py
@@ -94,6 +94,8 @@ async def get_partial_current_state_deltas(
- the stream id which these results go up to
- list of current_state_delta_stream rows. If it is empty, we are
up to date.

A maximum of 100 rows will be returned.
"""
prev_stream_id = int(prev_stream_id)

21 changes: 21 additions & 0 deletions synapse/util/sentinel.py
@@ -0,0 +1,21 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#

import enum


class Sentinel(enum.Enum):
# defining a sentinel in this way allows mypy to correctly handle the
# type of a dictionary lookup and subsequent type narrowing.
UNSET_SENTINEL = object()
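A minimal illustration of the narrowing the comment describes (the dict and event ID are made up for the example). Because `UNSET_SENTINEL` is an enum member rather than a bare `object()`, mypy can narrow the union produced by a `dict.get` lookup with identity checks:

    from typing import Dict, Optional

    senders: Dict[str, Optional[str]] = {"$event_id": None}

    # Inferred as Union[str, None, Sentinel]
    sender = senders.get("$event_id", Sentinel.UNSET_SENTINEL)
    if sender is Sentinel.UNSET_SENTINEL:
        ...  # narrowed to Sentinel: no entry for this key
    elif sender is None:
        ...  # narrowed to None: entry present, value null
    else:
        ...  # narrowed to str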
73 changes: 72 additions & 1 deletion tests/storage/test_events.py
@@ -20,7 +20,7 @@
#

import logging
from typing import List, Optional
from typing import Dict, List, Optional

from twisted.internet.testing import MemoryReactor

@@ -39,6 +39,77 @@
logger = logging.getLogger(__name__)


class EventsTestCase(HomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]

def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
self._store = self.hs.get_datastores().main

def test_get_senders_for_event_ids(self) -> None:
"""Tests the `get_senders_for_event_ids` storage function."""

users_and_tokens: Dict[str, str] = {}
for localpart_suffix in range(10):
localpart = f"user_{localpart_suffix}"
user_id = self.register_user(localpart, "rabbit")
token = self.login(localpart, "rabbit")

users_and_tokens[user_id] = token

room_creator_user_id = self.register_user("room_creator", "rabbit")
room_creator_token = self.login("room_creator", "rabbit")
users_and_tokens[room_creator_user_id] = room_creator_token

# Create a room and invite some users.
room_id = self.helper.create_room_as(
room_creator_user_id, tok=room_creator_token
)
event_ids_to_senders: Dict[str, str] = {}
for user_id, token in users_and_tokens.items():
if user_id == room_creator_user_id:
continue

self.helper.invite(
room=room_id,
targ=user_id,
tok=room_creator_token,
)

# Have the user accept the invite and join the room.
self.helper.join(
room=room_id,
user=user_id,
tok=token,
)

# Have the user send an event.
response = self.helper.send_event(
room_id=room_id,
type="m.room.message",
content={
"msgtype": "m.text",
"body": f"hello, I'm {user_id}!",
},
tok=token,
)

# Record the event ID and sender.
event_id = response["event_id"]
event_ids_to_senders[event_id] = user_id

# Check that `get_senders_for_event_ids` returns the correct data.
response = self.get_success(
self._store.get_senders_for_event_ids(list(event_ids_to_senders.keys()))
)
self.assert_dict(event_ids_to_senders, response)


class ExtremPruneTestCase(HomeserverTestCase):
servlets = [
admin.register_servlets,