Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Optimise async get event lookups #13435

Merged
merged 11 commits into from
Aug 4, 2022
1 change: 1 addition & 0 deletions changelog.d/13435.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prevent unnecessary lookups to any external `get_event` cache. Contributed by Nick @ Beeper (@fizzadar).
75 changes: 68 additions & 7 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,11 @@ async def _get_events_from_cache_or_db(
Returns:
map from event id to result
"""
event_entry_map = await self._get_events_from_cache(
# Shortcut: check if we have any events in the *in memory* cache - this function
# may be called repeatedly for the same event so at this point we cannot reach
# out to any external cache for performance reasons. The external cache is
# checked later on in the `get_missing_events_from_cache_or_db` function below.
event_entry_map = self._get_events_from_local_cache(
event_ids,
)

Expand Down Expand Up @@ -632,7 +636,9 @@ async def _get_events_from_cache_or_db(

if missing_events_ids:

async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:
async def get_missing_events_from_cache_or_db() -> Dict[
str, EventCacheEntry
]:
"""Fetches the events in `missing_event_ids` from the database.

Also creates entries in `self._current_event_fetches` to allow
Expand All @@ -657,10 +663,18 @@ async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:
# the events have been redacted, and if so pulling the redaction event
# out of the database to check it.
#
missing_events = {}
try:
missing_events = await self._get_events_from_db(
# Try to fetch from any external cache. We already checked the
# in-memory cache above.
missing_events = await self._get_events_from_external_cache(
missing_events_ids,
)
# Now actually fetch any remaining events from the DB
db_missing_events = await self._get_events_from_db(
missing_events_ids - missing_events.keys(),
)
missing_events.update(db_missing_events)
except Exception as e:
with PreserveLoggingContext():
fetching_deferred.errback(e)
Expand All @@ -679,7 +693,7 @@ async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:
# cancellations, since multiple `_get_events_from_cache_or_db` calls can
# reuse the same fetch.
missing_events: Dict[str, EventCacheEntry] = await delay_cancellation(
get_missing_events_from_db()
get_missing_events_from_cache_or_db()
)
event_entry_map.update(missing_events)

Expand Down Expand Up @@ -754,7 +768,54 @@ def _invalidate_local_get_event_cache(self, event_id: str) -> None:
async def _get_events_from_cache(
    self, events: Iterable[str], update_metrics: bool = True
) -> Dict[str, EventCacheEntry]:
    """Fetch events from the caches, both in memory and any external.

    The in-memory cache is consulted first; only the event IDs it did not
    return are then looked up in the external cache.

    May return rejected events.

    Args:
        events: event_ids to fetch. Any iterable is accepted, including a
            one-shot iterator.
        update_metrics: Whether to update the cache hit ratio metrics

    Returns:
        map from event id to cache entry, containing only the events that
        one of the caches returned.
    """
    # `events` is walked twice: once for the local lookup and once to work
    # out what is still missing. Snapshot it so that a generator or other
    # one-shot iterator is not already exhausted by the second pass (which
    # would silently skip the external cache altogether).
    event_ids = list(events)

    event_map = self._get_events_from_local_cache(
        event_ids, update_metrics=update_metrics
    )

    missing_event_ids = [e for e in event_ids if e not in event_map]
    event_map.update(
        await self._get_events_from_external_cache(
            events=missing_event_ids,
            update_metrics=update_metrics,
        )
    )

    return event_map

async def _get_events_from_external_cache(
    self, events: Iterable[str], update_metrics: bool = True
) -> Dict[str, EventCacheEntry]:
    """Look up events in the external `get_event` cache only.

    Each event ID is looked up with its own awaited `get_external` call,
    one after another, in iteration order.

    May return rejected events.

    Args:
        events: event_ids to look up
        update_metrics: Whether to update the cache hit ratio metrics

    Returns:
        map from event id to the cache entry, for every lookup that came
        back with a truthy result. Misses are simply left out.
    """
    found: Dict[str, EventCacheEntry] = {}

    for event_id in events:
        # The cache is keyed on a 1-tuple of the event ID.
        cache_entry = await self._get_event_cache.get_external(
            (event_id,), None, update_metrics=update_metrics
        )
        if not cache_entry:
            continue
        found[event_id] = cache_entry

    return found

def _get_events_from_local_cache(
self, events: Iterable[str], update_metrics: bool = True
) -> Dict[str, EventCacheEntry]:
"""Fetch events from the local, in memory, caches.

May return rejected events.

Expand All @@ -766,7 +827,7 @@ async def _get_events_from_cache(

for event_id in events:
# First check if it's in the event cache
ret = await self._get_event_cache.get(
ret = self._get_event_cache.get_local(
(event_id,), None, update_metrics=update_metrics
)
if ret:
Expand All @@ -788,7 +849,7 @@ async def _get_events_from_cache(

# We add the entry back into the cache as we want to keep
# recently queried events in the cache.
await self._get_event_cache.set((event_id,), cache_entry)
self._get_event_cache.set_local((event_id,), cache_entry)

return event_map

Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ async def _get_joined_users_from_context(
# We don't update the event cache hit ratio as it completely throws off
# the hit ratio counts. After all, we don't populate the cache if we
# miss it here
event_map = await self._get_events_from_cache(
event_map = self._get_events_from_local_cache(
member_event_ids, update_metrics=False
)

Expand Down
17 changes: 17 additions & 0 deletions synapse/util/caches/lrucache.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,9 +834,26 @@ async def get(
) -> Optional[VT]:
return self._lru_cache.get(key, update_metrics=update_metrics)

async def get_external(
    self,
    key: KT,
    default: Optional[T] = None,
    update_metrics: bool = True,
) -> Optional[VT]:
    """Fetch `key` from any configured external cache.

    This implementation has no external cache behind it, so the lookup is
    a no-op: every argument is ignored (including `default`) and the
    result is always `None`.
    """
    return None

def get_local(
    self, key: KT, default: Optional[T] = None, update_metrics: bool = True
) -> Optional[VT]:
    """Synchronously look `key` up in the in-memory LruCache only.

    Args:
        key: the cache key to look up
        default: accepted for signature compatibility.
            NOTE(review): this value is not forwarded to the underlying
            cache, so it has no effect on the result - confirm whether
            that is intended.
        update_metrics: passed through to the underlying cache's `get`

    Returns:
        whatever the underlying cache's `get` returns for `key`.
    """
    local_cache = self._lru_cache
    return local_cache.get(key, update_metrics=update_metrics)

async def set(self, key: KT, value: VT) -> None:
    """Store `value` under `key` in the underlying in-memory LruCache.

    Declared `async` even though the body never awaits; callers must
    still await it.
    """
    local_cache = self._lru_cache
    local_cache.set(key, value)

def set_local(self, key: KT, value: VT) -> None:
    """Synchronously store `value` under `key` in the in-memory LruCache."""
    local_cache = self._lru_cache
    local_cache.set(key, value)

async def invalidate(self, key: KT) -> None:
    """Invalidate `key`.

    The intended contract is to invalidate any external cache first and
    then the LruCache; the body here only invalidates the in-memory
    LruCache and returns whatever that call returns.
    """
    local_cache = self._lru_cache
    return local_cache.invalidate(key)
Expand Down