Optimize have_seen_events
#13561
synapse/storage/databases/main/events_worker.py
@@ -54,7 +54,13 @@
     current_context,
     make_deferred_yieldable,
 )
-from synapse.logging.opentracing import start_active_span, tag_args, trace
+from synapse.logging.opentracing import (
+    SynapseTags,
+    set_tag,
+    start_active_span,
+    tag_args,
+    trace,
+)
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
@@ -1449,7 +1455,7 @@ async def have_events_in_timeline(self, event_ids: Iterable[str]) -> Set[str]:
     @trace
     @tag_args
     async def have_seen_events(
-        self, room_id: str, event_ids: Iterable[str]
+        self, room_id: str, event_ids: Collection[str]
     ) -> Set[str]:
         """Given a list of event ids, check if we have already processed them.
@@ -1462,68 +1468,68 @@ async def have_seen_events(
             event_ids: events we are looking for

         Returns:
-            The set of events we have already seen.
+            The remaining set of events we haven't seen.
         """
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+            str(len(event_ids)),
+        )

         # @cachedList chomps lots of memory if you call it with a big list, so
         # we break it down. However, each batch requires its own index scan, so we make
         # the batches as big as possible.

-        results: Set[str] = set()
-        for chunk in batch_iter(event_ids, 500):
-            r = await self._have_seen_events_dict(
-                [(room_id, event_id) for event_id in chunk]
Review comment: Just a bunch of avoiding creating new intermediary lists and using
-            )
-            results.update(eid for ((_rid, eid), have_event) in r.items() if have_event)
+        remaining_event_ids: Set[str] = set()
+        for chunk in batch_iter(event_ids, 1000):
Review comment: Why the number 1000?

Reply: It's just arbitrarily doubled to have less round-trip time to the database with the really big rooms. I don't know of a good heuristic to justify a better value, but I know we don't want the SQL queries to get too big, which is why we're batching in the first place. Good to know that we shouldn't exceed the

Reply: Oh, that was a very tentative "if"; I mentioned it as a guess because it was mentioned in the PR discussion. If (again, if ;)) they are not supposed to be coupled, perhaps consider moving it to a file-local constant with
+            remaining_event_ids_from_chunk = await self._have_seen_events_dict(chunk)
+            remaining_event_ids.update(remaining_event_ids_from_chunk)

-        return results
+        return remaining_event_ids
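The suggestion in the thread above about moving the batch size to a file-local constant might look roughly like the sketch below. This is not code from the PR: `HAVE_SEEN_EVENTS_BATCH_SIZE`, `find_unseen`, and the stand-in `batch_iter` are hypothetical names (the real `batch_iter` lives in `synapse.util.iterutils`).

```python
from typing import Iterable, Iterator, List, Set

# Hypothetical file-local constant, per the review suggestion above. The value
# is arbitrary: larger batches mean fewer database round-trips, but each batch
# becomes one SQL query, so the IN (...) clause must stay a manageable size.
HAVE_SEEN_EVENTS_BATCH_SIZE = 1000


def batch_iter(iterable: Iterable[str], size: int) -> Iterator[List[str]]:
    """Minimal stand-in for synapse.util.iterutils.batch_iter: yield lists of
    at most `size` items."""
    batch: List[str] = []
    for item in iterable:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def find_unseen(event_ids: Iterable[str], already_seen: Set[str]) -> Set[str]:
    """Mirrors the shape of the new have_seen_events loop: collect the event
    ids that are *not* already known, one chunk at a time."""
    remaining: Set[str] = set()
    for chunk in batch_iter(event_ids, HAVE_SEEN_EVENTS_BATCH_SIZE):
        remaining.update(eid for eid in chunk if eid not in already_seen)
    return remaining
```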

-    @cachedList(cached_method_name="have_seen_event", list_name="keys")
Review comment: Is there any max size of a

Reply: It ought to be the same cache as

Reply: Some discussion in backend internal. Seems like

And it's not one of the

Reply: @squahtx Ahhh, I think I see what you're trying to say.

synapse/storage/databases/main/events_worker.py, lines 1481 to 1482 in f9f0342
synapse/storage/databases/main/events_worker.py, lines 1523 to 1524 in f9f0342

In any case, it seems to have a 0% cache hit rate (https://grafana.matrix.org/d/000000012/synapse?orgId=1&from=1660891259284&to=1660934459284&viewPanel=1) and the cache size doesn't get above 6k.

Reply: This cache still seems so weird to me. Why doesn't it grow bigger? We should at least see 200k state events from Matrix HQ show up and completely fill up the cache sometimes. Or at least the cache should keep growing until full? https://grafana.matrix.org/d/000000012/synapse?orgId=1&from=1664997921975&to=1666293921975&viewPanel=8
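For reference, the pairing under discussion is the one in the old code: the batch method names `have_seen_event` as its backing cache via `cached_method_name`. My reading, which is an assumption based on this thread rather than something stated in the diff, is that `@cachedList` writes its per-key results into that same `@cached` cache, so the effective size cap is the `max_entries=100000` declared on `have_seen_event`:

```python
from typing import Collection, Dict, Tuple

from synapse.util.caches.descriptors import cached, cachedList


class EventsWorkerStoreSketch:
    # Per-event cache; its max_entries is the size limit being discussed above.
    @cached(max_entries=100000, tree=True)
    async def have_seen_event(self, room_id: str, event_id: str) -> bool:
        ...

    # Batch variant (old code): cached_method_name points at have_seen_event,
    # so results land in that per-event cache rather than a separate one.
    @cachedList(cached_method_name="have_seen_event", list_name="keys")
    async def _have_seen_events_dict(
        self, keys: Collection[Tuple[str, str]]
    ) -> Dict[Tuple[str, str], bool]:
        ...
```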
-    async def _have_seen_events_dict(
-        self, keys: Collection[Tuple[str, str]]
-    ) -> Dict[Tuple[str, str], bool]:
+    # @cachedList(cached_method_name="have_seen_event", list_name="event_ids")
+    async def _have_seen_events_dict(self, event_ids: Collection[str]) -> set[str]:
         """Helper for have_seen_events

         Returns:
-            a dict {(room_id, event_id)-> bool}
+            The remaining set of events we haven't seen.
         """
-        # if the event cache contains the event, obviously we've seen it.
-
-        cache_results = {
-            (rid, eid)
-            for (rid, eid) in keys
-            if await self._get_event_cache.contains((eid,))
-        }
-        results = dict.fromkeys(cache_results, True)
-        remaining = [k for k in keys if k not in cache_results]
-        if not remaining:
-            return results
+        # if the event cache contains the event, obviously we've seen it.
+        event_cache_entry_map = self._get_events_from_local_cache(event_ids)
+        event_ids_in_cache = event_cache_entry_map.keys()
+        remaining_event_ids = {
+            event_id for event_id in event_ids if event_id not in event_ids_in_cache
+        }
+        if not remaining_event_ids:
+            return set()

Review comment: Are these enumerations a significant part of the runtime? Unless I'm missing something being lazy or whatnot here, this can be simplified: it should only be necessary to perform one full enumeration of the event ids in a single pass. That'd require complementing or modifying

Reply: Maybe, but it's practically nothing compared to
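The snippet the reviewer had in mind isn't captured here, but one plausible shape for the simplification is a plain set difference against the cache map's key view, as in the self-contained sketch below (the variable names mirror the diff; the data is made up). Doing everything in literally one enumeration would, as the comment says, mean changing the cache-lookup helper itself.

```python
# Stand-ins for the values in the diff above: event_ids is the input, and
# event_cache_entry_map plays the role of self._get_events_from_local_cache(...).
event_ids = ["$a", "$b", "$c"]
event_cache_entry_map = {"$b": object()}

# One expression instead of the explicit comprehension: set difference against
# the dict's key view keeps only the ids that missed the local cache.
remaining_event_ids = set(event_ids) - event_cache_entry_map.keys()
assert remaining_event_ids == {"$a", "$c"}
```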

         def have_seen_events_txn(txn: LoggingTransaction) -> None:
+            nonlocal remaining_event_ids
             # we deliberately do *not* query the database for room_id, to make the
             # query an index-only lookup on `events_event_id_key`.
             #
             # We therefore pull the events from the database into a set...

             sql = "SELECT event_id FROM events AS e WHERE "
             clause, args = make_in_list_sql_clause(
-                txn.database_engine, "e.event_id", [eid for (_rid, eid) in remaining]
+                txn.database_engine, "e.event_id", remaining_event_ids
Review comment: Any random Postgres SQL tricks to make looking up 1000 events faster? Like this sort of stuff:
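For what it's worth, a common Postgres-side answer to that question (not necessarily what this PR or `make_in_list_sql_clause` ends up doing) is to bind the whole id list as a single array parameter with `= ANY(...)` instead of expanding a thousand placeholders. A sketch, assuming a psycopg2-style connection object:

```python
from typing import Iterable, Set


def unseen_event_ids_pg(conn, remaining_event_ids: Iterable[str]) -> Set[str]:
    """Return the ids from remaining_event_ids that are NOT in the events table."""
    remaining = set(remaining_event_ids)
    with conn.cursor() as cur:
        # psycopg2 adapts a Python list to a Postgres array, so this sends one
        # bind parameter instead of a 1000-placeholder IN (...) list, and the
        # lookup can typically still be served from the event_id index.
        cur.execute(
            "SELECT event_id FROM events WHERE event_id = ANY(%s)",
            (list(remaining),),
        )
        found_event_ids = {row[0] for row in cur}
    return remaining - found_event_ids
```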
             )
             txn.execute(sql + clause, args)
-            found_events = {eid for eid, in txn}
+            found_event_ids = {eid for eid, in txn}

-            # ... and then we can update the results for each key
-            results.update(
-                {(rid, eid): (eid in found_events) for (rid, eid) in remaining}
-            )
+            remaining_event_ids = {
+                event_id
+                for event_id in remaining_event_ids
+                if event_id not in found_event_ids
+            }

         await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
-        return results
+        return remaining_event_ids

     @cached(max_entries=100000, tree=True)
     async def have_seen_event(self, room_id: str, event_id: str) -> bool:
-        res = await self._have_seen_events_dict(((room_id, event_id),))
-        return res[(room_id, event_id)]
+        remaining_event_ids = await self._have_seen_events_dict({event_id})
+        return event_id not in remaining_event_ids

     def _get_current_state_event_counts_txn(
         self, txn: LoggingTransaction, room_id: str
Review comment: Changed from returning the events we saw to the events we didn't see. In big rooms like #matrixhq, we've seen most of the events already; only a little piece of the state is new.
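A minimal illustration of that semantic flip from a caller's point of view; the `store` object and `find_missing_events` name are hypothetical, not actual Synapse call sites:

```python
from typing import Collection, Set


async def find_missing_events(
    store, room_id: str, event_ids: Collection[str]
) -> Set[str]:
    # Old contract: have_seen_events returned the ids we already had, so a
    # caller computed the missing ones by subtraction:
    #     seen = await store.have_seen_events(room_id, event_ids)
    #     return set(event_ids) - seen
    #
    # New contract: the call already returns the unseen ids, which for big
    # rooms is the much smaller set, with no extra subtraction pass.
    return await store.have_seen_events(room_id, event_ids)
```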