diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 6979dd081a5f..43e79402e65a 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -832,7 +832,7 @@ async def _resolve_state_at_missing_prevs( ) # Calculate the state after each of the previous events, and # resolve them to find the correct state at the current event. - event_map = {event_id: event} + try: # Get the state of the events we know about ours = await self._state_store.get_state_groups_ids(room_id, seen) @@ -865,7 +865,7 @@ async def _resolve_state_at_missing_prevs( room_id, room_version, state_maps, - event_map, + event_map={event_id: event}, state_res_store=StateResolutionStore(self._store), ) @@ -911,7 +911,7 @@ async def _get_state_ids_after_missing_prev_event( len(auth_event_ids), ) - # start by just trying to fetch the events from the store + # Start by checking events we already have in the DB desired_events = set(state_event_ids) desired_events.add(event_id) logger.debug("Fetching %i events from cache/store", len(desired_events)) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 955e6a83140d..dd6f07dd1da5 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -55,7 +55,14 @@ from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter -from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester +from synapse.types import ( + MutableStateMap, + Requester, + RoomAlias, + StreamToken, + UserID, + create_requester, +) from synapse.util import json_decoder, json_encoder, log_failure, unwrapFirstError from synapse.util.async_helpers import Linearizer, gather_results from synapse.util.caches.expiringcache import ExpiringCache @@ -1024,22 +1031,32 @@ async def create_new_client_event( # old state is complete. metadata = await self.store.get_metadata_for_events(state_event_ids) - state_map = {} + state_map_for_event: MutableStateMap[str] = {} for state_id in state_event_ids: data = metadata.get(state_id) if data is None: - raise Exception(f"State event {state_id} not persisted") + # We're trying to persist a new historical batch of events + # with the given state, e.g. via + # `RoomBatchSendEventRestServlet`. The state can be inferred + # by Synapse or set directly by the client. + # + # Either way, we should have persisted all the state before + # getting here. + raise Exception( + f"State event {state_id} not found in DB," + " Synapse should have persisted it before using it." + ) if data.state_key is None: raise Exception( f"Trying to set non-state event {state_id} as state" ) - state_map[(data.event_type, data.state_key)] = state_id + state_map_for_event[(data.event_type, data.state_key)] = state_id context = await self.state.compute_event_context( event, - state_ids_before_event=state_map, + state_ids_before_event=state_map_for_event, ) else: context = await self.state.compute_event_context(event) diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index e126020135fa..f2e79f01a77f 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -182,7 +182,7 @@ def get_metadata_for_events_txn( result_map: Dict[str, EventMetadata] = {} for batch_ids in batch_iter(event_ids, 1000): - return await self.db_pool.runInteraction( + result_map = await self.db_pool.runInteraction( "get_metadata_for_events", get_metadata_for_events_txn, batch_ids=batch_ids,