Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add some clarification to the doc/comments regarding TCP replication #15354

Merged
merged 1 commit into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15354.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add some clarification to the doc/comments regarding TCP replication.
9 changes: 5 additions & 4 deletions docs/tcp_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ position of all streams. The server then periodically sends `RDATA` commands
which have the format `RDATA <stream_name> <instance_name> <token> <row>`, where
the format of `<row>` is defined by the individual streams. The
`<instance_name>` is the name of the Synapse process that generated the data
(usually "master").
(usually "master"). We expect an RDATA for every row in the DB.

Error reporting happens by either the client or server sending an ERROR
command, and usually the connection will be closed.
Expand Down Expand Up @@ -107,7 +107,7 @@ reconnect, following the steps above.
If the server sends messages faster than the client can consume them the
server will first buffer a (fairly large) number of commands and then
disconnect the client. This ensures that we don't queue up an unbounded
number of commands in memory and gives us a potential oppurtunity to
number of commands in memory and gives us a potential opportunity to
squawk loudly. When/if the client recovers it can reconnect to the
server and ask for missed messages.

Expand All @@ -122,7 +122,7 @@ since these include tokens which can be used to restart the stream on
connection errors.

The client should keep track of the token in the last RDATA command
received for each stream so that on reconneciton it can start streaming
received for each stream so that on reconnection it can start streaming
from the correct place. Note: not all RDATA have valid tokens due to
batching. See `RdataCommand` for more details.

Expand Down Expand Up @@ -188,7 +188,8 @@ client (C):
Two positions are included, the "new" position and the last position sent respectively.
This allows servers to tell instances that the positions have advanced but no
data has been written, without clients needlessly checking to see if they
have missed any updates.
have missed any updates. Instances will only fetch stuff if there is a gap between
their current position and the given last position.

#### ERROR (S, C)

Expand Down
31 changes: 1 addition & 30 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,7 @@
"""This module contains the implementation of both the client and server
protocols.

The basic structure of the protocol is line based, where the initial word of
each line specifies the command. The rest of the line is parsed based on the
command. For example, the `RDATA` command is defined as::

RDATA <stream_name> <token> <row_json>

(Note that `<row_json>` may contains spaces, but cannot contain newlines.)

Blank lines are ignored.

# Example

An example iteraction is shown below. Each line is prefixed with '>' or '<' to
indicate which side is sending, these are *not* included on the wire::

* connection established *
> SERVER localhost:8823
> PING 1490197665618
< NAME synapse.app.appservice
< PING 1490197665618
< REPLICATE
> POSITION events 1
> POSITION backfill 1
> POSITION caches 1
> RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
> RDATA events 14 ["ev", ["$149019767112vOHxz:localhost:8823",
"!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]]
< PING 1490197675618
> ERROR server stopping
* connection closed by server *
An explanation of this protocol is available in docs/tcp_replication.md
"""
import fcntl
import logging
Expand Down
4 changes: 2 additions & 2 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ async def get_updates(self) -> StreamUpdateResult:
Returns:
A triplet `(updates, new_last_token, limited)`, where `updates` is
a list of `(token, row)` entries, `new_last_token` is the new
position in stream, and `limited` is whether there are more updates
to fetch.
position in stream (ie the highest token returned in the updates),
and `limited` is whether there are more updates to fetch.
"""
current_token = self.current_token(self.local_instance_name)
updates, current_token, limited = await self.get_updates_since(
Expand Down