Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #8420 from matrix-org/rav/state_res_stats
Browse files Browse the repository at this point in the history
Report metrics on expensive rooms for state res
  • Loading branch information
richvdh authored Sep 30, 2020
2 parents ea70f1c + d4274dd commit c429dfc
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 79 deletions.
1 change: 1 addition & 0 deletions changelog.d/8420.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental reporting of metrics on expensive rooms for state-resolution.
13 changes: 8 additions & 5 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import logging
from collections.abc import Container
from http import HTTPStatus
from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Sequence, Tuple, Union

import attr
from signedjson.key import decode_verify_key_bytes
Expand Down Expand Up @@ -69,7 +69,7 @@
ReplicationFederationSendEventsRestServlet,
ReplicationStoreRoomOnInviteRestServlet,
)
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.state import StateResolutionStore
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
JsonDict,
Expand All @@ -85,6 +85,9 @@
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -116,7 +119,7 @@ class FederationHandler(BaseHandler):
rooms.
"""

def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.hs = hs
Expand All @@ -126,6 +129,7 @@ def __init__(self, hs):
self.state_store = self.storage.state
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self._state_resolution_handler = hs.get_state_resolution_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
self.action_generator = hs.get_action_generator()
Expand Down Expand Up @@ -381,8 +385,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
event_map[x.event_id] = x

room_version = await self.store.get_room_version_id(room_id)
state_map = await resolve_events_with_store(
self.clock,
state_map = await self._state_resolution_handler.resolve_events_with_store(
room_id,
room_version,
state_maps,
Expand Down
233 changes: 169 additions & 64 deletions synapse/state/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,46 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import heapq
import logging
from collections import namedtuple
from collections import defaultdict, namedtuple
from typing import (
Any,
Awaitable,
Callable,
DefaultDict,
Dict,
Iterable,
List,
Optional,
Sequence,
Set,
Tuple,
Union,
overload,
)

import attr
from frozendict import frozendict
from prometheus_client import Histogram
from prometheus_client import Counter, Histogram
from typing_extensions import Literal

from synapse.api.constants import EventTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.context import ContextResourceUsage
from synapse.logging.utils import log_function
from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.roommember import ProfileInfo
from synapse.types import Collection, StateMap
from synapse.util import Clock
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure, measure_func

logger = logging.getLogger(__name__)

metrics_logger = logging.getLogger("synapse.state.metrics")

# Metrics for number of state groups involved in a resolution.
state_groups_histogram = Histogram(
Expand Down Expand Up @@ -448,19 +452,44 @@ async def resolve_events(

state_map = {ev.event_id: ev for st in state_sets for ev in st}

with Measure(self.clock, "state._resolve_events"):
new_state = await resolve_events_with_store(
self.clock,
event.room_id,
room_version,
state_set_ids,
event_map=state_map,
state_res_store=StateResolutionStore(self.store),
)
new_state = await self._state_resolution_handler.resolve_events_with_store(
event.room_id,
room_version,
state_set_ids,
event_map=state_map,
state_res_store=StateResolutionStore(self.store),
)

return {key: state_map[ev_id] for key, ev_id in new_state.items()}


@attr.s(slots=True)
class _StateResMetrics:
    """Running totals of the resources consumed by state resolution.

    All counters start at zero and are added to as state resolutions complete.
    """

    # Combined system and user CPU time, in seconds.
    cpu_time = attr.ib(default=0.0, type=float)

    # Time spent on database transactions (excluding scheduling time). This is
    # roughly the amount of work done on the db server, not counting event fetches.
    db_time = attr.ib(default=0.0, type=float)

    # Number of events fetched from the db.
    db_events = attr.ib(default=0, type=int)


# Prometheus counters for the cost of state resolution in the most expensive room.
#
# Each time StateResolutionHandler._report_metrics runs, the room with the largest
# accumulated value for the relevant metric is picked and that value is added to
# the counter (see StateResolutionHandler._report_biggest).

# CPU time (user + system) attributed to the most expensive room.
_biggest_room_by_cpu_counter = Counter(
"synapse_state_res_cpu_for_biggest_room_seconds",
"CPU time spent performing state resolution for the single most expensive "
"room for state resolution",
)
# Database transaction time attributed to the most expensive room.
_biggest_room_by_db_counter = Counter(
"synapse_state_res_db_for_biggest_room_seconds",
"Database time spent performing state resolution for the single most "
"expensive room for state resolution",
)


class StateResolutionHandler:
"""Responsible for doing state conflict resolution.
Expand All @@ -483,6 +512,17 @@ def __init__(self, hs):
reset_expiry_on_get=True,
)

#
# stuff for tracking time spent on state-res by room
#

# tracks the amount of work done on state res per room
self._state_res_metrics = defaultdict(
_StateResMetrics
) # type: DefaultDict[str, _StateResMetrics]

self.clock.looping_call(self._report_metrics, 120 * 1000)

@log_function
async def resolve_state_groups(
self,
Expand Down Expand Up @@ -530,15 +570,13 @@ async def resolve_state_groups(

state_groups_histogram.observe(len(state_groups_ids))

with Measure(self.clock, "state._resolve_events"):
new_state = await resolve_events_with_store(
self.clock,
room_id,
room_version,
list(state_groups_ids.values()),
event_map=event_map,
state_res_store=state_res_store,
)
new_state = await self.resolve_events_with_store(
room_id,
room_version,
list(state_groups_ids.values()),
event_map=event_map,
state_res_store=state_res_store,
)

# if the new state matches any of the input state groups, we can
# use that state group again. Otherwise we will generate a state_id
Expand All @@ -552,6 +590,114 @@ async def resolve_state_groups(

return cache

async def resolve_events_with_store(
    self,
    room_id: str,
    room_version: str,
    state_sets: Sequence[StateMap[str]],
    event_map: Optional[Dict[str, EventBase]],
    state_res_store: "StateResolutionStore",
) -> StateMap[str]:
    """Resolve several state maps into one, recording the resources used.

    The algorithm (v1 or v2) is selected from the room version. Whatever the
    outcome, the resource usage measured for the resolution is added to the
    per-room metrics.

    Args:
        room_id: the room we are working in
        room_version: Version of the room
        state_sets: List of dicts of (type, state_key) -> event_id,
            which are the different state groups to resolve.
        event_map:
            a dict from event_id to event, for any events that we happen to
            have in flight (eg, those currently being persisted). This is
            used as a starting point for finding the state we need.

            If None, all events will be fetched via state_res_store.
        state_res_store: a place to fetch events from

    Returns:
        a map from (type, state_key) to event_id.
    """
    try:
        with Measure(self.clock, "state._resolve_events") as m:
            if (
                KNOWN_ROOM_VERSIONS[room_version].state_res
                == StateResolutionVersions.V1
            ):
                return await v1.resolve_events_with_store(
                    room_id, state_sets, event_map, state_res_store.get_events
                )

            # every other state-res version is handled by the v2 algorithm
            return await v2.resolve_events_with_store(
                self.clock,
                room_id,
                room_version,
                state_sets,
                event_map,
                state_res_store,
            )
    finally:
        # runs on success and on failure, so failed resolutions are counted too
        self._record_state_res_metrics(room_id, m.get_resource_usage())

def _record_state_res_metrics(self, room_id: str, rusage: ContextResourceUsage):
    """Add the cost of one state resolution to the running totals for a room.

    Args:
        room_id: the room the state resolution was carried out for
        rusage: the resources used by that state resolution
    """
    totals = self._state_res_metrics[room_id]

    # CPU time is the sum of user-mode and system-mode time
    cpu_seconds = rusage.ru_utime + rusage.ru_stime

    totals.cpu_time += cpu_seconds
    totals.db_time += rusage.db_txn_duration_sec
    totals.db_events += rusage.evt_db_fetch_count

def _report_metrics(self):
    """Report the most expensive rooms for state res, then reset the totals.

    Does nothing if no state resolution has been recorded since the totals
    were last cleared.
    """
    if not self._state_res_metrics:
        # no state res has happened since the last iteration: don't bother logging.
        return

    # (key extractor, name for the log line, prometheus counter) per report
    reports = (
        (lambda i: i.cpu_time, "CPU time", _biggest_room_by_cpu_counter),
        (lambda i: i.db_time, "DB time", _biggest_room_by_db_counter),
    )
    for extract_key, metric_name, counter in reports:
        self._report_biggest(extract_key, metric_name, counter)

    # start the next reporting period from scratch
    self._state_res_metrics.clear()

def _report_biggest(
    self,
    extract_key: Callable[[_StateResMetrics], Any],
    metric_name: str,
    prometheus_counter_metric: Counter,
) -> None:
    """Report metrics on the biggest rooms for state res

    Logs (at debug level) the rooms with the largest value of the extracted
    metric, and adds the value for the single biggest room to the given
    prometheus counter. Does nothing if no rooms have been recorded.

    Args:
        extract_key: a callable which, given a _StateResMetrics, extracts a single
            metric to sort by.
        metric_name: the name of the metric we have extracted, for the log line
        prometheus_counter_metric: a prometheus metric recording the sum of the
            extracted metric
    """
    items = self._state_res_metrics.items()
    if not items:
        # nothing has been recorded, so there is no "biggest" room; bail out
        # rather than failing on biggest[0] below.
        return

    debug_enabled = metrics_logger.isEnabledFor(logging.DEBUG)

    # only need the most expensive if we don't have debug logging, which
    # allows nlargest() to degrade to max()
    n_to_log = 10 if debug_enabled else 1

    biggest = heapq.nlargest(
        n_to_log, items, key=lambda i: extract_key(i[1])
    )  # type: List[Tuple[str, _StateResMetrics]]

    if debug_enabled:
        # log the N biggest rooms; skipped entirely when debug logging is off
        # so that we don't format a line that would be thrown away.
        metrics_logger.debug(
            "%i biggest rooms for state-res by %s: %s",
            len(biggest),
            metric_name,
            ["%s (%gs)" % (r, extract_key(m)) for (r, m) in biggest],
        )

    # report info on the single biggest to prometheus
    _, biggest_metrics = biggest[0]
    prometheus_counter_metric.inc(extract_key(biggest_metrics))


def _make_state_cache_entry(
new_state: StateMap[str], state_groups_ids: Dict[int, StateMap[str]]
Expand Down Expand Up @@ -605,47 +751,6 @@ def _make_state_cache_entry(
)


def resolve_events_with_store(
    clock: Clock,
    room_id: str,
    room_version: str,
    state_sets: Sequence[StateMap[str]],
    event_map: Optional[Dict[str, EventBase]],
    state_res_store: "StateResolutionStore",
) -> Awaitable[StateMap[str]]:
    """Dispatch to the state resolution algorithm for the given room version.

    Args:
        clock: passed through to the v2 algorithm; not used for v1
        room_id: the room we are working in
        room_version: Version of the room
        state_sets: List of dicts of (type, state_key) -> event_id,
            which are the different state groups to resolve.
        event_map:
            a dict from event_id to event, for any events that we happen to
            have in flight (eg, those currently being persisted). This is
            used as a starting point for finding the state we need.

            If None, all events will be fetched via state_res_store.
        state_res_store: a place to fetch events from

    Returns:
        an awaitable yielding a map from (type, state_key) to event_id.
    """
    if KNOWN_ROOM_VERSIONS[room_version].state_res == StateResolutionVersions.V1:
        return v1.resolve_events_with_store(
            room_id, state_sets, event_map, state_res_store.get_events
        )

    # every other state-res version is handled by the v2 algorithm
    return v2.resolve_events_with_store(
        clock, room_id, room_version, state_sets, event_map, state_res_store
    )


@attr.s(slots=True)
class StateResolutionStore:
"""Interface that allows state resolution algorithms to access the database
Expand Down
Loading

0 comments on commit c429dfc

Please sign in to comment.