Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18926.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix performance regression related to the delayed events feature.
51 changes: 44 additions & 7 deletions synapse/handlers/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,32 @@ async def process() -> None:
)

async def _unsafe_process_new_event(self) -> None:
room_max_stream_ordering = self._store.get_room_max_stream_ordering()

# Check that there are actually any delayed events to process. If not, bail early.
delayed_events_count = await self._store.get_count_of_delayed_events()
if delayed_events_count == 0:
# There are no delayed events to process. Update the
# `delayed_events_stream_pos` to the latest `events` stream pos and
# exit early.
self._event_pos = room_max_stream_ordering

logger.debug(
"No delayed events to process. Updating `delayed_events_stream_pos` to max stream ordering (%s)",
room_max_stream_ordering,
)

await self._store.update_delayed_events_stream_pos(room_max_stream_ordering)

event_processing_positions.labels(
name="delayed_events", **{SERVER_NAME_LABEL: self.server_name}
).set(room_max_stream_ordering)

return

# If self._event_pos is None then means we haven't fetched it from the DB yet
if self._event_pos is None:
self._event_pos = await self._store.get_delayed_events_stream_pos()
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos > room_max_stream_ordering:
# apparently, we've processed more events than exist in the database!
# this can happen if events are removed with history purge or similar.
Expand Down Expand Up @@ -202,6 +224,15 @@ async def _handle_state_deltas(self, deltas: List[StateDelta]) -> None:
Process current state deltas to cancel other users' pending delayed events
that target the same state.
"""
# Get the senders of each delta's state event (as sender information is
# not currently stored in the `current_state_deltas` table).
event_id_and_sender_dict = await self._store.get_senders_for_event_ids(
[delta.event_id for delta in deltas if delta.event_id is not None]
)

# Note: No need to batch as `get_current_state_deltas` will only ever
# return 100 rows at a time.
earliest_next_send_ts = None
for delta in deltas:
if delta.event_id is None:
logger.debug(
Expand All @@ -215,10 +246,7 @@ async def _handle_state_deltas(self, deltas: List[StateDelta]) -> None:
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
)

event = await self._store.get_event(delta.event_id, allow_none=True)
if not event:
continue
sender = UserID.from_string(event.sender)
sender = UserID.from_string(event_id_and_sender_dict[delta.event_id])

next_send_ts = await self._store.cancel_delayed_state_events(
room_id=delta.room_id,
Expand All @@ -231,8 +259,17 @@ async def _handle_state_deltas(self, deltas: List[StateDelta]) -> None:
),
)

if self._next_send_ts_changed(next_send_ts):
self._schedule_next_at_or_none(next_send_ts)
# Schedule the next delayed event call for the earliest
# event.
if next_send_ts is not None:
if (
earliest_next_send_ts is None
or next_send_ts < earliest_next_send_ts
):
earliest_next_send_ts = next_send_ts

if self._next_send_ts_changed(earliest_next_send_ts):
self._schedule_next_at_or_none(earliest_next_send_ts)

async def add(
self,
Expand Down
15 changes: 15 additions & 0 deletions synapse/storage/databases/main/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,21 @@ def restart_delayed_event_txn(
"restart_delayed_event", restart_delayed_event_txn
)

async def get_count_of_delayed_events(self) -> int:
    """Return how many delayed events are currently pending in the database.

    Used as a cheap early-exit check before doing any per-event processing.
    """

    def _count_txn(txn: LoggingTransaction) -> int:
        txn.execute("SELECT count(*) FROM delayed_events")
        row = txn.fetchone()
        # count(*) always yields one row, but guard defensively anyway.
        if row is None:
            return 0
        return row[0]

    return await self.db_pool.runInteraction(
        "get_count_of_delayed_events",
        _count_txn,
    )

async def get_all_delayed_events_for_user(
self,
user_localpart: str,
Expand Down
28 changes: 28 additions & 0 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2135,6 +2135,34 @@ def get_deltas_for_stream_id_txn(

return rows, to_token, True

async def get_senders_for_event_ids(
    self, event_ids: Collection[str]
) -> Dict[str, str]:
    """
    Look up the sender of each of the given events.

    Args:
        event_ids: A collection of event IDs as strings.

    Returns:
        A dict of event ID -> sender of the event. Event IDs not found
        in the `events` table are simply absent from the result.
    """

    def _senders_txn(txn: LoggingTransaction) -> Dict[str, str]:
        rows = self.db_pool.simple_select_many_txn(
            txn=txn,
            table="events",
            column="event_id",
            iterable=event_ids,
            keyvalues={},
            retcols=["event_id", "sender"],
        )
        return {event_id: sender for event_id, sender in rows}

    return await self.db_pool.runInteraction(
        "get_senders_for_event_ids", _senders_txn
    )

@cached(max_entries=5000)
async def get_event_ordering(self, event_id: str, room_id: str) -> Tuple[int, int]:
res = await self.db_pool.simple_select_one(
Expand Down
Loading