This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make federation catchup send last event from any server. (#9640)
Currently federation catchup will send the last *local* event that we
failed to send to the remote. This can cause issues for large rooms
where lots of servers have sent events while the remote server was down,
as when it comes back up again it'll be flooded with events from various
points in the DAG.

Instead, let's make it so that all the servers send the most recent
events, even if they're not theirs. The remote should deduplicate the
events, so there shouldn't be much overhead in doing this.
Alternatively, the servers could only send local events if they were
also extremities and hope that the other server will send the event
over, but that is a bit risky.
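
To illustrate the deduplication the commit message relies on, here is a
minimal sketch — not Synapse's actual receive path — of why getting the
same extremity from several catching-up servers is cheap for the
receiver: duplicates are dropped by event_id before any expensive
processing happens.

    from typing import Set

    from synapse.events import EventBase

    # Sketch only: the receiver skips already-seen events up front.
    seen_event_ids: Set[str] = set()

    async def handle_incoming_pdu(pdu: EventBase) -> None:
        if pdu.event_id in seen_event_ids:
            return  # same extremity relayed by another catching-up server
        seen_event_ids.add(pdu.event_id)
        # ... persistence and state resolution would happen here ...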
erikjohnston committed Mar 18, 2021
1 parent 7b06f85 commit dd71eb0
Showing 4 changed files with 141 additions and 38 deletions.
1 change: 1 addition & 0 deletions changelog.d/9640.misc
@@ -0,0 +1 @@
+Improve performance of federation catch up by sending the latest events in the room to the remote, rather than just the last event sent by the local server.
25 changes: 2 additions & 23 deletions synapse/federation/federation_server.py
@@ -35,7 +35,7 @@
 from twisted.internet.abstract import isIPAddress
 from twisted.python import failure
 
-from synapse.api.constants import EduTypes, EventTypes, Membership
+from synapse.api.constants import EduTypes, EventTypes
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -63,7 +63,7 @@
     ReplicationFederationSendEduRestServlet,
     ReplicationGetQueryRestServlet,
 )
-from synapse.types import JsonDict, get_domain_from_id
+from synapse.types import JsonDict
 from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
@@ -727,27 +727,6 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None:
             if the event was unacceptable for any other reason (eg, too large,
             too many prev_events, couldn't find the prev_events)
         """
-        # check that it's actually being sent from a valid destination to
-        # workaround bug #1753 in 0.18.5 and 0.18.6
-        if origin != get_domain_from_id(pdu.sender):
-            # We continue to accept join events from any server; this is
-            # necessary for the federation join dance to work correctly.
-            # (When we join over federation, the "helper" server is
-            # responsible for sending out the join event, rather than the
-            # origin. See bug #1893. This is also true for some third party
-            # invites).
-            if not (
-                pdu.type == "m.room.member"
-                and pdu.content
-                and pdu.content.get("membership", None)
-                in (Membership.JOIN, Membership.INVITE)
-            ):
-                logger.info(
-                    "Discarding PDU %s from invalid origin %s", pdu.event_id, origin
-                )
-                return
-            else:
-                logger.info("Accepting join PDU %s from %s", pdu.event_id, origin)
 
         # We've already checked that we know the room version by this point
         room_version = await self.store.get_room_version(pdu.room_id)
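
The check deleted above is why this commit touches federation_server.py
at all: once servers relay other servers' latest events during catch-up,
a PDU's sender domain will routinely differ from the transaction origin,
and the old check would have discarded those PDUs. A small illustration
(the hostnames are hypothetical; get_domain_from_id is the helper the old
code used):

    from synapse.types import get_domain_from_id

    # During catch-up, host2 may now relay an event authored on host3:
    origin = "host2"        # server that sent us the transaction
    sender = "@user:host3"  # sender of the relayed PDU

    assert get_domain_from_id(sender) == "host3"
    # The deleted check compared these and would have dropped the PDU:
    assert origin != get_domain_from_id(sender)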
104 changes: 89 additions & 15 deletions synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import datetime
 import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple
 
 import attr
 from prometheus_client import Counter
@@ -77,6 +77,7 @@ def __init__(
         self._transaction_manager = transaction_manager
         self._instance_name = hs.get_instance_name()
         self._federation_shard_config = hs.config.worker.federation_shard_config
+        self._state = hs.get_state_handler()
 
         self._should_send_on_this_instance = True
         if not self._federation_shard_config.should_handle(
@@ -415,22 +416,95 @@ async def _catch_up_transmission_loop(self) -> None:
                     "This should not happen." % event_ids
                 )
 
-            if logger.isEnabledFor(logging.INFO):
-                rooms = [p.room_id for p in catchup_pdus]
-                logger.info("Catching up rooms to %s: %r", self._destination, rooms)
-
-            await self._transaction_manager.send_new_transaction(
-                self._destination, catchup_pdus, []
-            )
-
-            sent_transactions_counter.inc()
-            final_pdu = catchup_pdus[-1]
-            self._last_successful_stream_ordering = cast(
-                int, final_pdu.internal_metadata.stream_ordering
-            )
-            await self._store.set_destination_last_successful_stream_ordering(
-                self._destination, self._last_successful_stream_ordering
-            )
+            # We send transactions with events from one room only, as it's
+            # likely that the remote will have to do additional processing,
+            # which may take some time. It's better to give it small amounts
+            # of work rather than risk the request timing out and repeatedly
+            # being retried, and not making any progress.
+            #
+            # Note: `catchup_pdus` will have exactly one PDU per room.
+            for pdu in catchup_pdus:
+                # The PDU from the DB will be the last PDU in the room from
+                # *this server* that wasn't sent to the remote. However, other
+                # servers may have sent lots of events since then, and we want
+                # to try and tell the remote only about the *latest* events in
+                # the room. This is so that it doesn't get inundated by events
+                # from various parts of the DAG, which all need to be
+                # processed.
+                #
+                # Note: this does mean that in large rooms a server coming
+                # back online will get sent the same events from all the
+                # different servers, but the remote will correctly
+                # deduplicate them and handle each one only once.
+
+                # Step 1, fetch the current extremities
+                extrems = await self._store.get_prev_events_for_room(pdu.room_id)
+
+                if pdu.event_id in extrems:
+                    # If the event is in the extremities, then great! We can
+                    # just use that without having to do further checks.
+                    room_catchup_pdus = [pdu]
+                else:
+                    # If not, fetch the extremities and figure out which we
+                    # can send.
+                    extrem_events = await self._store.get_events_as_list(extrems)
+
+                    new_pdus = []
+                    for p in extrem_events:
+                        # We pulled this from the DB, so it'll be non-null
+                        assert p.internal_metadata.stream_ordering
+
+                        # Filter out events that happened before the remote
+                        # went offline
+                        if (
+                            p.internal_metadata.stream_ordering
+                            < self._last_successful_stream_ordering
+                        ):
+                            continue
+
+                        # Filter out events where the server is not in the
+                        # room, e.g. it may have left/been kicked. *Ideally*
+                        # we'd pull out the kick and send that, but it's a
+                        # rare edge case so we don't bother for now (the
+                        # server that sent the kick should send it out if
+                        # it's online).
+                        hosts = await self._state.get_hosts_in_room_at_events(
+                            p.room_id, [p.event_id]
+                        )
+                        if self._destination not in hosts:
+                            continue
+
+                        new_pdus.append(p)
+
+                    # If we've filtered out all the extremities, fall back to
+                    # sending the original event. This should ensure that the
+                    # server gets at least some of the missed events
+                    # (especially if the other sending servers are up).
+                    if new_pdus:
+                        room_catchup_pdus = new_pdus
+                    else:
+                        room_catchup_pdus = [pdu]
+
+                logger.info(
+                    "Catching up rooms to %s: %r", self._destination, pdu.room_id
+                )
+
+                await self._transaction_manager.send_new_transaction(
+                    self._destination, room_catchup_pdus, []
+                )
+
+                sent_transactions_counter.inc()
+
+                # We pulled this from the DB, so it'll be non-null
+                assert pdu.internal_metadata.stream_ordering
+
+                # Note that we mark the last successful stream ordering as
+                # that from the *original* PDU, rather than the PDU(s) we
+                # actually send. This is because we use it to mark our
+                # position in the queue of missed PDUs to process.
+                self._last_successful_stream_ordering = (
+                    pdu.internal_metadata.stream_ordering
+                )
+
+                await self._store.set_destination_last_successful_stream_ordering(
+                    self._destination, self._last_successful_stream_ordering
+                )
 
     def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
         if not self._pending_rrs:
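
As a condensed, standalone view of the per-room selection the new loop
performs: pick_catchup_pdus below is a hypothetical helper, and the
store/state parameters stand in for the store and state-handler methods
used in the diff above; it is a sketch, not a drop-in replacement.

    from typing import List

    from synapse.events import EventBase

    async def pick_catchup_pdus(
        store, state, destination: str, pdu: EventBase, last_stream_ordering: int
    ) -> List[EventBase]:
        extrems = await store.get_prev_events_for_room(pdu.room_id)
        if pdu.event_id in extrems:
            # Our missed event is still a forward extremity: send it as-is.
            return [pdu]

        candidates = []
        for p in await store.get_events_as_list(extrems):
            # Skip extremities the remote already had before it went down.
            if p.internal_metadata.stream_ordering < last_stream_ordering:
                continue
            # Skip extremities in parts of the DAG the remote isn't in.
            hosts = await state.get_hosts_in_room_at_events(p.room_id, [p.event_id])
            if destination not in hosts:
                continue
            candidates.append(p)

        # Fall back to the original missed event if all were filtered out.
        return candidates or [pdu]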
49 changes: 49 additions & 0 deletions tests/federation/test_federation_catch_up.py
@@ -2,6 +2,7 @@
 
 from mock import Mock
 
+from synapse.api.constants import EventTypes
 from synapse.events import EventBase
 from synapse.federation.sender import PerDestinationQueue, TransactionManager
 from synapse.federation.units import Edu
@@ -421,3 +422,51 @@ def wake_destination_track(destination):
         self.assertNotIn("zzzerver", woken)
         # - all destinations are woken exactly once; they appear once in woken.
         self.assertCountEqual(woken, server_names[:-1])

+    @override_config({"send_federation": True})
+    def test_not_latest_event(self):
+        """Test that we send the latest event in the room even if it's not ours."""
+
+        per_dest_queue, sent_pdus = self.make_fake_destination_queue()
+
+        # Make a room with a local user, and two servers. One will go offline
+        # and one will send some events.
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room_1 = self.helper.create_room_as("u1", tok=u1_token)
+
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+        )
+        event_1 = self.get_success(
+            event_injection.inject_member_event(self.hs, room_1, "@user:host3", "join")
+        )
+
+        # First we send something from the local server, so that we notice the
+        # remote is down and go into catchup mode.
+        self.helper.send(room_1, "you hear me!!", tok=u1_token)
+
+        # Now simulate us receiving an event from the still online remote.
+        event_2 = self.get_success(
+            event_injection.inject_event(
+                self.hs,
+                type=EventTypes.Message,
+                sender="@user:host3",
+                room_id=room_1,
+                content={"msgtype": "m.text", "body": "Hello"},
+            )
+        )
+
+        self.get_success(
+            self.hs.get_datastore().set_destination_last_successful_stream_ordering(
+                "host2", event_1.internal_metadata.stream_ordering
+            )
+        )
+
+        self.get_success(per_dest_queue._catch_up_transmission_loop())
+
+        # We expect only the last message from the remote, event_2, to have
+        # been sent, rather than the last *local* event that was sent.
+        self.assertEqual(len(sent_pdus), 1)
+        self.assertEqual(sent_pdus[0].event_id, event_2.event_id)
+        self.assertFalse(per_dest_queue._catching_up)
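
The helper make_fake_destination_queue is defined earlier in this test
file and is not shown in the diff. Based on how the test uses it, it
presumably looks something like the sketch below — a real
PerDestinationQueue for "host2" whose transaction manager records PDUs
instead of sending them over the network (the exact names and signature
here are assumptions):

    from typing import List, Tuple

    from synapse.events import EventBase
    from synapse.federation.sender import PerDestinationQueue, TransactionManager

    def make_fake_destination_queue(
        self, destination: str = "host2"
    ) -> Tuple[PerDestinationQueue, List[EventBase]]:
        """Sketch: a queue whose transaction manager records, not sends, PDUs."""
        transaction_manager = TransactionManager(self.hs)
        per_dest_queue = PerDestinationQueue(self.hs, transaction_manager, destination)
        sent_pdus: List[EventBase] = []

        async def fake_send(destination, pdus, edus) -> bool:
            # Record what the queue tried to send so the test can assert on it.
            sent_pdus.extend(pdus)
            return True

        transaction_manager.send_new_transaction = fake_send
        return per_dest_queue, sent_pdus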
