Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Advance replication streams even if nothing is listening #2098

Merged
merged 1 commit into from
Apr 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,16 +414,18 @@ def subscribe_to_stream(self, stream_name, token):
token, row = update[0], update[1]
self.send_command(RdataCommand(stream_name, token, row))

# Now we can send any updates that came in while we were subscribing
pending_rdata = self.pending_rdata.pop(stream_name, [])
for token, update in pending_rdata:
self.send_command(RdataCommand(stream_name, token, update))

# We send a POSITION command to ensure that they have an up to
# date token (especially useful if we didn't send any updates
# above)
self.send_command(PositionCommand(stream_name, current_token))

# Now we can send any updates that came in while we were subscribing
pending_rdata = self.pending_rdata.pop(stream_name, [])
for token, update in pending_rdata:
# Only send updates newer than the current token
if token > current_token:
self.send_command(RdataCommand(stream_name, token, update))

# They're now fully subscribed
self.replication_streams.add(stream_name)
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def on_notifier_poke(self):
# Don't bother if nothing is listening. We still need to advance
# the stream tokens otherwise they'll fall beihind forever
for stream in self.streams:
stream.advance_current_token()
stream.discard_updates_and_advance()
return

# If we're in the process of checking for new updates, mark that fact
Expand Down
7 changes: 7 additions & 0 deletions synapse/replication/tcp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ def advance_current_token(self):
"""
self.upto_token = self.current_token()

def discard_updates_and_advance(self):
"""Called when the stream should advance but the updates would be discarded,
e.g. when there are no currently connected workers.
"""
self.upto_token = self.current_token()
self.last_token = self.upto_token

@defer.inlineCallbacks
def get_updates(self):
"""Gets all updates since the last time this function was called (or
Expand Down