Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18926.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a performance regression related to the experimental Delayed Events ([MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140)) feature.
73 changes: 67 additions & 6 deletions synapse/handlers/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from twisted.internet.interfaces import IDelayedCall

from synapse.api.constants import EventTypes
from synapse.api.errors import ShadowBanError
from synapse.api.errors import ShadowBanError, SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
from synapse.logging.opentracing import set_tag
Expand Down Expand Up @@ -146,10 +146,37 @@ async def process() -> None:
)

async def _unsafe_process_new_event(self) -> None:
# We purposefully fetch the current max room stream ordering before
# doing anything else, as it could increment during processing of state
# deltas. We want to avoid updating `delayed_events_stream_pos` past
# the stream ordering of the state deltas we've processed. Otherwise
# we'll leave gaps in our processing.
room_max_stream_ordering = self._store.get_room_max_stream_ordering()

# Check that there are actually any delayed events to process. If not, bail early.
delayed_events_count = await self._store.get_count_of_delayed_events()
if delayed_events_count == 0:
# There are no delayed events to process. Update the
# `delayed_events_stream_pos` to the latest `events` stream pos and
# exit early.
self._event_pos = room_max_stream_ordering

logger.debug(
"No delayed events to process. Updating `delayed_events_stream_pos` to max stream ordering (%s)",
room_max_stream_ordering,
)

await self._store.update_delayed_events_stream_pos(room_max_stream_ordering)

event_processing_positions.labels(
name="delayed_events", **{SERVER_NAME_LABEL: self.server_name}
).set(room_max_stream_ordering)

return

# If self._event_pos is None then it means we haven't fetched it from the DB yet
if self._event_pos is None:
self._event_pos = await self._store.get_delayed_events_stream_pos()
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos > room_max_stream_ordering:
# apparently, we've processed more events than exist in the database!
# this can happen if events are removed with history purge or similar.
Expand All @@ -167,7 +194,7 @@ async def _unsafe_process_new_event(self) -> None:
self._clock, name="delayed_events_delta", server_name=self.server_name
):
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
if self._event_pos >= room_max_stream_ordering:
return

logger.debug(
Expand Down Expand Up @@ -202,6 +229,16 @@ async def _handle_state_deltas(self, deltas: List[StateDelta]) -> None:
Process current state deltas to cancel other users' pending delayed events
that target the same state.
"""
# TODO: How to handle state deltas that are the result of a state reset?

# Get the senders of each delta's state event (as sender information is
# not currently stored in the `current_state_deltas` table).
event_id_and_sender_dict = await self._store.get_senders_for_event_ids(
[delta.event_id for delta in deltas if delta.event_id is not None]
)

# Note: No need to batch as `get_current_state_deltas` will only ever
# return 100 rows at a time.
for delta in deltas:
if delta.event_id is None:
logger.debug(
Expand All @@ -215,10 +252,34 @@ async def _handle_state_deltas(self, deltas: List[StateDelta]) -> None:
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
)

event = await self._store.get_event(delta.event_id, allow_none=True)
if not event:
sentinel = object()
sender_str = event_id_and_sender_dict.get(delta.event_id, sentinel)
if sender_str is None:
logger.error(
"Skipping state delta with event ID '%s' as 'sender' was unknown. This is unexpected - please report it as a bug!",
delta.event_id,
)
continue
if sender_str is sentinel:
# This can happen if a room is purged. State deltas related to
# the room are left behind, but the event/room no longer exist.
logger.warning(
"Skipping state delta with event ID '%s' as it is an unknown event - the room may have been purged",
delta.event_id,
)
continue
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was added after an exception was found when running an earlier draft of this PR on element.io.


try:
# Ignore the type error: if `sender_str` is an object, then it
# will have been caught by the `if` conditional just above.
sender = UserID.from_string(sender_str) # type: ignore[arg-type]
except SynapseError as e:
logger.error(
"Skipping state delta with Matrix User ID '%s' that failed to parse: %s",
sender_str,
e,
)
continue
sender = UserID.from_string(event.sender)

next_send_ts = await self._store.cancel_delayed_state_events(
room_id=delta.room_id,
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -1548,7 +1548,7 @@ async def _unsafe_process(self) -> None:
self.clock, name="presence_delta", server_name=self.server_name
):
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
if self._event_pos >= room_max_stream_ordering:
return

logger.debug(
Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/controllers/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ async def get_current_state_deltas(
- the stream id which these results go up to
- list of current_state_delta_stream rows. If it is empty, we are
up to date.

A maximum of 100 rows will be returned.
"""
# FIXME(faster_joins): what do we do here?
# https://github.com/matrix-org/synapse/issues/13008
Expand Down
15 changes: 15 additions & 0 deletions synapse/storage/databases/main/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,21 @@ def restart_delayed_event_txn(
"restart_delayed_event", restart_delayed_event_txn
)

async def get_count_of_delayed_events(self) -> int:
    """Return how many delayed events are currently pending in the database.

    Returns:
        The total row count of the `delayed_events` table.
    """

    def _count_txn(txn: LoggingTransaction) -> int:
        txn.execute("SELECT count(*) FROM delayed_events")
        row = txn.fetchone()
        # `count(*)` always yields a row, but guard against a missing one
        # anyway and treat it as zero.
        if row is None:
            return 0
        return row[0]

    return await self.db_pool.runInteraction(
        "get_count_of_delayed_events",
        _count_txn,
    )

async def get_all_delayed_events_for_user(
self,
user_localpart: str,
Expand Down
31 changes: 31 additions & 0 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2135,6 +2135,37 @@ def get_deltas_for_stream_id_txn(

return rows, to_token, True

async def get_senders_for_event_ids(
    self, event_ids: Collection[str]
) -> Dict[str, str]:
    """
    Look up the sender of each of the given events.

    Args:
        event_ids: The event IDs to look up.

    Returns:
        A mapping of event ID to that event's sender. Any event ID that
        has no row in the `events` table is simply omitted from the
        result.
    """

    def _get_senders_txn(txn: LoggingTransaction) -> Dict[str, str]:
        # Batch-select (event_id, sender) pairs for all requested IDs in
        # a single query, then build the mapping from the returned rows.
        pairs = self.db_pool.simple_select_many_txn(
            txn=txn,
            table="events",
            column="event_id",
            iterable=event_ids,
            keyvalues={},
            retcols=["event_id", "sender"],
        )
        return dict(pairs)

    return await self.db_pool.runInteraction(
        "get_senders_for_event_ids", _get_senders_txn
    )

@cached(max_entries=5000)
async def get_event_ordering(self, event_id: str, room_id: str) -> Tuple[int, int]:
res = await self.db_pool.simple_select_one(
Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/state_deltas.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ async def get_partial_current_state_deltas(
- the stream id which these results go up to
- list of current_state_delta_stream rows. If it is empty, we are
up to date.

A maximum of 100 rows will be returned.
"""
prev_stream_id = int(prev_stream_id)

Expand Down
73 changes: 72 additions & 1 deletion tests/storage/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#

import logging
from typing import List, Optional
from typing import Dict, List, Optional

from twisted.internet.testing import MemoryReactor

Expand All @@ -39,6 +39,77 @@
logger = logging.getLogger(__name__)


class EventsTestCase(HomeserverTestCase):
    servlets = [
        admin.register_servlets,
        room.register_servlets,
        login.register_servlets,
    ]

    def prepare(
        self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
    ) -> None:
        # Keep a handle on the main datastore so tests can call storage
        # functions directly.
        self._store = self.hs.get_datastores().main

    def test_get_senders_for_event_ids(self) -> None:
        """Tests the `get_senders_for_event_ids` storage function."""

        # Register a batch of users, remembering each user's access token.
        users_and_tokens: Dict[str, str] = {}
        for suffix in range(10):
            name = f"user_{suffix}"
            users_and_tokens[self.register_user(name, "rabbit")] = self.login(
                name, "rabbit"
            )

        # Register a separate user to create the room.
        room_creator_user_id = self.register_user("room_creator", "rabbit")
        room_creator_token = self.login("room_creator", "rabbit")
        users_and_tokens[room_creator_user_id] = room_creator_token

        # The creator makes a room; every other user is then invited,
        # joins, and sends one message of their own.
        room_id = self.helper.create_room_as(
            room_creator_user_id, tok=room_creator_token
        )
        event_ids_to_senders: Dict[str, str] = {}
        for user_id, token in users_and_tokens.items():
            if user_id == room_creator_user_id:
                continue

            self.helper.invite(
                room=room_id,
                targ=user_id,
                tok=room_creator_token,
            )

            self.helper.join(
                room=room_id,
                user=user_id,
                tok=token,
            )

            send_response = self.helper.send_event(
                room_id=room_id,
                type="m.room.message",
                content={
                    "msgtype": "m.text",
                    "body": f"hello, I'm {user_id}!",
                },
                tok=token,
            )

            # Track the expected event ID -> sender mapping.
            event_ids_to_senders[send_response["event_id"]] = user_id

        # The storage function should map every event back to its sender.
        fetched_senders = self.get_success(
            self._store.get_senders_for_event_ids(list(event_ids_to_senders.keys()))
        )
        self.assert_dict(event_ids_to_senders, fetched_senders)


class ExtremPruneTestCase(HomeserverTestCase):
servlets = [
admin.register_servlets,
Expand Down
Loading