Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Optimise async get event lookups #13435

Merged
merged 11 commits into from
Aug 4, 2022
1 change: 1 addition & 0 deletions changelog.d/13435.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prevent unnecessary lookups to any external `get_event` cache. Contributed by Nick @ Beeper (@fizzadar).
74 changes: 67 additions & 7 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,11 @@ async def _get_events_from_cache_or_db(
Returns:
map from event id to result
"""
event_entry_map = await self._get_events_from_cache(
# Shortcut: check if we have any events in the *in memory* cache - this function
# may be called repeatedly for the same event so at this point we cannot reach
# out to any external cache for performance reasons, this is done later on in
# the `get_missing_events_from_cache_or_db` function below.
Fizzadar marked this conversation as resolved.
Show resolved Hide resolved
event_entry_map = self._get_events_from_local_cache(
event_ids,
)

Expand Down Expand Up @@ -632,7 +636,9 @@ async def _get_events_from_cache_or_db(

if missing_events_ids:

async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:
async def get_missing_events_from_cache_or_db() -> Dict[
str, EventCacheEntry
]:
"""Fetches the events in `missing_event_ids` from the database.

Also creates entries in `self._current_event_fetches` to allow
Expand All @@ -657,10 +663,17 @@ async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:
# the events have been redacted, and if so pulling the redaction event
# out of the database to check it.
#
missing_events = {}
try:
missing_events = await self._get_events_from_db(
# Try to fetch from any external cache, in-memory cache checked above
Fizzadar marked this conversation as resolved.
Show resolved Hide resolved
missing_events = await self._get_events_from_external_cache(
missing_events_ids,
)
# Now actually fetch any remaining events from the DB
db_missing_events = await self._get_events_from_db(
missing_events_ids - missing_events.keys(),
)
missing_events.update(db_missing_events)
except Exception as e:
with PreserveLoggingContext():
fetching_deferred.errback(e)
Expand All @@ -679,7 +692,7 @@ async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:
# cancellations, since multiple `_get_events_from_cache_or_db` calls can
# reuse the same fetch.
missing_events: Dict[str, EventCacheEntry] = await delay_cancellation(
get_missing_events_from_db()
get_missing_events_from_cache_or_db()
)
event_entry_map.update(missing_events)

Expand Down Expand Up @@ -754,7 +767,54 @@ def _invalidate_local_get_event_cache(self, event_id: str) -> None:
async def _get_events_from_cache(
    self, events: Iterable[str], update_metrics: bool = True
) -> Dict[str, EventCacheEntry]:
    """Fetch events from the caches, both in memory and any external.

    May return rejected events.

    Args:
        events: list of event_ids to fetch
        update_metrics: Whether to update the cache hit ratio metrics

    Returns:
        Map from event ID to cache entry. Event IDs not found in any
        cache are simply absent from the result.
    """
    # Check the in-memory caches first: they are cheap and synchronous.
    event_map = self._get_events_from_local_cache(
        events, update_metrics=update_metrics
    )

    # Only reach out to the (potentially remote, hence slower) external
    # cache for the events the local caches did not have. Skip the call
    # entirely on a full local hit.
    missing_event_ids = {e for e in events if e not in event_map}
    if missing_event_ids:
        event_map.update(
            await self._get_events_from_external_cache(
                events=missing_event_ids,
                update_metrics=update_metrics,
            )
        )

    return event_map

async def _get_events_from_external_cache(
    self, events: Iterable[str], update_metrics: bool = True
) -> Dict[str, EventCacheEntry]:
    """Fetch events from any configured external cache.

    May return rejected events.

    Args:
        events: list of event_ids to fetch
        update_metrics: Whether to update the cache hit ratio metrics

    Returns:
        Map from event ID to cache entry; cache misses are omitted.
    """
    event_map: Dict[str, EventCacheEntry] = {}

    # Query the external cache one event at a time; only hits are
    # recorded in the result map.
    for event_id in events:
        entry = await self._get_event_cache.get_external(
            (event_id,), None, update_metrics=update_metrics
        )
        if entry:
            event_map[event_id] = entry

    return event_map

def _get_events_from_local_cache(
self, events: Iterable[str], update_metrics: bool = True
) -> Dict[str, EventCacheEntry]:
"""Fetch events from the local, in memory, caches.

May return rejected events.

Expand All @@ -766,7 +826,7 @@ async def _get_events_from_cache(

for event_id in events:
# First check if it's in the event cache
ret = await self._get_event_cache.get(
ret = self._get_event_cache.get_local(
(event_id,), None, update_metrics=update_metrics
)
if ret:
Expand All @@ -788,7 +848,7 @@ async def _get_events_from_cache(

# We add the entry back into the cache as we want to keep
# recently queried events in the cache.
await self._get_event_cache.set((event_id,), cache_entry)
self._get_event_cache.set_local((event_id,), cache_entry)

return event_map

Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ async def _get_joined_users_from_context(
# We don't update the event cache hit ratio as it completely throws off
# the hit ratio counts. After all, we don't populate the cache if we
# miss it here
event_map = await self._get_events_from_cache(
event_map = self._get_events_from_local_cache(
member_event_ids, update_metrics=False
)

Expand Down
17 changes: 17 additions & 0 deletions synapse/util/caches/lrucache.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,9 +834,26 @@ async def get(
) -> Optional[VT]:
return self._lru_cache.get(key, update_metrics=update_metrics)

async def get_external(
    self,
    key: KT,
    default: Optional[T] = None,
    update_metrics: bool = True,
) -> Optional[VT]:
    """Fetch `key` from an external cache, if one is configured.

    This base implementation has no external cache, so every lookup is
    a miss.

    Args:
        key: the cache key to look up.
        default: value to return on a cache miss.
        update_metrics: whether the lookup should count towards cache
            hit-ratio metrics (unused here: there is nothing to look up).

    Returns:
        `default`, always (a miss).
    """
    # Fix: honour the `default` argument instead of hard-coding None.
    # Backward compatible: existing callers all pass default=None.
    return default

def get_local(
    self, key: KT, default: Optional[T] = None, update_metrics: bool = True
) -> Optional[VT]:
    """Look `key` up in the wrapped in-memory LruCache only.

    Never consults an external cache.

    Args:
        key: the cache key.
        default: not passed through to the underlying lookup (kept for
            interface parity with `get_external`).
        update_metrics: whether the lookup counts towards hit-ratio
            metrics.
    """
    entry = self._lru_cache.get(key, update_metrics=update_metrics)
    return entry

async def set(self, key: KT, value: VT) -> None:
    """Write an entry into the wrapped in-memory LruCache.

    NOTE(review): presumably implementations backed by an external
    cache also write through to it — this base version only updates
    the local LruCache.
    """
    self._lru_cache.set(key, value)

def set_local(self, key: KT, value: VT) -> None:
    """Write an entry into the in-memory LruCache only (no external
    cache write)."""
    self._lru_cache.set(key, value)

async def invalidate(self, key: KT) -> None:
    """Drop `key` from the cache.

    Implementations with an external cache should invalidate that
    first and then the in-memory LruCache; this base version only has
    the latter.
    """
    # Forward the underlying result so callers awaiting this coroutine
    # observe completion of the invalidation.
    return self._lru_cache.invalidate(key)
Expand Down