Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prune inbound federation queues if they get too long #10390

Merged
merged 6 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10390.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prune inbound federation inbound queues for a room if they get too large.
17 changes: 17 additions & 0 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,23 @@ async def _process_incoming_pdus_in_room_inner(

origin, event = next

# Prune the event queue if its getting large.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
#
# We do this *after* handling the first event as the common case is
# that the queue is empty (/has the single event in), and so there's
# no need to do this check.
pruned = await self.store.prune_staged_events_in_room(room_id, room_version)
if pruned:
# If we have pruned the queue check we need to refetch the next
# event to handle.
next = await self.store.get_next_staged_event_for_room(
room_id, room_version
)
if not next:
break

origin, event = next

lock = await self.store.try_acquire_lock(
_INBOUND_EVENT_HANDLING_LOCK_NAME, room_id
)
Expand Down
82 changes: 80 additions & 2 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
from queue import Empty, PriorityQueue
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple

from prometheus_client import Gauge
from prometheus_client import Counter, Gauge

from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
from synapse.api.room_versions import RoomVersion
from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.events import EventBase, make_event_from_dict
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
Expand All @@ -44,6 +44,12 @@
"The total number of events in the inbound federation staging",
)

pdus_pruned_from_federation_queue = Counter(
"synapse_federation_server_number_inbound_pdu_pruned",
"The number of events in the inbound federation staging that have been "
"pruned due to the queue getting too long",
)

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -1207,6 +1213,78 @@ def _get_next_staged_event_for_room_txn(txn):

return origin, event

async def prune_staged_events_in_room(
self,
room_id: str,
room_version: RoomVersion,
) -> bool:
"""Checks if there are lots of staged events for the room, and if so
prune them down.

Returns:
Whether any events were pruned
"""

# First check the size of the queue.
count = await self.db_pool.simple_select_one_onecol(
table="federation_inbound_events_staging",
keyvalues={"room_id": room_id},
retcol="COALESCE(COUNT(*), 0)",
desc="prune_staged_events_in_room_count",
)

if count < 100:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
return False

# If the queue is too large, then we want clear the entire queue,
# keeping only the forward extremities (i.e. the events not referenced
# by other events in the queue). We do this as that we can always
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# backpaginate in all the events we have dropped.
rows = await self.db_pool.simple_select_list(
table="federation_inbound_events_staging",
keyvalues={"room_id": room_id},
retcols=("event_id", "event_json"),
desc="prune_staged_events_in_room_fetch",
)

# Find the set of events referenced by those in the queue, as well as
# collecting all the event IDs in the queue.
referenced_events: Set[str] = set()
seen_events: Set[str] = set()
for row in rows:
seen_events.add(row["event_id"])
event_d = db_to_json(row["event_json"])

# We don't bother parsing the dicts into full blown event objects,
# as that is needlessly expensive.
if room_version.event_format == EventFormatVersions.V1:
referenced_events.update(
event_id for event_id, _ in event_d.get("prev_events", [])
)
else:
referenced_events.update(event_d.get("prev_events", []))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we validate that prev_events has the right shape when we receive the event, so you'll need to add some validation here otherwise we'll end up exploding with malformed events.


to_delete = referenced_events & seen_events
if not to_delete:
return False

pdus_pruned_from_federation_queue.inc(len(to_delete))
logger.info(
"Pruning %d events in room %s from federation queue",
len(to_delete),
room_id,
)

await self.db_pool.simple_delete_many(
table="federation_inbound_events_staging",
keyvalues={"room_id": room_id},
iterable=to_delete,
column="event_id",
desc="prune_staged_events_in_room_delete",
)

return True

async def get_all_rooms_with_staged_incoming_events(self) -> List[str]:
"""Get the room IDs of all events currently staged."""
return await self.db_pool.simple_select_onecol(
Expand Down
57 changes: 57 additions & 0 deletions tests/storage/test_event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import attr
from parameterized import parameterized

from synapse.api.room_versions import RoomVersions
from synapse.events import _EventInternalMetadata
from synapse.util import json_encoder

import tests.unittest
import tests.utils
Expand Down Expand Up @@ -504,6 +506,61 @@ def insert_event(txn):
)
self.assertSetEqual(difference, set())

def test_prune_inbound_federation_queue(self):
"Test that pruning of inbound federation queues work"

room_id = "some_room_id"

# Insert a bunch of events that all reference the previous one.
self.get_success(
self.store.db_pool.simple_insert_many(
table="federation_inbound_events_staging",
values=[
{
"origin": "some_origin",
"room_id": room_id,
"received_ts": 0,
"event_id": f"$fake_event_id_{i + 1}",
"event_json": json_encoder.encode(
{"prev_events": [f"$fake_event_id_{i}"]}
),
"internal_metadata": "{}",
}
for i in range(500)
],
desc="test_prune_inbound_federation_queue",
)
)

# Calling prune once should return True, i.e. a prune happen. The second
# time it shouldn't.
pruned = self.get_success(
self.store.prune_staged_events_in_room(room_id, RoomVersions.V6)
)
self.assertTrue(pruned)

pruned = self.get_success(
self.store.prune_staged_events_in_room(room_id, RoomVersions.V6)
)
self.assertFalse(pruned)

# Assert that we only have a single event left in the queue, and that it
# is the last one.
count = self.get_success(
self.store.db_pool.simple_select_one_onecol(
table="federation_inbound_events_staging",
keyvalues={"room_id": room_id},
retcol="COALESCE(COUNT(*), 0)",
desc="test_prune_inbound_federation_queue",
)
)
self.assertEqual(count, 1)

_, event_id = self.get_success(
self.store.get_next_staged_event_id_for_room(room_id)
)
self.assertEqual(event_id, "$fake_event_id_500")


@attr.s
class FakeEvent:
Expand Down