Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Speed up processing of federation stream RDATA rows. #7584

Merged
merged 5 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7584.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up processing of federation stream RDATA rows.
31 changes: 24 additions & 7 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -863,9 +863,24 @@ async def update_token(self, token):
a FEDERATION_ACK back to the master, and stores the token that we have processed
in `federation_stream_position` so that we can restart where we left off.
"""
try:
self.federation_position = token
self.federation_position = token

# We save and send the ACK to master asynchronously, so we don't block
# processing on persistence. We don't need to do this operation for
# every single RDATA we receive, we just need to do it periodically.

if self._fed_position_linearizer.is_queued(None):
# There is already a task queued up to save and send the token, so
# no need to queue up another task.
return

run_as_background_process("_save_and_send_ack", self._save_and_send_ack)

async def _save_and_send_ack(self):
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""Save the current federation position in the database and send an ACK
to master with where we're up to.
"""
try:
# We linearize here to ensure we don't have races updating the token
#
# XXX this appears to be redundant, since the ReplicationCommandHandler
Expand All @@ -875,16 +890,18 @@ async def update_token(self, token):
# we're not being re-entered?

with (await self._fed_position_linearizer.queue(None)):
# We persist and ack the same position, so we take a copy of it
# here as otherwise it can get modified from underneath us.
current_position = self.federation_position

await self.store.update_federation_out_pos(
"federation", self.federation_position
"federation", current_position
)

# We ACK this token over replication so that the master can drop
# its in memory queues
self._hs.get_tcp_replication().send_federation_ack(
self.federation_position
)
self._last_ack = self.federation_position
self._hs.get_tcp_replication().send_federation_ack(current_position)
self._last_ack = current_position
except Exception:
logger.exception("Error updating federation stream position")

Expand Down
12 changes: 12 additions & 0 deletions synapse/util/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,18 @@ def __init__(self, name=None, max_count=1, clock=None):
{}
) # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]]

def is_queued(self, key) -> bool:
"""Checks whether there is a process queued up waiting
"""
entry = self.key_to_defer.get(key)
if not entry:
# No entry so nothing is waiting.
return False

# There are waiting deferreds only if the OrderedDict of deferreds is
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# There are waiting deferreds only in the OrderedDict of deferreds is
# There are waiting deferreds only if the OrderedDict of deferreds is

I don't think entry is an OrderedDict; isn't it a Sequence? I'm actually a little confused about why this would check entry[1].

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# key_to_defer is a map from the key to a 2 element list where
# the first element is the number of things executing, and
# the second element is an OrderedDict, where the keys are deferreds for the
# things blocked from executing.
self.key_to_defer = (
{}
) # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]]
explains it. The sequence is basically a data structure of "number of stuff currently in flight" and "deferreds that are waiting".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i.e. entry[1] is getting the OrderedDict of deferreds waiting.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. I only read half the comment. 🙄 Should this really be a Tuple[int, Dict[defer.Deferred, int]] I wonder?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We talked about this a bit, and it can't be a tuple because it gets modified. Anyway, the type checking here is funky, but it is essentially a list of two items.

# non-empty.
return bool(entry[1])

def queue(self, key):
# we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
# (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not
Expand Down
32 changes: 32 additions & 0 deletions tests/util/test_linearizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,38 @@ def test_linearizer(self):
with (yield d2):
pass

@defer.inlineCallbacks
def test_linearizer_is_queued(self):
    """Check that `is_queued` is true only while a caller is waiting
    behind whoever currently holds the key.
    """
    lin = Linearizer()

    lock_key = object()

    first_d = lin.queue(lock_key)
    first_cm = yield first_d

    # The first caller was let through straight away, so nothing is waiting.
    self.assertFalse(lin.is_queued(lock_key))

    second_d = lin.queue(lock_key)
    self.assertFalse(second_d.called)

    # The second caller is stuck behind the first one, so it counts as queued.
    self.assertTrue(lin.is_queued(lock_key))

    with first_cm:
        self.assertFalse(second_d.called)

        # The first holder has not released yet: the second is still waiting.
        self.assertTrue(lin.is_queued(lock_key))

    # Leaving the first context lets the second caller run; the queue is empty.
    self.assertFalse(lin.is_queued(lock_key))

    with (yield second_d):
        self.assertFalse(lin.is_queued(lock_key))

    self.assertFalse(lin.is_queued(lock_key))

def test_lots_of_queued_things(self):
# we have one slow thing, and lots of fast things queued up behind it.
# it should *not* explode the stack.
Expand Down