Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Simplify _auth_and_persist_fetched_events #10901

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10901.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Clean up some of the federation event authentication code for clarity.
91 changes: 22 additions & 69 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,8 @@
Tuple,
)

import attr
from prometheus_client import Counter

from twisted.internet import defer

from synapse import event_auth
from synapse.api.constants import (
EventContentFields,
Expand All @@ -54,11 +51,7 @@
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.federation.federation_client import InvalidResponseError
from synapse.logging.context import (
make_deferred_yieldable,
nested_logging_context,
run_in_background,
)
from synapse.logging.context import nested_logging_context, run_in_background
from synapse.logging.utils import log_function
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
Expand All @@ -75,7 +68,11 @@
UserID,
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.async_helpers import (
Linearizer,
concurrently_execute,
yieldable_gather_results,
)
from synapse.util.iterutils import batch_iter
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
Expand All @@ -92,30 +89,6 @@
)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class _NewEventInfo:
"""Holds information about a received event, ready for passing to _auth_and_persist_events

Attributes:
event: the received event

claimed_auth_event_map: a map of (type, state_key) => event for the event's
claimed auth_events.

This can include events which have not yet been persisted, in the case that
we are backfilling a batch of events.

Note: May be incomplete: if we were unable to find all of the claimed auth
events. Also, treat the contents with caution: the events might also have
been rejected, might not yet have been authorized themselves, or they might
be in the wrong room.

"""

event: EventBase
claimed_auth_event_map: StateMap[EventBase]


class FederationEventHandler:
"""Handles events that originated from federation.

Expand Down Expand Up @@ -1203,47 +1176,27 @@ async def _auth_and_persist_fetched_events(
allow_rejected=True,
)

event_infos = []
for event in fetched_events:
auth = {}
for auth_event_id in event.auth_event_ids():
ae = persisted_events.get(auth_event_id)
if ae:
auth[(ae.type, ae.state_key)] = ae
else:
logger.info("Missing auth event %s", auth_event_id)

event_infos.append(_NewEventInfo(event, auth))

if not event_infos:
return

async def prep(ev_info: _NewEventInfo) -> EventContext:
event = ev_info.event
async def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]:
with nested_logging_context(suffix=event.event_id):
res = EventContext.for_outlier()
res = await self._check_event_auth(
auth = {}
for auth_event_id in event.auth_event_ids():
ae = persisted_events.get(auth_event_id)
if ae:
auth[(ae.type, ae.state_key)] = ae
else:
logger.info("Missing auth event %s", auth_event_id)

context = EventContext.for_outlier()
context = await self._check_event_auth(
origin,
event,
res,
claimed_auth_event_map=ev_info.claimed_auth_event_map,
context,
claimed_auth_event_map=auth,
)
return res

contexts = await make_deferred_yieldable(
defer.gatherResults(
[run_in_background(prep, ev_info) for ev_info in event_infos],
consumeErrors=True,
)
)
return event, context

await self.persist_events_and_notify(
room_id,
[
(ev_info.event, context)
for ev_info, context in zip(event_infos, contexts)
],
)
events_to_persist = await yieldable_gather_results(prep, fetched_events)
await self.persist_events_and_notify(room_id, events_to_persist)

async def _check_event_auth(
self,
Expand Down