Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18926.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a performance regression related to the experimental Delayed Events ([MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140)) feature.
58 changes: 53 additions & 5 deletions synapse/handlers/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from twisted.internet.interfaces import IDelayedCall

from synapse.api.constants import EventTypes
from synapse.api.errors import ShadowBanError
from synapse.api.errors import ShadowBanError, SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
from synapse.logging.opentracing import set_tag
Expand Down Expand Up @@ -146,10 +146,37 @@ async def process() -> None:
)

async def _unsafe_process_new_event(self) -> None:
# We purposefully fetch the current max room stream ordering before
# doing anything else, as it could increment during processing of state
# deltas. We want to avoid updating `delayed_events_stream_pos` past
# the stream ordering of the state deltas we've processed. Otherwise
# we'll leave gaps in our processing.
room_max_stream_ordering = self._store.get_room_max_stream_ordering()

# Check that there are actually any delayed events to process. If not, bail early.
delayed_events_count = await self._store.get_count_of_delayed_events()
if delayed_events_count == 0:
# There are no delayed events to process. Update the
# `delayed_events_stream_pos` to the latest `events` stream pos and
# exit early.
self._event_pos = room_max_stream_ordering

logger.debug(
"No delayed events to process. Updating `delayed_events_stream_pos` to max stream ordering (%s)",
room_max_stream_ordering,
)

await self._store.update_delayed_events_stream_pos(room_max_stream_ordering)

event_processing_positions.labels(
name="delayed_events", **{SERVER_NAME_LABEL: self.server_name}
).set(room_max_stream_ordering)

return

# If self._event_pos is None then it means we haven't fetched it from the DB yet
if self._event_pos is None:
self._event_pos = await self._store.get_delayed_events_stream_pos()
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos > room_max_stream_ordering:
# apparently, we've processed more events than exist in the database!
# this can happen if events are removed with history purge or similar.
Expand Down Expand Up @@ -202,6 +229,14 @@ async def _handle_state_deltas(self, deltas: List[StateDelta]) -> None:
Process current state deltas to cancel other users' pending delayed events
that target the same state.
"""
# Get the senders of each delta's state event (as sender information is
# not currently stored in the `current_state_deltas` table).
event_id_and_sender_dict = await self._store.get_senders_for_event_ids(
[delta.event_id for delta in deltas if delta.event_id is not None]
)

# Note: No need to batch as `get_current_state_deltas` will only ever
# return 100 rows at a time.
for delta in deltas:
if delta.event_id is None:
logger.debug(
Expand All @@ -215,10 +250,23 @@ async def _handle_state_deltas(self, deltas: List[StateDelta]) -> None:
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
)

event = await self._store.get_event(delta.event_id, allow_none=True)
if not event:
sender_str = event_id_and_sender_dict.get(delta.event_id, None)
if sender_str is None:
logger.debug(
"Not handling delta for event where sender is unknown: %s",
delta.event_id,
)
continue

try:
sender = UserID.from_string(sender_str)
except SynapseError as e:
logger.error(
"Skipping state delta with Matrix User ID '%s' that failed to parse: %s",
sender_str,
e,
)
continue
sender = UserID.from_string(event.sender)

next_send_ts = await self._store.cancel_delayed_state_events(
room_id=delta.room_id,
Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/controllers/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ async def get_current_state_deltas(
- the stream id which these results go up to
- list of current_state_delta_stream rows. If it is empty, we are
up to date.

A maximum of 100 rows will be returned.
"""
# FIXME(faster_joins): what do we do here?
# https://github.com/matrix-org/synapse/issues/13008
Expand Down
15 changes: 15 additions & 0 deletions synapse/storage/databases/main/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,21 @@ def restart_delayed_event_txn(
"restart_delayed_event", restart_delayed_event_txn
)

async def get_count_of_delayed_events(self) -> int:
    """Count how many delayed events are currently pending.

    Returns:
        The total number of rows in the `delayed_events` table, or 0 if
        the count could not be fetched.
    """

    def count_delayed_events_txn(txn: LoggingTransaction) -> int:
        txn.execute("SELECT count(*) FROM delayed_events")
        row = txn.fetchone()
        if row is None:
            return 0
        return row[0]

    return await self.db_pool.runInteraction(
        "get_count_of_delayed_events",
        count_delayed_events_txn,
    )

async def get_all_delayed_events_for_user(
self,
user_localpart: str,
Expand Down
28 changes: 28 additions & 0 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2135,6 +2135,34 @@ def get_deltas_for_stream_id_txn(

return rows, to_token, True

async def get_senders_for_event_ids(
    self, event_ids: Collection[str]
) -> Dict[str, str]:
    """Look up the sender of each of the given events.

    Args:
        event_ids: A collection of event IDs as strings.

    Returns:
        A dict of event ID -> sender of the event. Event IDs not found
        in the `events` table are omitted from the result.
    """

    def get_senders_txn(txn: LoggingTransaction) -> Dict[str, str]:
        results = self.db_pool.simple_select_many_txn(
            txn=txn,
            table="events",
            column="event_id",
            iterable=event_ids,
            keyvalues={},
            retcols=["event_id", "sender"],
        )
        return {event_id: sender for event_id, sender in results}

    return await self.db_pool.runInteraction(
        "get_senders_for_event_ids", get_senders_txn
    )

@cached(max_entries=5000)
async def get_event_ordering(self, event_id: str, room_id: str) -> Tuple[int, int]:
res = await self.db_pool.simple_select_one(
Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/state_deltas.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ async def get_partial_current_state_deltas(
- the stream id which these results go up to
- list of current_state_delta_stream rows. If it is empty, we are
up to date.

A maximum of 100 rows will be returned.
"""
prev_stream_id = int(prev_stream_id)

Expand Down
Loading