Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make federation catchup send last event from any server. #9640

Merged
merged 3 commits into from
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9640.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance of federation catch up by sending events the latest events in the room to the remote, rather than just the last event sent by the local server.
25 changes: 2 additions & 23 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from twisted.internet.abstract import isIPAddress
from twisted.python import failure

from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.api.constants import EduTypes, EventTypes
from synapse.api.errors import (
AuthError,
Codes,
Expand Down Expand Up @@ -63,7 +63,7 @@
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
from synapse.types import JsonDict, get_domain_from_id
from synapse.types import JsonDict
from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
Expand Down Expand Up @@ -727,27 +727,6 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None:
if the event was unacceptable for any other reason (eg, too large,
too many prev_events, couldn't find the prev_events)
"""
# check that it's actually being sent from a valid destination to
# workaround bug #1753 in 0.18.5 and 0.18.6
if origin != get_domain_from_id(pdu.sender):
clokep marked this conversation as resolved.
Show resolved Hide resolved
# We continue to accept join events from any server; this is
# necessary for the federation join dance to work correctly.
# (When we join over federation, the "helper" server is
# responsible for sending out the join event, rather than the
# origin. See bug #1893. This is also true for some third party
# invites).
if not (
pdu.type == "m.room.member"
and pdu.content
and pdu.content.get("membership", None)
in (Membership.JOIN, Membership.INVITE)
):
logger.info(
"Discarding PDU %s from invalid origin %s", pdu.event_id, origin
)
return
else:
logger.info("Accepting join PDU %s from %s", pdu.event_id, origin)

# We've already checked that we know the room version by this point
room_version = await self.store.get_room_version(pdu.room_id)
Expand Down
104 changes: 89 additions & 15 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.
import datetime
import logging
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple

import attr
from prometheus_client import Counter
Expand Down Expand Up @@ -77,6 +77,7 @@ def __init__(
self._transaction_manager = transaction_manager
self._instance_name = hs.get_instance_name()
self._federation_shard_config = hs.config.worker.federation_shard_config
self._state = hs.get_state_handler()

self._should_send_on_this_instance = True
if not self._federation_shard_config.should_handle(
Expand Down Expand Up @@ -415,22 +416,95 @@ async def _catch_up_transmission_loop(self) -> None:
"This should not happen." % event_ids
)

if logger.isEnabledFor(logging.INFO):
rooms = [p.room_id for p in catchup_pdus]
logger.info("Catching up rooms to %s: %r", self._destination, rooms)
# We send transactions with events from one room only, as its likely
# that the remote will have to do additional processing, which may
# take some time. It's better to give it small amounts of work
# rather than risk the request timing out and repeatedly being
# retried, and not making any progress.
#
# Note: `catchup_pdus` will have exactly one PDU per room.
for pdu in catchup_pdus:
clokep marked this conversation as resolved.
Show resolved Hide resolved
# The PDU from the DB will be the last PDU in the room from
# *this server* that wasn't sent to the remote. However, other
clokep marked this conversation as resolved.
Show resolved Hide resolved
# servers may have sent lots of events since then, and we want
# to try and tell the remote only about the *latest* events in
# the room. This is so that it doesn't get inundated by events
# from various parts of the DAG, which all need to be processed.
#
# Note: this does mean that in large rooms a server coming back
# online will get sent the same events from all the different
# servers, but the remote will correctly deduplicate them and
# handle it only once.

# Step 1, fetch the current extremities
extrems = await self._store.get_prev_events_for_room(pdu.room_id)

if pdu.event_id in extrems:
# If the event is in the extremities, then great! We can just
# use that without having to do further checks.
room_catchup_pdus = [pdu]
else:
# If not, fetch the extremities and figure out which we can
# send.
extrem_events = await self._store.get_events_as_list(extrems)

new_pdus = []
for p in extrem_events:
# We pulled this from the DB, so it'll be non-null
assert p.internal_metadata.stream_ordering

# Filter out events that happened before the remote went
# offline
if (
p.internal_metadata.stream_ordering
< self._last_successful_stream_ordering
):
continue

await self._transaction_manager.send_new_transaction(
self._destination, catchup_pdus, []
)
# Filter out events where the server is not in the room,
# e.g. it may have left/been kicked. *Ideally* we'd pull
# out the kick and send that, but it's a rare edge case
# so we don't bother for now (the server that sent the
# kick should send it out if its online).
hosts = await self._state.get_hosts_in_room_at_events(
p.room_id, [p.event_id]
)
if self._destination not in hosts:
continue

sent_transactions_counter.inc()
final_pdu = catchup_pdus[-1]
self._last_successful_stream_ordering = cast(
int, final_pdu.internal_metadata.stream_ordering
)
await self._store.set_destination_last_successful_stream_ordering(
self._destination, self._last_successful_stream_ordering
)
new_pdus.append(p)

# If we've filtered out all the extremities, fall back to
# sending the original event. This should ensure that the
# server gets at least some of missed events (especially if
# the other sending servers are up).
if new_pdus:
room_catchup_pdus = new_pdus

logger.info(
"Catching up rooms to %s: %r", self._destination, pdu.room_id
)

await self._transaction_manager.send_new_transaction(
self._destination, room_catchup_pdus, []
)

sent_transactions_counter.inc()

# We pulled this from the DB, so it'll be non-null
assert pdu.internal_metadata.stream_ordering

# Note that we mark the last successful stream ordering as that
# from the *original* PDU, rather than the PDU(s) we actually
# send. This is because we use it to mark our position in the
# queue of missed PDUs to process.
self._last_successful_stream_ordering = (
pdu.internal_metadata.stream_ordering
)

await self._store.set_destination_last_successful_stream_ordering(
self._destination, self._last_successful_stream_ordering
)

def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
if not self._pending_rrs:
Expand Down
49 changes: 49 additions & 0 deletions tests/federation/test_federation_catch_up.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from mock import Mock

from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.federation.sender import PerDestinationQueue, TransactionManager
from synapse.federation.units import Edu
Expand Down Expand Up @@ -421,3 +422,51 @@ def wake_destination_track(destination):
self.assertNotIn("zzzerver", woken)
# - all destinations are woken exactly once; they appear once in woken.
self.assertCountEqual(woken, server_names[:-1])

@override_config({"send_federation": True})
def test_not_latest_event(self):
"""Test that we send the latest event in the room even if its not ours."""

per_dest_queue, sent_pdus = self.make_fake_destination_queue()

# Make a room with a local user, and two servers. One will go offline
# and one will send some events.
self.register_user("u1", "you the one")
u1_token = self.login("u1", "you the one")
room_1 = self.helper.create_room_as("u1", tok=u1_token)

self.get_success(
event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
)
event_1 = self.get_success(
event_injection.inject_member_event(self.hs, room_1, "@user:host3", "join")
)

# First we send something from the local server, so that we notice the
# remote is down and go into catchup mode.
self.helper.send(room_1, "you hear me!!", tok=u1_token)

# Now simulate us receiving an event from the still online remote.
event_2 = self.get_success(
event_injection.inject_event(
self.hs,
type=EventTypes.Message,
sender="@user:host3",
room_id=room_1,
content={"msgtype": "m.text", "body": "Hello"},
)
)

self.get_success(
self.hs.get_datastore().set_destination_last_successful_stream_ordering(
"host2", event_1.internal_metadata.stream_ordering
)
)

self.get_success(per_dest_queue._catch_up_transmission_loop())

# We expect only the last message from the remote, event_2, to have been
# sent, rather than the last *local* event that was sent.
self.assertEqual(len(sent_pdus), 1)
self.assertEqual(sent_pdus[0].event_id, event_2.event_id)
self.assertFalse(per_dest_queue._catching_up)