diff --git a/changelog.d/10133.bugfix b/changelog.d/10133.bugfix new file mode 100644 index 000000000000..a62c15b260a1 --- /dev/null +++ b/changelog.d/10133.bugfix @@ -0,0 +1 @@ +Fix bug when using workers where pagination requests failed if a remote server returned zero events from `/backfill`. Introduced in 1.35.0. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b802822baaa2..abbb71424d7d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -22,6 +22,7 @@ from http import HTTPStatus from typing import ( TYPE_CHECKING, + Collection, Dict, Iterable, List, @@ -1364,11 +1365,12 @@ async def get_event(event_id: str): event_infos.append(_NewEventInfo(event, None, auth)) - await self._auth_and_persist_events( - destination, - room_id, - event_infos, - ) + if event_infos: + await self._auth_and_persist_events( + destination, + room_id, + event_infos, + ) def _sanity_check_event(self, ev: EventBase) -> None: """ @@ -2077,7 +2079,7 @@ async def _auth_and_persist_events( self, origin: str, room_id: str, - event_infos: Iterable[_NewEventInfo], + event_infos: Collection[_NewEventInfo], backfilled: bool = False, ) -> None: """Creates the appropriate contexts and persists events. The events @@ -2088,6 +2090,9 @@ async def _auth_and_persist_events( Notifies about the events where appropriate. """ + if not event_infos: + return + async def prep(ev_info: _NewEventInfo): event = ev_info.event with nested_logging_context(suffix=event.event_id): @@ -2216,13 +2221,14 @@ async def _persist_auth_tree( raise events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR - await self.persist_events_and_notify( - room_id, - [ - (e, events_to_context[e.event_id]) - for e in itertools.chain(auth_events, state) - ], - ) + if auth_events or state: + await self.persist_events_and_notify( + room_id, + [ + (e, events_to_context[e.event_id]) + for e in itertools.chain(auth_events, state) + ], + ) new_event_context = await self.state_handler.compute_event_context( event, old_state=state @@ -3061,7 +3067,13 @@ async def persist_events_and_notify( the same room. backfilled: Whether these events are a result of backfilling or not + + Returns: + The stream ID after which all events have been persisted. """ + if not event_and_contexts: + return self.store.get_current_events_token() + instance = self.config.worker.events_shard_config.get_instance(room_id) if instance != self._instance_name: # Limit the number of events sent over replication. We choose 200