This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reduce spurious replication catchup #16555

Merged
merged 5 commits into from
Oct 27, 2023
Changes from 2 commits
1 change: 1 addition & 0 deletions changelog.d/16555.misc
@@ -0,0 +1 @@
Reduce some spurious logging in worker mode.
14 changes: 9 additions & 5 deletions synapse/replication/tcp/handler.py
@@ -611,10 +611,14 @@ async def _process_position(
# Find where we previously streamed up to.
current_token = stream.current_token(cmd.instance_name)

-# If the position token matches our current token then we're up to
-# date and there's nothing to do. Otherwise, fetch all updates
-# between then and now.
-missing_updates = cmd.prev_token != current_token
+# If the position token matches our current token then we're up to date
+# and there's nothing to do. Otherwise, fetch all updates between then
+# and now.
Contributor

Suggested change
-# If the position token matches our current token then we're up to date
-# and there's nothing to do. Otherwise, fetch all updates between then
-# and now.
+# If the incoming position is at least our current position then we're
+# up to date and there's nothing to do. Otherwise, fetch all updates
+# between then and now.

Contributor

@DMRobertson DMRobertson Oct 26, 2023

Example: I think the stream is at position 10.
I receive POSITION 4 8.
Uh oh, I might have missed stream entries 5, 6, 7 and 8. Need to rescan for them.

(Here prev < next < current.)

Member Author

Ah, no. POSITION 4 8 means that the new token for that instance should be set to 8, and nothing has happened between 4 and 8 from that instance. (The reason for this is that normally we'd just get a stream of RDATA from the instance, which implicitly bumps the token. If for some reason the token has advanced without any updates to send, it sends a POSITION, hence why it gives the range where things have not happened).
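
The behaviour described in this comment can be sketched from the writer's side. This is a toy model, not Synapse's implementation: `ToyWriter`, `advance` and the tuple-shaped commands are hypothetical names, and the real commands carry more fields (for example the instance name) than shown here.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class ToyWriter:
    """Hypothetical stand-in for a stream writer; not a Synapse class."""

    last_sent_token: int = 0
    sent: List[Tuple] = field(default_factory=list)

    def advance(self, new_token: int, has_rows: bool) -> None:
        if has_rows:
            # Normal case: the RDATA itself tells receivers the new token.
            self.sent.append(("RDATA", new_token))
        else:
            # The token moved with nothing to send, so announce the range
            # in which nothing happened.
            self.sent.append(("POSITION", self.last_sent_token, new_token))
        self.last_sent_token = new_token


writer = ToyWriter()
writer.advance(4, has_rows=True)   # a row was written at token 4
writer.advance(8, has_rows=False)  # token jumped to 8 with no rows
print(writer.sent)  # [('RDATA', 4), ('POSITION', 4, 8)]
```

In this model `POSITION 4 8` only ever follows a point where the writer had already sent everything up to 4, which is why the range it names is one where nothing happened.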

Member Author

@erikjohnston erikjohnston Oct 27, 2023

Suggested change
-# If the position token matches our current token then we're up to date
-# and there's nothing to do. Otherwise, fetch all updates between then
-# and now.
+# If the incoming previous position is less than our current position
+# then we're up to date and there's nothing to do. Otherwise, fetch
+# all updates between then and now.

Maybe?

+#
+# Note: We have to check that `current_token` is within the range, to
+# handle the case where the stream gets "reset" (e.g. for `caches` and
+# `typing` after the writer's restart).
Contributor

Suggested change
-# Note: We have to check that `current_token` is within the range, to
-# handle the case where the stream gets "reset" (e.g. for `caches` and
-# `typing` after the writer's restart).
+# Note: We also have to check that `current_token` is at least the
+# previous position, to handle the case where the stream gets "reset"
+# (e.g. for `caches` and `typing` after the writer's restart).

Contributor

@DMRobertson DMRobertson Oct 26, 2023

Example: I think the stream is at position 10.
I receive POSITION 12 15.
Uh oh, I might have missed stream entries 11, 12, 13, 14, and 15. Need to rescan for them.

(Here current < prev < next.)

Member Author

> Uh oh, I might have missed stream entries 11, 12, 13, 14, and 15. Need to rescan for them.

You might have missed entries 11 and 12.
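
The correction above follows from reading `POSITION prev new` as "nothing happened after `prev` up to `new`": only tokens after the receiver's current position and up to `prev` can have been missed. A minimal sketch of that arithmetic, where `possibly_missed` is a hypothetical helper and not something in Synapse:

```python
def possibly_missed(current_token: int, prev_token: int) -> range:
    """Tokens the receiver may not have seen, given where it currently is
    and the `prev` token of an incoming POSITION (hypothetical helper)."""
    # Everything in (current_token, prev_token] is unaccounted for; the
    # POSITION itself says nothing happened after prev_token.
    return range(current_token + 1, prev_token + 1)


# Receiver is at 10 and gets POSITION 12 15.
print(list(possibly_missed(10, 12)))  # [11, 12]

# Receiver is at 10 and gets POSITION 4 8: nothing lies between 10 and 4.
print(list(possibly_missed(10, 4)))  # []
```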

Member

Does POSITION need some better documentation somewhere (either https://matrix-org.github.io/synapse/develop/development/synapse_architecture/streams.html or maybe in the code?) describing how it works? I think I was also under the impression that it told you where it had updates, not where it jumped to without updates.

Member Author

Member

(Not blocking this PR, but would appreciate a follow-up or at least an issue.)

Member Author

#16560 since it's a Friday afternoon and I'm liable to forget to follow up otherwise

Member Author

Suggested change
-# Note: We have to check that `current_token` is within the range, to
-# handle the case where the stream gets "reset" (e.g. for `caches` and
-# `typing` after the writer's restart).
+# Note: We also have to check that `current_token` is at least the
+# new position, to handle the case where the stream gets "reset"
+# (e.g. for `caches` and `typing` after the writer's restart).

+missing_updates = not (cmd.prev_token <= current_token <= cmd.new_token)
Contributor

Sanity check: are the inequalities correct here? E.g. should one of them be < instead of <=?

Member Author

Good point that it looks weird. But I think they're right as we're comparing "upper bounds"
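
To see why both comparisons are inclusive, it may help to evaluate the predicate at and around the endpoints. The sketch below copies the expression from the diff into a standalone function (the function wrapper and the sample tokens are illustrative, not part of the PR) and runs it against `POSITION 4 8`.

```python
def missing_updates(prev_token: int, current_token: int, new_token: int) -> bool:
    # Same expression as in the diff, lifted out of the handler.
    return not (prev_token <= current_token <= new_token)


# Incoming command: POSITION 4 8, for several receiver positions.
results = {current: missing_updates(4, current, 8) for current in (3, 4, 6, 8, 10)}
for current, missing in results.items():
    print(current, missing)
# 3 True    (receiver is behind prev, so token 4 may have been missed)
# 4 False   (receiver has seen everything up to prev)
# 6 False
# 8 False   (receiver is already at the new position)
# 10 True   (receiver is ahead of the announced range)
```

Each token is an upper bound on what has been seen, so a receiver sitting exactly at `prev` or exactly at `new` has nothing to fetch, which is consistent with `<=` on both sides.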

while missing_updates:
# Note: There may very well not be any new updates, but we check to
# make sure. This can particularly happen for the event stream where
@@ -644,7 +648,7 @@ async def _process_position(
[stream.parse_row(row) for row in rows],
)

-logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token)
+logger.info("Caught up with stream '%s' to %i", stream_name, current_token)

# We've now caught up to position sent to us, notify handler.
await self._replication_data_handler.on_position(