From c485ed1c5a4c62ae555531cfd001a5e5f8bc2e44 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Jun 2023 13:14:40 +0100 Subject: [PATCH] Clear event caches when we purge history (#15609) This should help a little with #13476 --------- Co-authored-by: Patrick Cloke --- changelog.d/15609.bugfix | 1 + synapse/storage/_base.py | 31 ++++ synapse/storage/databases/main/cache.py | 134 ++++++++++++++++++ .../storage/databases/main/events_worker.py | 9 ++ .../storage/databases/main/purge_events.py | 8 +- synapse/util/caches/lrucache.py | 2 +- tests/handlers/test_sync.py | 2 +- tests/rest/client/test_read_marker.py | 3 - .../databases/main/test_events_worker.py | 8 +- 9 files changed, 184 insertions(+), 14 deletions(-) create mode 100644 changelog.d/15609.bugfix diff --git a/changelog.d/15609.bugfix b/changelog.d/15609.bugfix new file mode 100644 index 000000000000..b5a990cfec1e --- /dev/null +++ b/changelog.d/15609.bugfix @@ -0,0 +1 @@ +Correctly clear caches when we delete a room. diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 481fec72fe1b..fe4a76341137 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -86,9 +86,14 @@ def _invalidate_state_caches( room_id: Room where state changed members_changed: The user_ids of members that have changed """ + + # XXX: If you add something to this function make sure you add it to + # `_invalidate_state_caches_all` as well. + # If there were any membership changes, purge the appropriate caches. for host in {get_domain_from_id(u) for u in members_changed}: self._attempt_to_invalidate_cache("is_host_joined", (room_id, host)) + self._attempt_to_invalidate_cache("is_host_invited", (room_id, host)) if members_changed: self._attempt_to_invalidate_cache("get_users_in_room", (room_id,)) self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,)) @@ -117,6 +122,32 @@ def _invalidate_state_caches( self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,)) + def _invalidate_state_caches_all(self, room_id: str) -> None: + """Invalidates caches that are based on the current state, but does + not stream invalidations down replication. + + Same as `_invalidate_state_caches`, except that works when we don't know + which memberships have changed. + + Args: + room_id: Room where state changed + """ + self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,)) + self._attempt_to_invalidate_cache("get_users_in_room", (room_id,)) + self._attempt_to_invalidate_cache("is_host_invited", None) + self._attempt_to_invalidate_cache("is_host_joined", None) + self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,)) + self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,)) + self._attempt_to_invalidate_cache("get_number_joined_users_in_room", (room_id,)) + self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,)) + self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None) + self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None) + self._attempt_to_invalidate_cache( + "get_rooms_for_user_with_stream_ordering", None + ) + self._attempt_to_invalidate_cache("get_rooms_for_user", None) + self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) + def _attempt_to_invalidate_cache( self, cache_name: str, key: Optional[Collection[Any]] ) -> bool: diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 46fa0a73f9e4..6e1c7d681fe5 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -46,6 +46,12 @@ # based on the current state when notifying workers over replication. CURRENT_STATE_CACHE_NAME = "cs_cache_fake" +# As above, but for invalidating event caches on history deletion +PURGE_HISTORY_CACHE_NAME = "ph_cache_fake" + +# As above, but for invalidating room caches on room deletion +DELETE_ROOM_CACHE_NAME = "dr_cache_fake" + class CacheInvalidationWorkerStore(SQLBaseStore): def __init__( @@ -175,6 +181,23 @@ def process_replication_rows( room_id = row.keys[0] members_changed = set(row.keys[1:]) self._invalidate_state_caches(room_id, members_changed) + elif row.cache_func == PURGE_HISTORY_CACHE_NAME: + if row.keys is None: + raise Exception( + "Can't send an 'invalidate all' for 'purge history' cache" + ) + + room_id = row.keys[0] + self._invalidate_caches_for_room_events(room_id) + elif row.cache_func == DELETE_ROOM_CACHE_NAME: + if row.keys is None: + raise Exception( + "Can't send an 'invalidate all' for 'delete room' cache" + ) + + room_id = row.keys[0] + self._invalidate_caches_for_room_events(room_id) + self._invalidate_caches_for_room(room_id) else: self._attempt_to_invalidate_cache(row.cache_func, row.keys) @@ -226,6 +249,9 @@ def _invalidate_caches_for_event( relates_to: Optional[str], backfilled: bool, ) -> None: + # XXX: If you add something to this function make sure you add it to + # `_invalidate_caches_for_room_events` as well. + # This invalidates any local in-memory cached event objects, the original # process triggering the invalidation is responsible for clearing any external # cached objects. @@ -271,6 +297,106 @@ def _invalidate_caches_for_event( self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,)) self._attempt_to_invalidate_cache("get_threads", (room_id,)) + def _invalidate_caches_for_room_events_and_stream( + self, txn: LoggingTransaction, room_id: str + ) -> None: + """Invalidate caches associated with events in a room, and stream to + replication. + + Used when we delete events a room, but don't know which events we've + deleted. + """ + + self._send_invalidation_to_replication(txn, PURGE_HISTORY_CACHE_NAME, [room_id]) + txn.call_after(self._invalidate_caches_for_room_events, room_id) + + def _invalidate_caches_for_room_events(self, room_id: str) -> None: + """Invalidate caches associated with events in a room, and stream to + replication. + + Used when we delete events in a room, but don't know which events we've + deleted. + """ + + self._invalidate_local_get_event_cache_all() # type: ignore[attr-defined] + + self._attempt_to_invalidate_cache("have_seen_event", (room_id,)) + self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,)) + self._attempt_to_invalidate_cache( + "get_unread_event_push_actions_by_room_for_user", (room_id,) + ) + + self._attempt_to_invalidate_cache("_get_membership_from_event_id", None) + self._attempt_to_invalidate_cache("get_relations_for_event", None) + self._attempt_to_invalidate_cache("get_applicable_edit", None) + self._attempt_to_invalidate_cache("get_thread_id", None) + self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None) + self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None) + self._attempt_to_invalidate_cache( + "get_rooms_for_user_with_stream_ordering", None + ) + self._attempt_to_invalidate_cache("get_rooms_for_user", None) + self._attempt_to_invalidate_cache("get_references_for_event", None) + self._attempt_to_invalidate_cache("get_thread_summary", None) + self._attempt_to_invalidate_cache("get_thread_participated", None) + self._attempt_to_invalidate_cache("get_threads", (room_id,)) + + self._attempt_to_invalidate_cache("_get_state_group_for_event", None) + + self._attempt_to_invalidate_cache("get_event_ordering", None) + self._attempt_to_invalidate_cache("is_partial_state_event", None) + self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None) + + def _invalidate_caches_for_room_and_stream( + self, txn: LoggingTransaction, room_id: str + ) -> None: + """Invalidate caches associated with rooms, and stream to replication. + + Used when we delete rooms. + """ + + self._send_invalidation_to_replication(txn, DELETE_ROOM_CACHE_NAME, [room_id]) + txn.call_after(self._invalidate_caches_for_room, room_id) + + def _invalidate_caches_for_room(self, room_id: str) -> None: + """Invalidate caches associated with rooms. + + Used when we delete rooms. + """ + + # If we've deleted the room then we also need to purge all event caches. + self._invalidate_caches_for_room_events(room_id) + + self._attempt_to_invalidate_cache("get_account_data_for_room", None) + self._attempt_to_invalidate_cache("get_account_data_for_room_and_type", None) + self._attempt_to_invalidate_cache("get_aliases_for_room", (room_id,)) + self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,)) + self._attempt_to_invalidate_cache("_get_forward_extremeties_for_room", None) + self._attempt_to_invalidate_cache( + "get_unread_event_push_actions_by_room_for_user", (room_id,) + ) + self._attempt_to_invalidate_cache( + "_get_linearized_receipts_for_room", (room_id,) + ) + self._attempt_to_invalidate_cache("is_room_blocked", (room_id,)) + self._attempt_to_invalidate_cache("get_retention_policy_for_room", (room_id,)) + self._attempt_to_invalidate_cache( + "_get_partial_state_servers_at_join", (room_id,) + ) + self._attempt_to_invalidate_cache("is_partial_state_room", (room_id,)) + self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None) + self._attempt_to_invalidate_cache( + "get_current_hosts_in_room_ordered", (room_id,) + ) + self._attempt_to_invalidate_cache("did_forget", None) + self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None) + self._attempt_to_invalidate_cache("_get_membership_from_event_id", None) + self._attempt_to_invalidate_cache("get_room_version_id", (room_id,)) + + # And delete state caches. + + self._invalidate_state_caches_all(room_id) + async def invalidate_cache_and_stream( self, cache_name: str, keys: Tuple[Any, ...] ) -> None: @@ -377,6 +503,14 @@ def _send_invalidation_to_replication( "Can't stream invalidate all with magic current state cache" ) + if cache_name == PURGE_HISTORY_CACHE_NAME and keys is None: + raise Exception( + "Can't stream invalidate all with magic purge history cache" + ) + + if cache_name == DELETE_ROOM_CACHE_NAME and keys is None: + raise Exception("Can't stream invalidate all with magic delete room cache") + if isinstance(self.database_engine, PostgresEngine): assert self._cache_id_gen is not None diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index a39bc9097439..d93ffc4efa75 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -903,6 +903,15 @@ def _invalidate_local_get_event_cache(self, event_id: str) -> None: self._event_ref.pop(event_id, None) self._current_event_fetches.pop(event_id, None) + def _invalidate_local_get_event_cache_all(self) -> None: + """Clears the in-memory get event caches. + + Used when we purge room history. + """ + self._get_event_cache.clear() + self._event_ref.clear() + self._current_event_fetches.clear() + async def _get_events_from_cache( self, events: Iterable[str], update_metrics: bool = True ) -> Dict[str, EventCacheEntry]: diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index efbd3e75d99e..9773c1fcd28a 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -308,6 +308,8 @@ def _purge_history_txn( logger.info("[purge] done") + self._invalidate_caches_for_room_events_and_stream(txn, room_id) + return referenced_state_groups async def purge_room(self, room_id: str) -> List[int]: @@ -485,10 +487,6 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]: # index on them. In any case we should be clearing out 'stream' tables # periodically anyway (#5888) - # TODO: we could probably usefully do a bunch more cache invalidation here - - # XXX: as with purge_history, this is racy, but no worse than other races - # that already exist. - self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,)) + self._invalidate_caches_for_room_and_stream(txn, room_id) return state_groups diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index ed0da17227d3..6137c85e1051 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -862,5 +862,5 @@ def invalidate_local(self, key: KT) -> None: async def contains(self, key: KT) -> bool: return self._lru_cache.contains(key) - async def clear(self) -> None: + def clear(self) -> None: self._lru_cache.clear() diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 0d9a3de92a5d..9f035a02dc69 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -163,7 +163,7 @@ def test_unknown_room_version(self) -> None: # Blow away caches (supported room versions can only change due to a restart). self.store.get_rooms_for_user_with_stream_ordering.invalidate_all() self.store.get_rooms_for_user.invalidate_all() - self.get_success(self.store._get_event_cache.clear()) + self.store._get_event_cache.clear() self.store._event_ref.clear() # The rooms should be excluded from the sync response. diff --git a/tests/rest/client/test_read_marker.py b/tests/rest/client/test_read_marker.py index 0eedcdb476b4..5cdd5694a04a 100644 --- a/tests/rest/client/test_read_marker.py +++ b/tests/rest/client/test_read_marker.py @@ -131,9 +131,6 @@ def send_message() -> str: event = self.get_success(self.store.get_event(event_id_1, allow_none=True)) assert event is None - # TODO See https://github.com/matrix-org/synapse/issues/13476 - self.store.get_event_ordering.invalidate_all() - # Test moving the read marker to a newer event event_id_2 = send_message() channel = self.make_request( diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index 9606ecc43b6b..788500e38f2d 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -188,7 +188,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.event_id = res["event_id"] # Reset the event cache so the tests start with it empty - self.get_success(self.store._get_event_cache.clear()) + self.store._get_event_cache.clear() def test_simple(self) -> None: """Test that we cache events that we pull from the DB.""" @@ -205,7 +205,7 @@ def test_event_ref(self) -> None: """ # Reset the event cache - self.get_success(self.store._get_event_cache.clear()) + self.store._get_event_cache.clear() with LoggingContext("test") as ctx: # We keep hold of the event event though we never use it. @@ -215,7 +215,7 @@ def test_event_ref(self) -> None: self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1) # Reset the event cache - self.get_success(self.store._get_event_cache.clear()) + self.store._get_event_cache.clear() with LoggingContext("test") as ctx: self.get_success(self.store.get_event(self.event_id)) @@ -390,7 +390,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.event_id = res["event_id"] # Reset the event cache so the tests start with it empty - self.get_success(self.store._get_event_cache.clear()) + self.store._get_event_cache.clear() @contextmanager def blocking_get_event_calls(