From 166848ad0a510b8bbe7e1b38e5a9c13d03bece1b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Oct 2023 15:15:31 +0100 Subject: [PATCH 1/2] Reduce amount of caches POSITIONS we send --- synapse/replication/tcp/streams/_base.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 5c4d228f3d7c..58a44029aa75 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -161,6 +161,14 @@ async def get_updates(self) -> StreamUpdateResult: and `limited` is whether there are more updates to fetch. """ current_token = self.current_token(self.local_instance_name) + + # If the minimum current token for the local instance is less than or + # equal to the last thing we published, we know that there are no + # updates. + if self.last_token >= self.minimal_local_current_token(): + self.last_token = current_token + return [], current_token, False + updates, current_token, limited = await self.get_updates_since( self.local_instance_name, self.last_token, current_token ) @@ -489,6 +497,8 @@ def current_token(self, instance_name: str) -> Token: return self.store.get_cache_stream_token_for_writer(instance_name) def minimal_local_current_token(self) -> Token: + if self.store._cache_id_gen: + return self.store._cache_id_gen.get_minimal_local_current_token() return self.current_token(self.local_instance_name) From d4a7aa4e89e1237f2d01e8b118423638a0ea6b3d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Oct 2023 15:16:27 +0100 Subject: [PATCH 2/2] Newsfile --- changelog.d/16561.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16561.bugfix diff --git a/changelog.d/16561.bugfix b/changelog.d/16561.bugfix new file mode 100644 index 000000000000..4f4a0380cd77 --- /dev/null +++ b/changelog.d/16561.bugfix @@ -0,0 +1 @@ +Fix a long-standing, exceedingly rare edge case where the first event persisted by a new event persister worker might not be sent down `/sync`.