
Limit the number of events sent over replication when persisting events. #10082

Merged: 3 commits merged on May 27, 2021.
1 change: 1 addition & 0 deletions changelog.d/10082.bugfix
@@ -0,0 +1 @@
+Fixed a bug causing replication requests to fail when receiving a lot of events via federation.
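
The fix below batches the outgoing events with synapse.util.iterutils.batch_iter. For readers unfamiliar with that helper, here is a minimal sketch of a batching iterator with the same shape, yielding tuples of at most `size` items; it is an illustration under that assumption, not the actual Synapse source:

from itertools import islice
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")

def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    """Yield successive tuples of at most `size` items from `iterable`."""
    iterator = iter(iterable)
    # iter(callable, sentinel): keep calling the lambda until it returns the
    # empty tuple, i.e. until the underlying iterator is exhausted.
    return iter(lambda: tuple(islice(iterator, size)), ())

With a batch size of 50, a list of 120 events would therefore be sent as three replication requests of 50, 50 and 20 events.
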
17 changes: 10 additions & 7 deletions synapse/handlers/federation.py
@@ -91,6 +91,7 @@
     get_domain_from_id,
 )
 from synapse.util.async_helpers import Linearizer, concurrently_execute
+from synapse.util.iterutils import batch_iter
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.stringutils import shortstr
 from synapse.visibility import filter_events_for_server
@@ -3053,13 +3054,15 @@ async def persist_events_and_notify(
         """
         instance = self.config.worker.events_shard_config.get_instance(room_id)
         if instance != self._instance_name:
-            result = await self._send_events(
-                instance_name=instance,
-                store=self.store,
-                room_id=room_id,
-                event_and_contexts=event_and_contexts,
-                backfilled=backfilled,
-            )
+            # Limit the number of events sent over replication.
+            for batch in batch_iter(event_and_contexts, 50):
+                result = await self._send_events(
+                    instance_name=instance,
+                    store=self.store,
+                    room_id=room_id,
+                    event_and_contexts=batch,
+                    backfilled=backfilled,
+                )
             return result["max_stream_id"]
         else:
             assert self.storage.persistence
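
As a standalone illustration of the pattern in the second hunk, the sketch below batches events through a hypothetical send_events coroutine (standing in for self._send_events) and returns the stream position reported for the final batch, mirroring `return result["max_stream_id"]` above. All names here are illustrative, not part of the Synapse API:

import asyncio
from itertools import islice

async def persist_in_batches(events, send_events, batch_size=50):
    """Send `events` in batches of at most `batch_size` and return the
    max_stream_id reported for the last batch (hypothetical helper)."""
    result = None
    iterator = iter(events)
    for batch in iter(lambda: tuple(islice(iterator, batch_size)), ()):
        result = await send_events(batch)
    return result["max_stream_id"] if result is not None else None

async def fake_send(batch):
    # Stand-in for the replication call; reports the size of the batch it got.
    return {"max_stream_id": len(batch)}

print(asyncio.run(persist_in_batches(range(120), fake_send)))  # prints 20

Keeping each replication request to a bounded number of events is what prevents a single oversized request when a large backlog of events arrives via federation.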