
Commit

Merge pull request #7584 from matrix-org/erikj/save_and_send_fed_token_in_bg

Speed up processing of federation stream RDATA rows.
erikjohnston authored May 27, 2020
2 parents c4a820b + ef3934e commit 8c5f88f
Showing 4 changed files with 69 additions and 7 deletions.
changelog.d/7584.misc (1 change: 1 addition & 0 deletions)
@@ -0,0 +1 @@
+Speed up processing of federation stream RDATA rows.
synapse/app/generic_worker.py (31 changes: 24 additions & 7 deletions)
@@ -863,9 +863,24 @@ async def update_token(self, token):
         a FEDERATION_ACK back to the master, and stores the token that we have processed
         in `federation_stream_position` so that we can restart where we left off.
         """
-        try:
-            self.federation_position = token
+        self.federation_position = token

+        # We save and send the ACK to master asynchronously, so we don't block
+        # processing on persistence. We don't need to do this operation for
+        # every single RDATA we receive, we just need to do it periodically.
+
+        if self._fed_position_linearizer.is_queued(None):
+            # There is already a task queued up to save and send the token, so
+            # no need to queue up another task.
+            return
+
+        run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
+
+    async def _save_and_send_ack(self):
+        """Save the current federation position in the database and send an ACK
+        to master with where we're up to.
+        """
+        try:
             # We linearize here to ensure we don't have races updating the token
             #
             # XXX this appears to be redundant, since the ReplicationCommandHandler
@@ -875,16 +890,18 @@ async def update_token(self, token):
             # we're not being re-entered?

             with (await self._fed_position_linearizer.queue(None)):
+                # We persist and ack the same position, so we take a copy of it
+                # here as otherwise it can get modified from underneath us.
+                current_position = self.federation_position
+
                 await self.store.update_federation_out_pos(
-                    "federation", self.federation_position
+                    "federation", current_position
                 )

                 # We ACK this token over replication so that the master can drop
                 # its in memory queues
-                self._hs.get_tcp_replication().send_federation_ack(
-                    self.federation_position
-                )
-                self._last_ack = self.federation_position
+                self._hs.get_tcp_replication().send_federation_ack(current_position)
+                self._last_ack = current_position
         except Exception:
             logger.exception("Error updating federation stream position")

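Note on the pattern above: update_token now only records the latest token and, if no save task is already waiting on the linearizer, kicks one off in the background; _save_and_send_ack then persists and ACKs whatever the position is by the time it actually runs, so many RDATA rows can be processed per database write. A minimal, self-contained sketch of that debounce pattern follows, using plain asyncio in place of Twisted; the names (PositionTracker, _persist, the printed ACK) are hypothetical stand-ins, not Synapse's API.

import asyncio


class PositionTracker:
    """Sketch only: an asyncio stand-in for the linearizer +
    run_as_background_process arrangement used in the diff above."""

    def __init__(self):
        self.position = 0
        self._save_queued = False    # plays the role of is_queued(None)
        self._lock = asyncio.Lock()  # plays the role of the linearizer

    def update_position(self, token: int) -> None:
        # Called for every row: cheap, never blocks on persistence.
        self.position = token

        if self._save_queued:
            # A save/ACK task is already pending; it will pick up the newest
            # position when it runs, so there is nothing more to do here.
            return

        self._save_queued = True
        asyncio.create_task(self._save_and_send_ack())

    async def _save_and_send_ack(self) -> None:
        async with self._lock:
            self._save_queued = False
            # Persist and ACK the same value, so take a copy in case
            # self.position moves on while we are writing.
            current = self.position
            await self._persist(current)
            print(f"ACK {current}")  # stands in for send_federation_ack()

    async def _persist(self, position: int) -> None:
        await asyncio.sleep(0.01)  # stands in for the database write


async def main():
    tracker = PositionTracker()
    for token in range(1, 1001):
        tracker.update_position(token)  # most calls return immediately
    await asyncio.sleep(0.1)  # give the single background save time to run
    assert tracker.position == 1000


asyncio.run(main())
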
synapse/util/async_helpers.py (12 changes: 12 additions & 0 deletions)
@@ -225,6 +225,18 @@ def __init__(self, name=None, max_count=1, clock=None):
             {}
         )  # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]]

+    def is_queued(self, key) -> bool:
+        """Checks whether there is a process queued up waiting
+        """
+        entry = self.key_to_defer.get(key)
+        if not entry:
+            # No entry so nothing is waiting.
+            return False
+
+        # There are waiting deferreds only if the OrderedDict of deferreds is
+        # non-empty.
+        return bool(entry[1])
+
     def queue(self, key):
         # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
         # (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not
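For context, the type comment above spells out the shape of Linearizer.key_to_defer: each key maps to a two-element entry whose first element is the count of things currently executing and whose second is an OrderedDict of deferreds waiting their turn, so is_queued only has to ask whether that OrderedDict is non-empty. A toy illustration of that check against a plain dict follows; the entries and the "d2" string standing in for a real Deferred are made up for the example.

from collections import OrderedDict

# Shape of key_to_defer, per the type comment above:
#   key -> [number currently executing, OrderedDict(waiting deferred -> 1)]
key_to_defer = {
    "room1": [1, OrderedDict()],             # running, nobody waiting
    "room2": [1, OrderedDict([("d2", 1)])],  # running, one waiter queued
}


def is_queued(key) -> bool:
    """Same check as the new Linearizer method: waiters exist only if
    entry[1] is non-empty."""
    entry = key_to_defer.get(key)
    if not entry:
        return False
    return bool(entry[1])


assert not is_queued("room1")
assert is_queued("room2")
assert not is_queued("room3")  # no entry at all, so nothing is waiting
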
tests/util/test_linearizer.py (32 changes: 32 additions & 0 deletions)
@@ -45,6 +45,38 @@ def test_linearizer(self):
         with (yield d2):
             pass

+    @defer.inlineCallbacks
+    def test_linearizer_is_queued(self):
+        linearizer = Linearizer()
+
+        key = object()
+
+        d1 = linearizer.queue(key)
+        cm1 = yield d1
+
+        # Since d1 gets called immediately, "is_queued" should return false.
+        self.assertFalse(linearizer.is_queued(key))
+
+        d2 = linearizer.queue(key)
+        self.assertFalse(d2.called)
+
+        # Now d2 is queued up behind successful completion of cm1
+        self.assertTrue(linearizer.is_queued(key))
+
+        with cm1:
+            self.assertFalse(d2.called)
+
+            # cm1 still not done, so d2 still queued.
+            self.assertTrue(linearizer.is_queued(key))
+
+        # And now d2 is called and nothing is in the queue again
+        self.assertFalse(linearizer.is_queued(key))
+
+        with (yield d2):
+            self.assertFalse(linearizer.is_queued(key))
+
+        self.assertFalse(linearizer.is_queued(key))
+
     def test_lots_of_queued_things(self):
         # we have one slow thing, and lots of fast things queued up behind it.
         # it should *not* explode the stack.
