This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix race in replication #7226

Merged
merged 7 commits on Apr 7, 2020
Changes from 1 commit
12 changes: 9 additions & 3 deletions synapse/replication/tcp/handler.py
@@ -99,15 +99,21 @@ async def on_RDATA(self, cmd: RdataCommand):
# missing RDATA.
with await self._position_linearizer.queue(cmd.stream_name):
Member

I'm a bit worried that we could get quite far behind (i.e., have a long list of things waiting for the position linearizer) if the catch-up is a bit slow and we get a few POSITION lines intermixed with lots of RDATA lines, all of which will end up getting processed in series.

Member Author

Mmm, though I'm not sure the solution to that is allowing RDATA to be processed in parallel. Perhaps we just want to add metrics for the queue size?

Member

Yeah, maybe it's not worth worrying about for now, especially if we can mitigate it as per #7226 (comment).
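
As a rough illustration of the mitigation floated above, here is a minimal sketch of tracking the linearizer backlog with a gauge. It assumes prometheus_client (which Synapse already uses for metrics); the metric name and the helper function are hypothetical and not part of this PR.

```python
from prometheus_client import Gauge

# Hypothetical metric; the name is illustrative and not part of Synapse.
inflight_rdata = Gauge(
    "synapse_replication_rdata_inflight",
    "RDATA commands waiting for, or holding, the position linearizer",
    ["stream_name"],
)


async def process_under_linearizer(linearizer, stream_name: str, process) -> None:
    """Run `process()` under the linearizer while tracking the backlog size."""
    inflight_rdata.labels(stream_name).inc()
    try:
        # Same locking pattern as on_RDATA above.
        with await linearizer.queue(stream_name):
            await process()
    finally:
        inflight_rdata.labels(stream_name).dec()
```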

if stream_name not in self._streams_connected:
# If the stream isn't marked as connected then we haven't seen a
# `POSITION` command yet, and so we may have missed some rows.
# Let's drop the row for now, on the assumption we'll receive a
# `POSITION` soon and we'll catch up correctly then.
logger.warning(
"Discarding RDATA for unconnected stream %s", stream_name
"Discarding RDATA for unconnected stream %s -> ",
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
stream_name,
cmd.token,
)
return

if cmd.token is None:
- # I.e. either this is part of a batch of updates for this stream (in
+ # I.e. this is part of a batch of updates for this stream (in
# which case batch until we get an update for the stream with a non
- # None token) or we're currently connecting so we queue up rows.
+ # None token).
self._pending_batches.setdefault(stream_name, []).append(row)
else:
# Check if this is the last of a batch of updates
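
For readers skimming the diff, a standalone sketch of the control flow in this hunk: drop rows for streams that haven't yet sent a POSITION, buffer token-less rows as part of an in-progress batch, and flush the batch when a row with a real token arrives. Names here are simplified and hypothetical, not Synapse's actual handler.

```python
from typing import Dict, List, Optional, Set, Tuple

# Simplified, hypothetical model of the on_RDATA flow shown above.
_streams_connected: Set[str] = set()
_pending_batches: Dict[str, List[object]] = {}


def handle_rdata(
    stream_name: str, token: Optional[int], row: object
) -> Optional[Tuple[int, List[object]]]:
    if stream_name not in _streams_connected:
        # No POSITION seen yet, so rows may have been missed; drop this one
        # and rely on the eventual POSITION to trigger a proper catch-up.
        return None

    if token is None:
        # Part of a batch: buffer until a row with a real token arrives.
        _pending_batches.setdefault(stream_name, []).append(row)
        return None

    # A real token closes the batch: return everything buffered plus this row.
    rows = _pending_batches.pop(stream_name, []) + [row]
    return token, rows
```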