Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix sending out of order POSITION over replication #16639

Merged
merged 5 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16639.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix sending out of order `POSITION` over replication, causing additional database load.
38 changes: 19 additions & 19 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ def __init__(self, hs: "HomeServer"):
if hs.config.redis.redis_enabled:
self._notifier.add_lock_released_callback(self.on_lock_released)

# Marks if we should send POSITION commands for all streams ASAP. This
# is checked by the `ReplicationStreamer` which manages sending
# RDATA/POSITION commands
self._should_announce_positions = True

def subscribe_to_channel(self, channel_name: str) -> None:
"""
Indicates that we wish to subscribe to a Redis channel by name.
Expand Down Expand Up @@ -397,29 +402,23 @@ def get_streams_to_replicate(self) -> List[Stream]:
return self._streams_to_replicate

def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None:
self.send_positions_to_connection(conn)
self.send_positions_to_connection()

def send_positions_to_connection(self, conn: IReplicationConnection) -> None:
def send_positions_to_connection(self) -> None:
"""Send current position of all streams this process is source of to
the connection.
"""

# We respond with current position of all streams this instance
# replicates.
for stream in self.get_streams_to_replicate():
# Note that we use the current token as the prev token here (rather
# than stream.last_token), as we can't be sure that there have been
# no rows written between last token and the current token (since we
# might be racing with the replication sending bg process).
current_token = stream.current_token(self._instance_name)
self.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
current_token,
current_token,
)
)
self._should_announce_positions = True
self._notifier.notify_replication()

def should_announce_positions(self) -> bool:
"""Check if we should send POSITION commands for all streams ASAP."""
return self._should_announce_positions
clokep marked this conversation as resolved.
Show resolved Hide resolved

def will_announce_positions(self) -> None:
"""Mark that we're about to send POSITIONs out for all streams."""
self._should_announce_positions = False

def on_USER_SYNC(
self, conn: IReplicationConnection, cmd: UserSyncCommand
Expand Down Expand Up @@ -626,8 +625,9 @@ async def _process_position(
# for why this can happen.

logger.info(
"Fetching replication rows for '%s' between %i and %i",
"Fetching replication rows for '%s' / %s between %i and %i",
stream_name,
cmd.instance_name,
current_token,
cmd.new_token,
)
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ async def _send_subscribe(self) -> None:
# We send out our positions when there is a new connection in case the
# other side missed updates. We do this for Redis connections as the
# otherside won't know we've connected and so won't issue a REPLICATE.
self.synapse_handler.send_positions_to_connection(self)
self.synapse_handler.send_positions_to_connection()

def messageReceived(self, pattern: str, channel: str, message: str) -> None:
"""Received a message from redis."""
Expand Down
17 changes: 16 additions & 1 deletion synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def on_notifier_poke(self) -> None:

# We check up front to see if anything has actually changed, as we get
# poked because of changes that happened on other instances.
if all(
if not self.command_handler.should_announce_positions() and all(
stream.last_token == stream.current_token(self._instance_name)
for stream in self.streams
):
Expand Down Expand Up @@ -158,6 +158,21 @@ async def _run_notifier_loop(self) -> None:
all_streams = list(all_streams)
random.shuffle(all_streams)

if self.command_handler.should_announce_positions():
# We need to send out POSITIONs for all streams, usually
# because a worker has reconnected.
self.command_handler.will_announce_positions()

for stream in all_streams:
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
stream.last_token,
stream.last_token,
)
)

for stream in all_streams:
if stream.last_token == stream.current_token(
self._instance_name
Expand Down
8 changes: 8 additions & 0 deletions tests/replication/tcp/streams/test_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ def test_typing(self) -> None:
typing = self.hs.get_typing_handler()
assert isinstance(typing, TypingWriterHandler)

# Create a typing update before we reconnect so that there is a missing
# update to fetch.
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)

self.reconnect()

typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
Expand Down Expand Up @@ -91,6 +95,10 @@ def test_reset(self) -> None:
typing = self.hs.get_typing_handler()
assert isinstance(typing, TypingWriterHandler)

# Create a typing update before we reconnect so that there is a missing
# update to fetch.
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)

self.reconnect()

typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
Expand Down
Loading