Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion synapse/handlers/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,32 @@ async def process() -> None:
)

async def _unsafe_process_new_event(self) -> None:
room_max_stream_ordering = self._store.get_room_max_stream_ordering()

# Check that there are actually any delayed events to process. If not, bail early.
delayed_events_count = await self._store.get_count_of_delayed_events()
if delayed_events_count == 0:
# There are no delayed events to process. Update the
# `delayed_events_stream_pos` to the latest `events` stream pos and
# exit early.
self._event_pos = room_max_stream_ordering

logger.debug(
"No delayed events to process. Updating `delayed_events_stream_pos` to max stream ordering (%s)",
room_max_stream_ordering,
)

await self._store.update_delayed_events_stream_pos(room_max_stream_ordering)

event_processing_positions.labels(
name="delayed_events", **{SERVER_NAME_LABEL: self.server_name}
).set(room_max_stream_ordering)

return

# If self._event_pos is None then means we haven't fetched it from the DB yet
if self._event_pos is None:
self._event_pos = await self._store.get_delayed_events_stream_pos()
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos > room_max_stream_ordering:
# apparently, we've processed more events than exist in the database!
# this can happen if events are removed with history purge or similar.
Expand Down
15 changes: 15 additions & 0 deletions synapse/storage/databases/main/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,21 @@ def restart_delayed_event_txn(
"restart_delayed_event", restart_delayed_event_txn
)

async def get_count_of_delayed_events(self) -> int:
"""Returns the number of pending delayed events in the DB."""

def _get_count_of_delayed_events(txn: LoggingTransaction) -> int:
sql = "SELECT count(*) FROM delayed_events"

txn.execute(sql)
resp = txn.fetchone()
return resp[0] if resp is not None else 0

return await self.db_pool.runInteraction(
"get_count_of_delayed_events",
_get_count_of_delayed_events,
)

async def get_all_delayed_events_for_user(
self,
user_localpart: str,
Expand Down
Loading