Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18484.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove destinations from sending if not whitelisted.
60 changes: 52 additions & 8 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ async def _handle(self) -> None:
destination, _ = self.queue.popitem(last=False)

queue = self.sender._get_per_destination_queue(destination)
if queue is None:
continue

if not queue._new_data_to_send:
# The per destination queue has already been woken up.
Expand Down Expand Up @@ -436,12 +438,24 @@ def __init__(self, hs: "HomeServer"):
self._wake_destinations_needing_catchup,
)

def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
def _get_per_destination_queue(
self, destination: str
) -> Optional[PerDestinationQueue]:
"""Get or create a PerDestinationQueue for the given destination

Args:
destination: server_name of remote server

Returns:
None if federation_domain_whitelist exists and destination not in
whitelist. Otherwise PerDestinationQueue for this destination.
"""
if (
self.hs.config.federation.federation_domain_whitelist is not None
and destination not in self.hs.config.federation.federation_domain_whitelist
):
return None

queue = self._per_destination_queues.get(destination)
if not queue:
queue = PerDestinationQueue(self.hs, self._transaction_manager, destination)
Expand Down Expand Up @@ -718,6 +732,15 @@ async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
# track the fact that we have a PDU for these destinations,
# to allow us to perform catch-up later on if the remote is unreachable
# for a while.
# Filter out any destinations not present in the federation_domain_whitelist, if
# the whitelist exists. These destinations should not be sent to so let's not
# waste time or space keeping track of events destined for them.
destinations = [
d
for d in destinations
if self.hs.config.federation.federation_domain_whitelist is None
or d in self.hs.config.federation.federation_domain_whitelist
]
await self.store.store_destination_rooms_entries(
destinations,
pdu.room_id,
Expand All @@ -732,7 +755,9 @@ async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
)

for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu)
queue = self._get_per_destination_queue(destination)
if queue is not None:
queue.send_pdu(pdu)

async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room
Expand Down Expand Up @@ -841,12 +866,16 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
for domain in immediate_domains:
# Add to destination queue and wake the destination up
queue = self._get_per_destination_queue(domain)
if queue is None:
continue
queue.queue_read_receipt(receipt)
queue.attempt_new_transaction()

for domain in delay_domains:
# Add to destination queue...
queue = self._get_per_destination_queue(domain)
if queue is None:
continue
queue.queue_read_receipt(receipt)

# ... and schedule the destination to be woken up.
Expand Down Expand Up @@ -882,9 +911,10 @@ async def send_presence_to_destinations(
if self.is_mine_server_name(destination):
continue

self._get_per_destination_queue(destination).send_presence(
states, start_loop=False
)
queue = self._get_per_destination_queue(destination)
if queue is None:
continue
queue.send_presence(states, start_loop=False)

self._destination_wakeup_queue.add_to_queue(destination)

Expand Down Expand Up @@ -934,6 +964,8 @@ def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None:
return

queue = self._get_per_destination_queue(edu.destination)
if queue is None:
return
if key:
queue.send_keyed_edu(edu, key)
else:
Expand All @@ -958,9 +990,15 @@ async def send_device_messages(

for destination in destinations:
if immediate:
self._get_per_destination_queue(destination).attempt_new_transaction()
queue = self._get_per_destination_queue(destination)
if queue is None:
continue
queue.attempt_new_transaction()
else:
self._get_per_destination_queue(destination).mark_new_data()
queue = self._get_per_destination_queue(destination)
if queue is None:
continue
queue.mark_new_data()
self._destination_wakeup_queue.add_to_queue(destination)

def wake_destination(self, destination: str) -> None:
Expand All @@ -979,7 +1017,9 @@ def wake_destination(self, destination: str) -> None:
):
return

self._get_per_destination_queue(destination).attempt_new_transaction()
queue = self._get_per_destination_queue(destination)
if queue is not None:
queue.attempt_new_transaction()

@staticmethod
def get_current_token() -> int:
Expand Down Expand Up @@ -1024,6 +1064,10 @@ async def _wake_destinations_needing_catchup(self) -> None:
d
for d in destinations_to_wake
if self._federation_shard_config.should_handle(self._instance_name, d)
and (
self.hs.config.federation.federation_domain_whitelist is None
or d in self.hs.config.federation.federation_domain_whitelist
)
]

for destination in destinations_to_wake:
Expand Down
Loading