Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Instrument state and state_group storage related things (tracing) #15610

Merged
merged 6 commits into from
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15610.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Instrument `state` and `state_group` storage-related operations to better picture what's happening when tracing.
5 changes: 5 additions & 0 deletions synapse/events/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from synapse.appservice import ApplicationService
from synapse.events import EventBase
from synapse.logging.opentracing import tag_args, trace
from synapse.types import JsonDict, StateMap

if TYPE_CHECKING:
Expand Down Expand Up @@ -242,6 +243,8 @@ def state_group(self) -> Optional[int]:

return self._state_group

@trace
@tag_args
async def get_current_state_ids(
self, state_filter: Optional["StateFilter"] = None
) -> Optional[StateMap[str]]:
Expand Down Expand Up @@ -275,6 +278,8 @@ async def get_current_state_ids(

return prev_state_ids

@trace
@tag_args
async def get_prev_state_ids(
self, state_filter: Optional["StateFilter"] = None
) -> StateMap[str]:
Expand Down
4 changes: 4 additions & 0 deletions synapse/state/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
UnpersistedEventContextBase,
)
from synapse.logging.context import ContextResourceUsage
from synapse.logging.opentracing import tag_args, trace
from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet
from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
Expand Down Expand Up @@ -270,6 +271,8 @@ async def get_hosts_in_room_at_events(
state = await entry.get_state(self._state_storage_controller, StateFilter.all())
return await self.store.get_joined_hosts(room_id, state, entry)

@trace
@tag_args
async def calculate_context_info(
self,
event: EventBase,
Expand Down Expand Up @@ -465,6 +468,7 @@ async def compute_event_context(

return await unpersisted_context.persist(event)

@trace
@measure_func()
async def resolve_state_groups_for_events(
self, room_id: str, event_ids: Collection[str], await_full_state: bool = True
Expand Down
33 changes: 33 additions & 0 deletions synapse/storage/controllers/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ def notify_room_un_partial_stated(self, room_id: str) -> None:
"""
self._partial_state_room_tracker.notify_un_partial_stated(room_id)

@trace
@tag_args
async def get_state_group_delta(
self, state_group: int
) -> Tuple[Optional[int], Optional[StateMap[str]]]:
Expand All @@ -84,6 +86,8 @@ async def get_state_group_delta(
state_group_delta = await self.stores.state.get_state_group_delta(state_group)
return state_group_delta.prev_group, state_group_delta.delta_ids

@trace
@tag_args
async def get_state_groups_ids(
self, _room_id: str, event_ids: Collection[str], await_full_state: bool = True
) -> Dict[int, MutableStateMap[str]]:
Expand Down Expand Up @@ -114,6 +118,8 @@ async def get_state_groups_ids(

return group_to_state

@trace
@tag_args
async def get_state_ids_for_group(
self, state_group: int, state_filter: Optional[StateFilter] = None
) -> StateMap[str]:
Expand All @@ -130,6 +136,8 @@ async def get_state_ids_for_group(

return group_to_state[state_group]

@trace
@tag_args
async def get_state_groups(
self, room_id: str, event_ids: Collection[str]
) -> Dict[int, List[EventBase]]:
Expand Down Expand Up @@ -165,6 +173,8 @@ async def get_state_groups(
for group, event_id_map in group_to_ids.items()
}

@trace
@tag_args
def _get_state_groups_from_groups(
self, groups: List[int], state_filter: StateFilter
) -> Awaitable[Dict[int, StateMap[str]]]:
Expand All @@ -183,6 +193,7 @@ def _get_state_groups_from_groups(
return self.stores.state._get_state_groups_from_groups(groups, state_filter)

@trace
@tag_args
async def get_state_for_events(
self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None
) -> Dict[str, StateMap[EventBase]]:
Expand Down Expand Up @@ -280,6 +291,8 @@ async def get_state_ids_for_events(

return {event: event_to_state[event] for event in event_ids}

@trace
@tag_args
async def get_state_for_event(
self, event_id: str, state_filter: Optional[StateFilter] = None
) -> StateMap[EventBase]:
Expand All @@ -303,6 +316,7 @@ async def get_state_for_event(
return state_map[event_id]

@trace
@tag_args
async def get_state_ids_for_event(
self,
event_id: str,
Expand Down Expand Up @@ -333,6 +347,8 @@ async def get_state_ids_for_event(
)
return state_map[event_id]

@trace
@tag_args
def get_state_for_groups(
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
) -> Awaitable[Dict[int, MutableStateMap[str]]]:
Expand Down Expand Up @@ -402,6 +418,8 @@ async def store_state_group(
event_id, room_id, prev_group, delta_ids, current_state_ids
)

@trace
@tag_args
@cancellable
async def get_current_state_ids(
self,
Expand Down Expand Up @@ -442,6 +460,8 @@ async def get_current_state_ids(
room_id, on_invalidate=on_invalidate
)

@trace
@tag_args
async def get_canonical_alias_for_room(self, room_id: str) -> Optional[str]:
"""Get canonical alias for room, if any

Expand All @@ -466,6 +486,8 @@ async def get_canonical_alias_for_room(self, room_id: str) -> Optional[str]:

return event.content.get("canonical_alias")

@trace
@tag_args
async def get_current_state_deltas(
self, prev_stream_id: int, max_stream_id: int
) -> Tuple[int, List[Dict[str, Any]]]:
Expand Down Expand Up @@ -500,6 +522,7 @@ async def get_current_state_deltas(
)

@trace
@tag_args
async def get_current_state(
self, room_id: str, state_filter: Optional[StateFilter] = None
) -> StateMap[EventBase]:
Expand All @@ -516,6 +539,8 @@ async def get_current_state(

return state_map

@trace
@tag_args
async def get_current_state_event(
self, room_id: str, event_type: str, state_key: str
) -> Optional[EventBase]:
Expand All @@ -527,6 +552,8 @@ async def get_current_state_event(
)
return state_map.get(key)

@trace
@tag_args
async def get_current_hosts_in_room(self, room_id: str) -> AbstractSet[str]:
"""Get current hosts in room based on current state.

Expand All @@ -538,6 +565,8 @@ async def get_current_hosts_in_room(self, room_id: str) -> AbstractSet[str]:

return await self.stores.main.get_current_hosts_in_room(room_id)

@trace
@tag_args
async def get_current_hosts_in_room_ordered(self, room_id: str) -> List[str]:
"""Get current hosts in room based on current state.

Expand All @@ -553,6 +582,8 @@ async def get_current_hosts_in_room_ordered(self, room_id: str) -> List[str]:

return await self.stores.main.get_current_hosts_in_room_ordered(room_id)

@trace
@tag_args
async def get_current_hosts_in_room_or_partial_state_approximation(
self, room_id: str
) -> Collection[str]:
Expand Down Expand Up @@ -582,6 +613,8 @@ async def get_current_hosts_in_room_or_partial_state_approximation(

return hosts

@trace
@tag_args
async def get_users_in_room_with_profiles(
self, room_id: str
) -> Mapping[str, ProfileInfo]:
Expand Down
5 changes: 5 additions & 0 deletions synapse/storage/databases/state/bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union

from synapse.logging.opentracing import tag_args, trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
Expand All @@ -40,6 +41,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
updates.
"""

@trace
@tag_args
def _count_state_group_hops_txn(
self, txn: LoggingTransaction, state_group: int
) -> int:
Expand Down Expand Up @@ -83,6 +86,8 @@ def _count_state_group_hops_txn(

return count

@trace
@tag_args
def _get_state_groups_from_groups_txn(
self,
txn: LoggingTransaction,
Expand Down
15 changes: 15 additions & 0 deletions synapse/storage/databases/state/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.events.snapshot import UnpersistedEventContext, UnpersistedEventContextBase
from synapse.logging.opentracing import tag_args, trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
Expand Down Expand Up @@ -159,6 +160,8 @@ def _get_state_group_delta_txn(txn: LoggingTransaction) -> _GetStateGroupDelta:
"get_state_group_delta", _get_state_group_delta_txn
)

@trace
@tag_args
@cancellable
async def _get_state_groups_from_groups(
self, groups: List[int], state_filter: StateFilter
Expand Down Expand Up @@ -187,6 +190,8 @@ async def _get_state_groups_from_groups(

return results

@trace
@tag_args
def _get_state_for_group_using_cache(
self,
cache: DictionaryCache[int, StateKey, str],
Expand Down Expand Up @@ -239,6 +244,8 @@ def _get_state_for_group_using_cache(

return state_filter.filter_state(state_dict_ids), not missing_types

@trace
@tag_args
@cancellable
async def _get_state_for_groups(
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
Expand Down Expand Up @@ -305,6 +312,8 @@ async def _get_state_for_groups(

return state

@trace
@tag_args
def _get_state_for_groups_using_cache(
self,
groups: Iterable[int],
Expand Down Expand Up @@ -403,6 +412,8 @@ def _insert_into_cache(
fetched_keys=non_member_types,
)

@trace
@tag_args
async def store_state_deltas_for_batched(
self,
events_and_context: List[Tuple[EventBase, UnpersistedEventContextBase]],
Expand Down Expand Up @@ -520,6 +531,8 @@ def insert_deltas_group_txn(
prev_group,
)

@trace
@tag_args
async def store_state_group(
self,
event_id: str,
Expand Down Expand Up @@ -772,6 +785,8 @@ def _purge_unreferenced_state_groups(
((sg,) for sg in state_groups_to_delete),
)

@trace
@tag_args
async def get_previous_state_groups(
self, state_groups: Iterable[int]
) -> Dict[int, int]:
Expand Down