From c9d62c355d5038e0b4249f4d7c3452a46af89802 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 17 May 2023 09:42:54 +0100 Subject: [PATCH 01/11] WIP clear event caches when we purge history --- synapse/state/__init__.py | 1 + synapse/storage/databases/main/cache.py | 68 +++++++++++++++++++ .../storage/databases/main/events_worker.py | 9 +++ .../storage/databases/main/purge_events.py | 8 +++ 4 files changed, 86 insertions(+) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 603109524932..94f75253a74c 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -595,6 +595,7 @@ def __init__(self, hs: "HomeServer"): self.resolve_linearizer = Linearizer(name="state_resolve_lock") # dict of set of event_ids -> _StateCacheEntry. + # TODO: Clear this when we purge history? self._state_cache: ExpiringCache[ FrozenSet[int], _StateCacheEntry ] = ExpiringCache( diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 46fa0a73f9e4..04182c690aec 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -226,6 +226,9 @@ def _invalidate_caches_for_event( relates_to: Optional[str], backfilled: bool, ) -> None: + # XXX: If you add something to this function make sure you add it to + # `_invalidate_caches_for_room` as well. + # This invalidates any local in-memory cached event objects, the original # process triggering the invalidation is responsible for clearing any external # cached objects. @@ -271,6 +274,71 @@ def _invalidate_caches_for_event( self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,)) self._attempt_to_invalidate_cache("get_threads", (room_id,)) + def _invalidate_caches_for_room(self, room_id: str) -> None: + """Invalidate caches associated with rooms. + + Used when we delete events in rooms, but don't know which events we've + deleted. 
+ """ + + self._invalidate_local_get_event_cache_all() # type: ignore[attr-defined] + + self._attempt_to_invalidate_cache("have_seen_event", (room_id,)) + self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,)) + self._attempt_to_invalidate_cache( + "get_unread_event_push_actions_by_room_for_user", (room_id,) + ) + + self._attempt_to_invalidate_cache("_get_membership_from_event_id", None) + self._attempt_to_invalidate_cache("get_relations_for_event", None) + self._attempt_to_invalidate_cache("get_applicable_edit", None) + self._attempt_to_invalidate_cache("get_thread_id", None) + self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None) + self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None) + self._attempt_to_invalidate_cache( + "get_rooms_for_user_with_stream_ordering", None + ) + self._attempt_to_invalidate_cache("get_rooms_for_user", None) + self._attempt_to_invalidate_cache("get_references_for_event", None) + self._attempt_to_invalidate_cache("get_thread_summary", None) + self._attempt_to_invalidate_cache("get_thread_participated", None) + self._attempt_to_invalidate_cache("get_threads", (room_id,)) + + self._attempt_to_invalidate_cache("_get_state_group_for_event", None) + + # List of caches to also invalidate. Lines ending with `room_id` can be + # invalidated by room_id, the rest must be cleared entirely. 
+ + # get_account_data_for_room + # get_account_data_for_room_and_type + # get_aliases_for_room + # get_latest_event_ids_in_room + # _get_forward_extremeties_for_room + # get_unread_event_push_actions_by_room_for_user room_id + # get_event_ordering + # is_partial_state_event + # _get_linearized_receipts_for_room room_id + # is_room_blocked room_id + # get_retention_policy_for_room room_id + # _get_partial_state_servers_at_join room_id + # is_partial_state_room room_id + # get_invited_rooms_for_local_user + # _get_joined_profile_from_event_id + # is_host_invited + # get_current_hosts_in_room_ordered room_id + # did_forget + # get_forgotten_rooms_for_user + # _get_membership_from_event_id + # get_room_version_id room_id + # get_partial_current_state_ids room_id + # + + # ... plus state caches for delete room? c.f. _invalidate_state_caches + # in `synapse.storage._base` for a list of such caches. Maybe we can + # just reuse that function? + + # Plus we should clear the state cache in the state handler. + async def invalidate_cache_and_stream( self, cache_name: str, keys: Tuple[Any, ...] ) -> None: diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 53aa5933d56f..b9fe8027c0e8 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -903,6 +903,15 @@ def _invalidate_local_get_event_cache(self, event_id: str) -> None: self._event_ref.pop(event_id, None) self._current_event_fetches.pop(event_id, None) + def _invalidate_local_get_event_cache_all(self) -> None: + """Clears the in-memory get event caches. + + Used when we purge room history. 
+ """ + self._get_event_cache.clear() + self._event_ref.clear() + self._current_event_fetches.clear() + async def _get_events_from_cache( self, events: Iterable[str], update_metrics: bool = True ) -> Dict[str, EventCacheEntry]: diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index efbd3e75d99e..ff657ca7b7b3 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -64,6 +64,9 @@ def _purge_history_txn( token: RoomStreamToken, delete_local_events: bool, ) -> Set[int]: + # TODO: Also stream this + txn.call_after(self._invalidate_caches_for_room, room_id) + # Tables that should be pruned: # event_auth # event_backward_extremities @@ -346,6 +349,11 @@ async def purge_room(self, room_id: str) -> List[int]: return state_groups_to_delete def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]: + # TODO: Also stream this + txn.call_after(self._invalidate_caches_for_room, room_id) + + # TODO: Also clear all state caches? + # This collides with event persistence so we cannot write new events and metadata into # a room while deleting it or this transaction will fail. if isinstance(self.database_engine, PostgresEngine): From 139e216b462fdbe4f2cc47d9ecc0ed1c8b894547 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 18 May 2023 14:39:49 -0400 Subject: [PATCH 02/11] Remove TODO from a test which will be fixed in this PR. 
--- tests/rest/client/test_read_marker.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/rest/client/test_read_marker.py b/tests/rest/client/test_read_marker.py index 0eedcdb476b4..5cdd5694a04a 100644 --- a/tests/rest/client/test_read_marker.py +++ b/tests/rest/client/test_read_marker.py @@ -131,9 +131,6 @@ def send_message() -> str: event = self.get_success(self.store.get_event(event_id_1, allow_none=True)) assert event is None - # TODO See https://github.com/matrix-org/synapse/issues/13476 - self.store.get_event_ordering.invalidate_all() - # Test moving the read marker to a newer event event_id_2 = send_message() channel = self.make_request( From d60048df1c8591700a4fb6dc66403212fa73ebf8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 2 Jun 2023 14:15:30 +0100 Subject: [PATCH 03/11] Invalidate more caches --- synapse/storage/databases/main/cache.py | 79 +++++++++++++++---------- 1 file changed, 49 insertions(+), 30 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 04182c690aec..d9af02c56243 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -306,36 +306,55 @@ def _invalidate_caches_for_room(self, room_id: str) -> None: self._attempt_to_invalidate_cache("_get_state_group_for_event", None) - # List of caches to also invalidate. Lines ending with `room_id` can be - # invalidated by room_id, the rest must be cleared entirely. 
- - # get_account_data_for_room - # get_account_data_for_room_and_type - # get_aliases_for_room - # get_latest_event_ids_in_room - # _get_forward_extremeties_for_room - # get_unread_event_push_actions_by_room_for_user room_id - # get_event_ordering - # is_partial_state_event - # _get_linearized_receipts_for_room room_id - # is_room_blocked room_id - # get_retention_policy_for_room room_id - # _get_partial_state_servers_at_join room_id - # is_partial_state_room room_id - # get_invited_rooms_for_local_user - # _get_joined_profile_from_event_id - # is_host_invited - # get_current_hosts_in_room_ordered room_id - # did_forget - # get_forgotten_rooms_for_user - # _get_membership_from_event_id - # get_room_version_id room_id - # get_partial_current_state_ids room_id - # - - # ... plus state caches for delete room? c.f. _invalidate_state_caches - # in `synapse.storage._base` for a list of such caches. Maybe we can - # just reuse that function? + # Also invalidate room based caches + + self._attempt_to_invalidate_cache("get_account_data_for_room", None) + self._attempt_to_invalidate_cache("get_account_data_for_room_and_type", None) + self._attempt_to_invalidate_cache("get_aliases_for_room", (room_id,)) + self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,)) + self._attempt_to_invalidate_cache("_get_forward_extremeties_for_room", None) + self._attempt_to_invalidate_cache( + "get_unread_event_push_actions_by_room_for_user", (room_id,) + ) + self._attempt_to_invalidate_cache("get_event_ordering", None) + self._attempt_to_invalidate_cache("is_partial_state_event", None) + self._attempt_to_invalidate_cache( + "_get_linearized_receipts_for_room", (room_id,) + ) + self._attempt_to_invalidate_cache("is_room_blocked", (room_id,)) + self._attempt_to_invalidate_cache("get_retention_policy_for_room", (room_id,)) + self._attempt_to_invalidate_cache( + "_get_partial_state_servers_at_join", (room_id,) + ) + self._attempt_to_invalidate_cache("is_partial_state_room", 
(room_id,)) + self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None) + self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None) + self._attempt_to_invalidate_cache("is_host_invited", None) + self._attempt_to_invalidate_cache("is_host_joined", None) + self._attempt_to_invalidate_cache( + "get_current_hosts_in_room_ordered", (room_id,) + ) + self._attempt_to_invalidate_cache("did_forget", None) + self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None) + self._attempt_to_invalidate_cache("_get_membership_from_event_id", None) + self._attempt_to_invalidate_cache("get_room_version_id", (room_id,)) + + # And delete state caches. + + self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,)) + self._attempt_to_invalidate_cache("get_users_in_room", (room_id,)) + self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,)) + self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,)) + self._attempt_to_invalidate_cache("get_number_joined_users_in_room", (room_id,)) + self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,)) + self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None) + self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None) + self._attempt_to_invalidate_cache( + "get_rooms_for_user_with_stream_ordering", None + ) + self._attempt_to_invalidate_cache("get_rooms_for_user", None) + self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) + self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,)) # Plus we should clear the state cache in the state handler. 
From 52143790d760f86e543b9a8decfd2da96df9b5a2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 2 Jun 2023 17:49:58 +0100 Subject: [PATCH 04/11] Make .clear sync --- synapse/util/caches/lrucache.py | 2 +- tests/handlers/test_sync.py | 2 +- tests/storage/databases/main/test_events_worker.py | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 452d5d04c1c0..b514ffe4e73b 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -864,5 +864,5 @@ def invalidate_local(self, key: KT) -> None: async def contains(self, key: KT) -> bool: return self._lru_cache.contains(key) - async def clear(self) -> None: + def clear(self) -> None: self._lru_cache.clear() diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 0d9a3de92a5d..9f035a02dc69 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -163,7 +163,7 @@ def test_unknown_room_version(self) -> None: # Blow away caches (supported room versions can only change due to a restart). self.store.get_rooms_for_user_with_stream_ordering.invalidate_all() self.store.get_rooms_for_user.invalidate_all() - self.get_success(self.store._get_event_cache.clear()) + self.store._get_event_cache.clear() self.store._event_ref.clear() # The rooms should be excluded from the sync response. 
diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index 9606ecc43b6b..788500e38f2d 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -188,7 +188,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.event_id = res["event_id"] # Reset the event cache so the tests start with it empty - self.get_success(self.store._get_event_cache.clear()) + self.store._get_event_cache.clear() def test_simple(self) -> None: """Test that we cache events that we pull from the DB.""" @@ -205,7 +205,7 @@ def test_event_ref(self) -> None: """ # Reset the event cache - self.get_success(self.store._get_event_cache.clear()) + self.store._get_event_cache.clear() with LoggingContext("test") as ctx: # We keep hold of the event event though we never use it. @@ -215,7 +215,7 @@ def test_event_ref(self) -> None: self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1) # Reset the event cache - self.get_success(self.store._get_event_cache.clear()) + self.store._get_event_cache.clear() with LoggingContext("test") as ctx: self.get_success(self.store.get_event(self.event_id)) @@ -390,7 +390,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.event_id = res["event_id"] # Reset the event cache so the tests start with it empty - self.get_success(self.store._get_event_cache.clear()) + self.store._get_event_cache.clear() @contextmanager def blocking_get_event_calls( From d3a0464256faad0f2dd73282cfa61f2777e73c33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 2 Jun 2023 17:51:52 +0100 Subject: [PATCH 05/11] Newsfile --- changelog.d/15609.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15609.bugfix diff --git a/changelog.d/15609.bugfix b/changelog.d/15609.bugfix new file mode 100644 index 000000000000..b5a990cfec1e --- /dev/null +++ 
b/changelog.d/15609.bugfix @@ -0,0 +1 @@ +Correctly clear caches when we delete a room. From 4e91fbe1ccdff4d6e3b197f4171a97ee23adb65c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Jun 2023 10:44:56 +0100 Subject: [PATCH 06/11] Stream cache invalidation --- synapse/storage/databases/main/cache.py | 26 +++++++++++++++++++ .../storage/databases/main/purge_events.py | 6 ++--- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index d9af02c56243..0fdd4c786d2e 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -46,6 +46,9 @@ # based on the current state when notifying workers over replication. CURRENT_STATE_CACHE_NAME = "cs_cache_fake" +# As above, but for invalidating +DELETE_ROOM_CACHE_NAME = "dr_cache_fake" + class CacheInvalidationWorkerStore(SQLBaseStore): def __init__( @@ -175,6 +178,14 @@ def process_replication_rows( room_id = row.keys[0] members_changed = set(row.keys[1:]) self._invalidate_state_caches(room_id, members_changed) + elif row.cache_func == DELETE_ROOM_CACHE_NAME: + if row.keys is None: + raise Exception( + "Can't send an 'invalidate all' for 'delete room' cache" + ) + + room_id = row.keys[0] + self._invalidate_caches_for_room(room_id) else: self._attempt_to_invalidate_cache(row.cache_func, row.keys) @@ -274,6 +285,18 @@ def _invalidate_caches_for_event( self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,)) self._attempt_to_invalidate_cache("get_threads", (room_id,)) + def _invalidate_caches_for_room_and_stream( + self, txn: LoggingTransaction, room_id: str + ) -> None: + """Invalidate caches associated with rooms, and stream to replication. + + Used when we delete events in rooms, but don't know which events we've + deleted. 
+ """ + + self._send_invalidation_to_replication(txn, DELETE_ROOM_CACHE_NAME, [room_id]) + txn.call_after(self._invalidate_caches_for_room, room_id) + def _invalidate_caches_for_room(self, room_id: str) -> None: """Invalidate caches associated with rooms. @@ -464,6 +487,9 @@ def _send_invalidation_to_replication( "Can't stream invalidate all with magic current state cache" ) + if cache_name == DELETE_ROOM_CACHE_NAME and keys is None: + raise Exception("Can't stream invalidate all with magic delete room cache") + if isinstance(self.database_engine, PostgresEngine): assert self._cache_id_gen is not None diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index ff657ca7b7b3..c7a6477e3208 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -64,8 +64,7 @@ def _purge_history_txn( token: RoomStreamToken, delete_local_events: bool, ) -> Set[int]: - # TODO: Also stream this - txn.call_after(self._invalidate_caches_for_room, room_id) + self._invalidate_caches_for_room_and_stream(txn, room_id) # Tables that should be pruned: # event_auth @@ -349,8 +348,7 @@ async def purge_room(self, room_id: str) -> List[int]: return state_groups_to_delete def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]: - # TODO: Also stream this - txn.call_after(self._invalidate_caches_for_room, room_id) + self._invalidate_caches_for_room_and_stream(txn, room_id) # TODO: Also clear all state caches? 
From 166dd75a288b2528e17e9541893b2331e594cbda Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Jun 2023 10:59:22 +0100 Subject: [PATCH 07/11] Move cache invalidate to the end --- synapse/storage/databases/main/purge_events.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index c7a6477e3208..9420efcc35a6 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -64,8 +64,6 @@ def _purge_history_txn( token: RoomStreamToken, delete_local_events: bool, ) -> Set[int]: - self._invalidate_caches_for_room_and_stream(txn, room_id) - # Tables that should be pruned: # event_auth # event_backward_extremities @@ -310,6 +308,8 @@ def _purge_history_txn( logger.info("[purge] done") + self._invalidate_caches_for_room_and_stream(txn, room_id) + return referenced_state_groups async def purge_room(self, room_id: str) -> List[int]: @@ -348,10 +348,6 @@ async def purge_room(self, room_id: str) -> List[int]: return state_groups_to_delete def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]: - self._invalidate_caches_for_room_and_stream(txn, room_id) - - # TODO: Also clear all state caches? - # This collides with event persistence so we cannot write new events and metadata into # a room while deleting it or this transaction will fail. if isinstance(self.database_engine, PostgresEngine): @@ -491,10 +487,6 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]: # index on them. In any case we should be clearing out 'stream' tables # periodically anyway (#5888) - # TODO: we could probably usefully do a bunch more cache invalidation here - - # XXX: as with purge_history, this is racy, but no worse than other races - # that already exist. 
- self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,)) + self._invalidate_caches_for_room_and_stream(txn, room_id) return state_groups From a6673f8bd4ae1a453cb48cfa84de6395e73eea26 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jun 2023 09:13:44 +0100 Subject: [PATCH 08/11] Remove comment --- synapse/state/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 94f75253a74c..603109524932 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -595,7 +595,6 @@ def __init__(self, hs: "HomeServer"): self.resolve_linearizer = Linearizer(name="state_resolve_lock") # dict of set of event_ids -> _StateCacheEntry. - # TODO: Clear this when we purge history? self._state_cache: ExpiringCache[ FrozenSet[int], _StateCacheEntry ] = ExpiringCache( From 35fe290e6fc6f543fde3c2d3f79d6dc6717afba7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Jun 2023 09:53:27 +0100 Subject: [PATCH 09/11] Refactor --- synapse/storage/_base.py | 31 ++++++++ synapse/storage/databases/main/cache.py | 74 +++++++++++-------- .../storage/databases/main/purge_events.py | 2 +- 3 files changed, 77 insertions(+), 30 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 481fec72fe1b..fe4a76341137 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -86,9 +86,14 @@ def _invalidate_state_caches( room_id: Room where state changed members_changed: The user_ids of members that have changed """ + + # XXX: If you add something to this function make sure you add it to + # `_invalidate_state_caches_all` as well. + # If there were any membership changes, purge the appropriate caches. 
for host in {get_domain_from_id(u) for u in members_changed}: self._attempt_to_invalidate_cache("is_host_joined", (room_id, host)) + self._attempt_to_invalidate_cache("is_host_invited", (room_id, host)) if members_changed: self._attempt_to_invalidate_cache("get_users_in_room", (room_id,)) self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,)) @@ -117,6 +122,32 @@ def _invalidate_state_caches( self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,)) + def _invalidate_state_caches_all(self, room_id: str) -> None: + """Invalidates caches that are based on the current state, but does + not stream invalidations down replication. + + Same as `_invalidate_state_caches`, except that works when we don't know + which memberships have changed. + + Args: + room_id: Room where state changed + """ + self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,)) + self._attempt_to_invalidate_cache("get_users_in_room", (room_id,)) + self._attempt_to_invalidate_cache("is_host_invited", None) + self._attempt_to_invalidate_cache("is_host_joined", None) + self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,)) + self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,)) + self._attempt_to_invalidate_cache("get_number_joined_users_in_room", (room_id,)) + self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,)) + self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None) + self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None) + self._attempt_to_invalidate_cache( + "get_rooms_for_user_with_stream_ordering", None + ) + self._attempt_to_invalidate_cache("get_rooms_for_user", None) + self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) + def _attempt_to_invalidate_cache( self, cache_name: str, key: Optional[Collection[Any]] ) -> bool: diff --git 
a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 0fdd4c786d2e..551c9c9094ee 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -46,7 +46,10 @@ # based on the current state when notifying workers over replication. CURRENT_STATE_CACHE_NAME = "cs_cache_fake" -# As above, but for invalidating +# As above, but for invalidating event caches on history deletion +PURGE_HISTORY_CACHE_NAME = "ph_cache_fake" + +# As above, but for invalidating room caches on room deletion DELETE_ROOM_CACHE_NAME = "dr_cache_fake" @@ -178,6 +181,14 @@ def process_replication_rows( room_id = row.keys[0] members_changed = set(row.keys[1:]) self._invalidate_state_caches(room_id, members_changed) + elif row.cache_func == PURGE_HISTORY_CACHE_NAME: + if row.keys is None: + raise Exception( + "Can't send an 'invalidate all' for 'purge history' cache" + ) + + room_id = row.keys[0] + self._invalidate_caches_for_events(room_id) elif row.cache_func == DELETE_ROOM_CACHE_NAME: if row.keys is None: raise Exception( @@ -185,6 +196,7 @@ def process_replication_rows( ) room_id = row.keys[0] + self._invalidate_caches_for_events(room_id) self._invalidate_caches_for_room(room_id) else: self._attempt_to_invalidate_cache(row.cache_func, row.keys) @@ -285,20 +297,22 @@ def _invalidate_caches_for_event( self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,)) self._attempt_to_invalidate_cache("get_threads", (room_id,)) - def _invalidate_caches_for_room_and_stream( + def _invalidate_caches_for_events_and_stream( self, txn: LoggingTransaction, room_id: str ) -> None: - """Invalidate caches associated with rooms, and stream to replication. + """Invalidate caches associated with events in rooms, and stream to + replication. Used when we delete events in rooms, but don't know which events we've deleted. 
""" - self._send_invalidation_to_replication(txn, DELETE_ROOM_CACHE_NAME, [room_id]) - txn.call_after(self._invalidate_caches_for_room, room_id) + self._send_invalidation_to_replication(txn, PURGE_HISTORY_CACHE_NAME, [room_id]) + txn.call_after(self._invalidate_caches_for_events, room_id) - def _invalidate_caches_for_room(self, room_id: str) -> None: - """Invalidate caches associated with rooms. + def _invalidate_caches_for_events(self, room_id: str) -> None: + """Invalidate caches associated with events in rooms, and stream to + replication. Used when we delete events in rooms, but don't know which events we've deleted. @@ -329,7 +343,29 @@ def _invalidate_caches_for_room(self, room_id: str) -> None: self._attempt_to_invalidate_cache("_get_state_group_for_event", None) - # Also invalidate room based caches + self._attempt_to_invalidate_cache("get_event_ordering", None) + self._attempt_to_invalidate_cache("is_partial_state_event", None) + self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None) + + def _invalidate_caches_for_room_and_stream( + self, txn: LoggingTransaction, room_id: str + ) -> None: + """Invalidate caches associated with rooms, and stream to replication. + + Used when we delete rooms. + """ + + self._send_invalidation_to_replication(txn, DELETE_ROOM_CACHE_NAME, [room_id]) + txn.call_after(self._invalidate_caches_for_room, room_id) + + def _invalidate_caches_for_room(self, room_id: str) -> None: + """Invalidate caches associated with rooms. + + Used when we delete rooms. + """ + + # If we've deleted the room then we also need to purge all event caches. 
+ self._invalidate_caches_for_events(room_id) self._attempt_to_invalidate_cache("get_account_data_for_room", None) self._attempt_to_invalidate_cache("get_account_data_for_room_and_type", None) @@ -339,8 +375,6 @@ def _invalidate_caches_for_room(self, room_id: str) -> None: self._attempt_to_invalidate_cache( "get_unread_event_push_actions_by_room_for_user", (room_id,) ) - self._attempt_to_invalidate_cache("get_event_ordering", None) - self._attempt_to_invalidate_cache("is_partial_state_event", None) self._attempt_to_invalidate_cache( "_get_linearized_receipts_for_room", (room_id,) ) @@ -351,9 +385,6 @@ def _invalidate_caches_for_room(self, room_id: str) -> None: ) self._attempt_to_invalidate_cache("is_partial_state_room", (room_id,)) self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None) - self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None) - self._attempt_to_invalidate_cache("is_host_invited", None) - self._attempt_to_invalidate_cache("is_host_joined", None) self._attempt_to_invalidate_cache( "get_current_hosts_in_room_ordered", (room_id,) ) @@ -364,22 +395,7 @@ def _invalidate_caches_for_room(self, room_id: str) -> None: # And delete state caches. 
- self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,)) - self._attempt_to_invalidate_cache("get_users_in_room", (room_id,)) - self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,)) - self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,)) - self._attempt_to_invalidate_cache("get_number_joined_users_in_room", (room_id,)) - self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,)) - self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None) - self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None) - self._attempt_to_invalidate_cache( - "get_rooms_for_user_with_stream_ordering", None - ) - self._attempt_to_invalidate_cache("get_rooms_for_user", None) - self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) - self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,)) - - # Plus we should clear the state cache in the state handler. + self._invalidate_state_caches_all(room_id) async def invalidate_cache_and_stream( self, cache_name: str, keys: Tuple[Any, ...] 
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 9420efcc35a6..aef317d40d57 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -308,7 +308,7 @@ def _purge_history_txn( logger.info("[purge] done") - self._invalidate_caches_for_room_and_stream(txn, room_id) + self._invalidate_caches_for_events_and_stream(txn, room_id) return referenced_state_groups From 765b18628c803feb858bac368ba88f2843d607e6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Jun 2023 09:55:37 +0100 Subject: [PATCH 10/11] fixup --- synapse/storage/databases/main/cache.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 551c9c9094ee..4f48a3915445 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -503,6 +503,11 @@ def _send_invalidation_to_replication( "Can't stream invalidate all with magic current state cache" ) + if cache_name == PURGE_HISTORY_CACHE_NAME and keys is None: + raise Exception( + "Can't stream invalidate all with magic purge history cache" + ) + if cache_name == DELETE_ROOM_CACHE_NAME and keys is None: raise Exception("Can't stream invalidate all with magic delete room cache") From e5c88c4c250e131bb53181e16250a860fdd2fff9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Jun 2023 11:34:37 +0100 Subject: [PATCH 11/11] Review comments --- synapse/storage/databases/main/cache.py | 22 +++++++++---------- .../storage/databases/main/purge_events.py | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 4f48a3915445..6e1c7d681fe5 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -188,7 +188,7 @@ def process_replication_rows( ) room_id = row.keys[0] - 
self._invalidate_caches_for_events(room_id) + self._invalidate_caches_for_room_events(room_id) elif row.cache_func == DELETE_ROOM_CACHE_NAME: if row.keys is None: raise Exception( @@ -196,7 +196,7 @@ def process_replication_rows( ) room_id = row.keys[0] - self._invalidate_caches_for_events(room_id) + self._invalidate_caches_for_room_events(room_id) self._invalidate_caches_for_room(room_id) else: self._attempt_to_invalidate_cache(row.cache_func, row.keys) @@ -250,7 +250,7 @@ def _invalidate_caches_for_event( backfilled: bool, ) -> None: # XXX: If you add something to this function make sure you add it to - # `_invalidate_caches_for_room` as well. + # `_invalidate_caches_for_room_events` as well. # This invalidates any local in-memory cached event objects, the original # process triggering the invalidation is responsible for clearing any external @@ -297,24 +297,24 @@ def _invalidate_caches_for_event( self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,)) self._attempt_to_invalidate_cache("get_threads", (room_id,)) - def _invalidate_caches_for_events_and_stream( + def _invalidate_caches_for_room_events_and_stream( self, txn: LoggingTransaction, room_id: str ) -> None: - """Invalidate caches associated with events in rooms, and stream to + """Invalidate caches associated with events in a room, and stream to replication. - Used when we delete events in rooms, but don't know which events we've + Used when we delete events in a room, but don't know which events we've deleted. 
+ """ self._send_invalidation_to_replication(txn, PURGE_HISTORY_CACHE_NAME, [room_id]) - txn.call_after(self._invalidate_caches_for_events, room_id) + txn.call_after(self._invalidate_caches_for_room_events, room_id) - def _invalidate_caches_for_events(self, room_id: str) -> None: - """Invalidate caches associated with events in rooms, and stream to + def _invalidate_caches_for_room_events(self, room_id: str) -> None: + """Invalidate caches associated with events in a room, without streaming to replication. - Used when we delete events in rooms, but don't know which events we've + Used when we delete events in a room, but don't know which events we've deleted. """ @@ -365,7 +365,7 @@ def _invalidate_caches_for_room(self, room_id: str) -> None: """ # If we've deleted the room then we also need to purge all event caches. - self._invalidate_caches_for_events(room_id) + self._invalidate_caches_for_room_events(room_id) self._attempt_to_invalidate_cache("get_account_data_for_room", None) self._attempt_to_invalidate_cache("get_account_data_for_room_and_type", None) diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index aef317d40d57..9773c1fcd28a 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -308,7 +308,7 @@ def _purge_history_txn( logger.info("[purge] done") - self._invalidate_caches_for_events_and_stream(txn, room_id) + self._invalidate_caches_for_room_events_and_stream(txn, room_id) return referenced_state_groups