Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Refactor EventPersistenceQueue (#10145)
Browse files Browse the repository at this point in the history
some cleanup, pulled out of #10134.
  • Loading branch information
richvdh committed Jun 14, 2021
1 parent d7808a2 commit 1dfdc87
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 77 deletions.
1 change: 1 addition & 0 deletions changelog.d/10145.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor EventPersistenceQueue.
165 changes: 88 additions & 77 deletions synapse/storage/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,23 @@

import itertools
import logging
from collections import deque, namedtuple
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
from collections import deque
from typing import (
Awaitable,
Callable,
Collection,
Deque,
Dict,
Generic,
Iterable,
List,
Optional,
Set,
Tuple,
TypeVar,
)

import attr
from prometheus_client import Counter, Histogram

from twisted.internet import defer
Expand All @@ -37,7 +51,7 @@
StateMap,
get_domain_from_id,
)
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.async_helpers import ObservableDeferred, yieldable_gather_results
from synapse.util.metrics import Measure

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -89,64 +103,84 @@
)


class _EventPeristenceQueue:
@attr.s(auto_attribs=True, frozen=True, slots=True)
class _EventPersistQueueItem:
events_and_contexts: List[Tuple[EventBase, EventContext]]
backfilled: bool
deferred: ObservableDeferred


_PersistResult = TypeVar("_PersistResult")


class _EventPeristenceQueue(Generic[_PersistResult]):
"""Queues up events so that they can be persisted in bulk with only one
concurrent transaction per room.
"""

_EventPersistQueueItem = namedtuple(
"_EventPersistQueueItem", ("events_and_contexts", "backfilled", "deferred")
)
def __init__(
self,
per_item_callback: Callable[
[List[Tuple[EventBase, EventContext]], bool],
Awaitable[_PersistResult],
],
):
"""Create a new event persistence queue
def __init__(self):
self._event_persist_queues = {}
self._currently_persisting_rooms = set()
The per_item_callback will be called for each item added via add_to_queue,
and its result will be returned via the Deferreds returned from add_to_queue.
"""
self._event_persist_queues: Dict[str, Deque[_EventPersistQueueItem]] = {}
self._currently_persisting_rooms: Set[str] = set()
self._per_item_callback = per_item_callback

def add_to_queue(self, room_id, events_and_contexts, backfilled):
async def add_to_queue(
self,
room_id: str,
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
backfilled: bool,
) -> _PersistResult:
"""Add events to the queue, with the given persist_event options.
NB: due to the normal usage pattern of this method, it does *not*
follow the synapse logcontext rules, and leaves the logcontext in
place whether or not the returned deferred is ready.
If we are not already processing events in this room, starts off a background
process to to so, calling the per_item_callback for each item.
Args:
room_id (str):
events_and_contexts (list[(EventBase, EventContext)]):
backfilled (bool):
Returns:
defer.Deferred: a deferred which will resolve once the events are
persisted. Runs its callbacks *without* a logcontext. The result
is the same as that returned by the callback passed to
`handle_queue`.
the result returned by the `_per_item_callback` passed to
`__init__`.
"""
queue = self._event_persist_queues.setdefault(room_id, deque())
if queue:
# if the last item in the queue has the same `backfilled` setting,
# we can just add these new events to that item.
end_item = queue[-1]
if end_item.backfilled == backfilled:
end_item.events_and_contexts.extend(events_and_contexts)
return end_item.deferred.observe()

deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
# if the last item in the queue has the same `backfilled` setting,
# we can just add these new events to that item.
if queue and queue[-1].backfilled == backfilled:
end_item = queue[-1]
else:
# need to make a new queue item
deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)

queue.append(
self._EventPersistQueueItem(
events_and_contexts=events_and_contexts,
end_item = _EventPersistQueueItem(
events_and_contexts=[],
backfilled=backfilled,
deferred=deferred,
)
)
queue.append(end_item)

return deferred.observe()
end_item.events_and_contexts.extend(events_and_contexts)
self._handle_queue(room_id)
return await make_deferred_yieldable(end_item.deferred.observe())

def handle_queue(self, room_id, per_item_callback):
def _handle_queue(self, room_id):
"""Attempts to handle the queue for a room if not already being handled.
The given callback will be invoked with for each item in the queue,
The queue's callback will be invoked with for each item in the queue,
of type _EventPersistQueueItem. The per_item_callback will continuously
be called with new items, unless the queue becomnes empty. The return
be called with new items, unless the queue becomes empty. The return
value of the function will be given to the deferreds waiting on the item,
exceptions will be passed to the deferreds as well.
Expand All @@ -156,7 +190,6 @@ def handle_queue(self, room_id, per_item_callback):
If another callback is currently handling the queue then it will not be
invoked.
"""

if room_id in self._currently_persisting_rooms:
return

Expand All @@ -167,7 +200,9 @@ async def handle_queue_loop():
queue = self._get_drainining_queue(room_id)
for item in queue:
try:
ret = await per_item_callback(item)
ret = await self._per_item_callback(
item.events_and_contexts, item.backfilled
)
except Exception:
with PreserveLoggingContext():
item.deferred.errback()
Expand Down Expand Up @@ -214,7 +249,7 @@ def __init__(self, hs, stores: Databases):
self._clock = hs.get_clock()
self._instance_name = hs.get_instance_name()
self.is_mine_id = hs.is_mine_id
self._event_persist_queue = _EventPeristenceQueue()
self._event_persist_queue = _EventPeristenceQueue(self._persist_event_batch)
self._state_resolution_handler = hs.get_state_resolution_handler()

async def persist_events(
Expand All @@ -241,26 +276,21 @@ async def persist_events(
for event, ctx in events_and_contexts:
partitioned.setdefault(event.room_id, []).append((event, ctx))

deferreds = []
for room_id, evs_ctxs in partitioned.items():
d = self._event_persist_queue.add_to_queue(
async def enqueue(item):
room_id, evs_ctxs = item
return await self._event_persist_queue.add_to_queue(
room_id, evs_ctxs, backfilled=backfilled
)
deferreds.append(d)

for room_id in partitioned:
self._maybe_start_persisting(room_id)
ret_vals = await yieldable_gather_results(enqueue, partitioned.items())

# Each deferred returns a map from event ID to existing event ID if the
# event was deduplicated. (The dict may also include other entries if
# Each call to add_to_queue returns a map from event ID to existing event ID if
# the event was deduplicated. (The dict may also include other entries if
# the event was persisted in a batch with other events).
#
# Since we use `defer.gatherResults` we need to merge the returned list
# Since we use `yieldable_gather_results` we need to merge the returned list
# of dicts into one.
ret_vals = await make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)
replaced_events = {}
replaced_events: Dict[str, str] = {}
for d in ret_vals:
replaced_events.update(d)

Expand All @@ -287,16 +317,12 @@ async def persist_event(
event if it was deduplicated due to an existing event matching the
transaction ID.
"""
deferred = self._event_persist_queue.add_to_queue(
event.room_id, [(event, context)], backfilled=backfilled
)

self._maybe_start_persisting(event.room_id)

# The deferred returns a map from event ID to existing event ID if the
# add_to_queue returns a map from event ID to existing event ID if the
# event was deduplicated. (The dict may also include other entries if
# the event was persisted in a batch with other events.)
replaced_events = await make_deferred_yieldable(deferred)
replaced_events = await self._event_persist_queue.add_to_queue(
event.room_id, [(event, context)], backfilled=backfilled
)
replaced_event = replaced_events.get(event.event_id)
if replaced_event:
event = await self.main_store.get_event(replaced_event)
Expand All @@ -308,29 +334,14 @@ async def persist_event(
pos = PersistedEventPosition(self._instance_name, event_stream_id)
return event, pos, self.main_store.get_room_max_token()

def _maybe_start_persisting(self, room_id: str):
"""Pokes the `_event_persist_queue` to start handling new items in the
queue, if not already in progress.
Causes the deferreds returned by `add_to_queue` to resolve with: a
dictionary of event ID to event ID we didn't persist as we already had
another event persisted with the same TXN ID.
"""

async def persisting_queue(item):
with Measure(self._clock, "persist_events"):
return await self._persist_events(
item.events_and_contexts, backfilled=item.backfilled
)

self._event_persist_queue.handle_queue(room_id, persisting_queue)

async def _persist_events(
async def _persist_event_batch(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> Dict[str, str]:
"""Calculates the change to current state and forward extremities, and
"""Callback for the _event_persist_queue
Calculates the change to current state and forward extremities, and
persists the given events and with those updates.
Returns:
Expand Down

0 comments on commit 1dfdc87

Please sign in to comment.