Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix tight loop handling presence replication.
Browse files Browse the repository at this point in the history
Only affects workers. Introduced in #9819.

Fixes #9899.
  • Loading branch information
erikjohnston committed Apr 28, 2021
1 parent 8ba0869 commit 468831f
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
24 changes: 23 additions & 1 deletion synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -2026,18 +2026,40 @@ async def get_replication_rows(
)
return result["updates"], result["upto_token"], result["limited"]

# If the from_token is the current token then there's nothing to return
# and we can trivially no-op.
if from_token == self._next_id - 1:
return [], upto_token, False

# We can find the correct position in the queue by noting that there is
# exactly one entry per stream ID, and that the last entry has an ID of
# `self._next_id - 1`, so we can count backwards from the end.
#
# Since we are returning all states in the range `from_token < stream_id
# <= upto_token` we look for the index with a `stream_id` of `from_token
# + 1`.
#
# Since the start of the queue is periodically truncated we need to
# handle the case where `from_token` stream ID has already been dropped.
start_idx = max(from_token - self._next_id, -len(self._queue))
start_idx = max(from_token + 1 - self._next_id, -len(self._queue))

to_send = [] # type: List[Tuple[int, Tuple[str, str]]]
limited = False
new_id = upto_token
for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
if stream_id <= from_token:
# Paranoia check that we are actually only sending states that
# are have stream_id strictly greater than from_token. We should
# never hit this.
logger.warning(
"Tried returning presence federation stream ID: %d less than from_token: %d (next_id: %d, len: %d)",
stream_id,
from_token,
self._next_id,
len(self._queue),
)
continue

if stream_id > upto_token:
break

Expand Down
22 changes: 22 additions & 0 deletions tests/handlers/test_presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,14 @@ def test_send_and_get(self):

self.assertCountEqual(rows, expected_rows)

now_token = self.queue.get_current_token(self.instance_name)
rows, upto_token, limited = self.get_success(
self.queue.get_replication_rows("master", upto_token, now_token, 10)
)
self.assertEqual(upto_token, now_token)
self.assertFalse(limited)
self.assertCountEqual(rows, [])

def test_send_and_get_split(self):
state1 = UserPresenceState.default("@user1:test")
state2 = UserPresenceState.default("@user2:test")
Expand Down Expand Up @@ -538,6 +546,20 @@ def test_send_and_get_split(self):

self.assertCountEqual(rows, expected_rows)

now_token = self.queue.get_current_token(self.instance_name)
rows, upto_token, limited = self.get_success(
self.queue.get_replication_rows("master", upto_token, now_token, 10)
)

self.assertEqual(upto_token, now_token)
self.assertFalse(limited)

expected_rows = [
(2, ("dest3", "@user3:test")),
]

self.assertCountEqual(rows, expected_rows)

def test_clear_queue_all(self):
state1 = UserPresenceState.default("@user1:test")
state2 = UserPresenceState.default("@user2:test")
Expand Down

0 comments on commit 468831f

Please sign in to comment.