Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix sending out POSITIONs when our token advances without update
Browse files Browse the repository at this point in the history
Broke in #14820
  • Loading branch information
erikjohnston committed Oct 13, 2023
1 parent aeb31af commit 9301dcb
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 11 deletions.
9 changes: 8 additions & 1 deletion synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,10 +710,17 @@ def get_current_token_for_writer(self, instance_name: str) -> int:
# persisted up to position. This stops Synapse from doing a full table
# scan when a new writer announces itself over replication.
with self._lock:
return self._return_factor * self._current_positions.get(
pos = self._current_positions.get(
instance_name, self._persisted_upto_position
)

# We also ensure that we always return at least the
# `persisted_upto_position` for ourselves, so that when we notify
# other workers about our position we give them the max valid value
# here so that nothing waits for us to advance.
pos = max(pos, self._persisted_upto_position)
return self._return_factor * pos

def get_positions(self) -> Dict[str, int]:
"""Get a copy of the current positon map.
Expand Down
17 changes: 7 additions & 10 deletions tests/storage/test_id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,15 +350,12 @@ def test_multi_instance(self) -> None:
first_id_gen = self._create_id_generator("first", writers=["first", "second"])
second_id_gen = self._create_id_generator("second", writers=["first", "second"])

# The first ID gen will notice that it can advance its token to 7 as it
# has no in progress writes...
self.assertEqual(first_id_gen.get_positions(), {"first": 3, "second": 7})
self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 3)
self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 7)
self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 7)

# ... but the second ID gen doesn't know that.
self.assertEqual(second_id_gen.get_positions(), {"first": 3, "second": 7})
self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 3)
self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 7)
self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 7)

# Try allocating a new ID gen and check that we only see position
Expand Down Expand Up @@ -420,14 +417,14 @@ def test_multi_instance_empty_row(self) -> None:
self.assertEqual(
first_id_gen.get_positions(), {"first": 3, "second": 7, "third": 7}
)
self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 3)
self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 7)
self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 7)
self.assertEqual(first_id_gen.get_current_token_for_writer("third"), 7)

self.assertEqual(
second_id_gen.get_positions(), {"first": 3, "second": 7, "third": 7}
)
self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 3)
self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 7)
self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 7)
self.assertEqual(second_id_gen.get_current_token_for_writer("third"), 7)

Expand Down Expand Up @@ -873,11 +870,11 @@ def test_load_existing_stream(self) -> None:
second_id_gen = self._create_id_generator("second", writers=["first", "second"])

self.assertEqual(first_id_gen.get_positions(), {"first": 3, "second": 6})
self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 3)
self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 6)
self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 7)
self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 7)
self.assertEqual(first_id_gen.get_persisted_upto_position(), 7)

self.assertEqual(second_id_gen.get_positions(), {"first": 3, "second": 7})
self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 3)
self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 7)
self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 7)
self.assertEqual(second_id_gen.get_persisted_upto_position(), 7)

0 comments on commit 9301dcb

Please sign in to comment.