diff --git a/changelog.d/18751.misc b/changelog.d/18751.misc new file mode 100644 index 00000000000..6ecd4982867 --- /dev/null +++ b/changelog.d/18751.misc @@ -0,0 +1 @@ +Fix `LaterGauge` metrics to collect from all servers. diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 7f511d570c0..1e9722e0d41 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -37,6 +37,7 @@ """ import logging +from enum import Enum from typing import ( TYPE_CHECKING, Dict, @@ -67,6 +68,25 @@ logger = logging.getLogger(__name__) +class QueueNames(str, Enum): + PRESENCE_MAP = "presence_map" + KEYED_EDU = "keyed_edu" + KEYED_EDU_CHANGED = "keyed_edu_changed" + EDUS = "edus" + POS_TIME = "pos_time" + PRESENCE_DESTINATIONS = "presence_destinations" + + +queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {} + +for queue_name in QueueNames: + queue_name_to_gauge_map[queue_name] = LaterGauge( + name=f"synapse_federation_send_queue_{queue_name.value}_size", + desc="", + labelnames=[SERVER_NAME_LABEL], + ) + + class FederationRemoteSendQueue(AbstractFederationSender): """A drop in replacement for FederationSender""" @@ -111,23 +131,15 @@ def __init__(self, hs: "HomeServer"): # we make a new function, so we need to make a new function so the inner # lambda binds to the queue rather than to the name of the queue which # changes. ARGH. 
- def register(name: str, queue: Sized) -> None: - LaterGauge( - name="synapse_federation_send_queue_%s_size" % (queue_name,), - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(queue)}, + def register(queue_name: QueueNames, queue: Sized) -> None: + queue_name_to_gauge_map[queue_name].register_hook( + lambda: {(self.server_name,): len(queue)} ) - for queue_name in [ - "presence_map", - "keyed_edu", - "keyed_edu_changed", - "edus", - "pos_time", - "presence_destinations", - ]: - register(queue_name, getattr(self, queue_name)) + for queue_name in QueueNames: + queue = getattr(self, queue_name.value) + assert isinstance(queue, Sized) + register(queue_name, queue=queue) self.clock.looping_call(self._clear_queue, 30 * 1000) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 8befbe37222..21af1235432 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -199,6 +199,24 @@ labelnames=[SERVER_NAME_LABEL], ) +transaction_queue_pending_destinations_gauge = LaterGauge( + name="synapse_federation_transaction_queue_pending_destinations", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + +transaction_queue_pending_pdus_gauge = LaterGauge( + name="synapse_federation_transaction_queue_pending_pdus", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + +transaction_queue_pending_edus_gauge = LaterGauge( + name="synapse_federation_transaction_queue_pending_edus", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + # Time (in s) to wait before trying to wake up destinations that have # catch-up outstanding. 
# Please note that rate limiting still applies, so while the loop is @@ -398,38 +416,28 @@ def __init__(self, hs: "HomeServer"): # map from destination to PerDestinationQueue self._per_destination_queues: Dict[str, PerDestinationQueue] = {} - LaterGauge( - name="synapse_federation_transaction_queue_pending_destinations", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: { + transaction_queue_pending_destinations_gauge.register_hook( + lambda: { (self.server_name,): sum( 1 for d in self._per_destination_queues.values() if d.transmission_loop_running ) - }, + } ) - - LaterGauge( - name="synapse_federation_transaction_queue_pending_pdus", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: { + transaction_queue_pending_pdus_gauge.register_hook( + lambda: { (self.server_name,): sum( d.pending_pdu_count() for d in self._per_destination_queues.values() ) - }, + } ) - LaterGauge( - name="synapse_federation_transaction_queue_pending_edus", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: { + transaction_queue_pending_edus_gauge.register_hook( + lambda: { (self.server_name,): sum( d.pending_edu_count() for d in self._per_destination_queues.values() ) - }, + } ) self._is_processing = False diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index b253117498a..fb9f962672f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -173,6 +173,18 @@ labelnames=["locality", "from", "to", SERVER_NAME_LABEL], ) +presence_user_to_current_state_size_gauge = LaterGauge( + name="synapse_handlers_presence_user_to_current_state_size", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + +presence_wheel_timer_size_gauge = LaterGauge( + name="synapse_handlers_presence_wheel_timer_size", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them # "currently_active" LAST_ACTIVE_GRANULARITY = 60 * 1000 @@ -779,11 +791,8 @@ def __init__(self, hs: 
"HomeServer"): EduTypes.PRESENCE, self.incoming_presence ) - LaterGauge( - name="synapse_handlers_presence_user_to_current_state_size", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(self.user_to_current_state)}, + presence_user_to_current_state_size_gauge.register_hook( + lambda: {(self.server_name,): len(self.user_to_current_state)} ) # The per-device presence state, maps user to devices to per-device presence state. @@ -882,11 +891,8 @@ def __init__(self, hs: "HomeServer"): 60 * 1000, ) - LaterGauge( - name="synapse_handlers_presence_wheel_timer_size", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(self.wheel_timer)}, + presence_wheel_timer_size_gauge.register_hook( + lambda: {(self.server_name,): len(self.wheel_timer)} ) # Used to handle sending of presence to newly joined users/servers diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index a9b049f9043..c5274c758b9 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -164,12 +164,12 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]: return counts -LaterGauge( +in_flight_requests = LaterGauge( name="synapse_http_server_in_flight_requests_count", desc="", labelnames=["method", "servlet", SERVER_NAME_LABEL], - caller=_get_in_flight_counts, ) +in_flight_requests.register_hook(_get_in_flight_counts) class RequestMetrics: diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 11e2551a168..8c99d3c7700 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -31,6 +31,7 @@ Dict, Generic, Iterable, + List, Mapping, Optional, Sequence, @@ -73,8 +74,6 @@ METRICS_PREFIX = "/_synapse/metrics" -all_gauges: Dict[str, Collector] = {} - HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") SERVER_NAME_LABEL = "server_name" @@ -163,42 +162,47 @@ class LaterGauge(Collector): name: str desc: str labelnames: Optional[StrSequence] = 
attr.ib(hash=False) - # callback: should either return a value (if there are no labels for this metric), - # or dict mapping from a label tuple to a value - caller: Callable[ - [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] - ] + # List of callbacks: each callback should either return a value (if there are no + # labels for this metric), or dict mapping from a label tuple to a value + _hooks: List[ + Callable[ + [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] + ] + ] = attr.ib(factory=list, hash=False) def collect(self) -> Iterable[Metric]: # The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself # (we don't enforce it here, one level up). g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label] - try: - calls = self.caller() - except Exception: - logger.exception("Exception running callback for LaterGauge(%s)", self.name) - yield g - return + for hook in self._hooks: + try: + hook_result = hook() + except Exception: + logger.exception( + "Exception running callback for LaterGauge(%s)", self.name + ) + yield g + return + + if isinstance(hook_result, (int, float)): + g.add_metric([], hook_result) + else: + for k, v in hook_result.items(): + g.add_metric(k, v) - if isinstance(calls, (int, float)): - g.add_metric([], calls) - else: - for k, v in calls.items(): - g.add_metric(k, v) + yield g - yield g + def register_hook( + self, + hook: Callable[ + [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] + ], + ) -> None: + self._hooks.append(hook) def __attrs_post_init__(self) -> None: - self._register() - - def _register(self) -> None: - if self.name in all_gauges.keys(): - logger.warning("%s already registered, reregistering", self.name) - REGISTRY.unregister(all_gauges.pop(self.name)) - REGISTRY.register(self) - all_gauges[self.name] = self # `MetricsEntry` only makes sense when it is a `Protocol`, @@ -250,7 +254,7 @@ 
def __init__( # Protects access to _registrations self._lock = threading.Lock() - self._register_with_collector() + REGISTRY.register(self) def register( self, @@ -341,14 +345,6 @@ def collect(self) -> Iterable[Metric]: gauge.add_metric(labels=key, value=getattr(metrics, name)) yield gauge - def _register_with_collector(self) -> None: - if self.name in all_gauges.keys(): - logger.warning("%s already registered, reregistering", self.name) - REGISTRY.unregister(all_gauges.pop(self.name)) - - REGISTRY.register(self) - all_gauges[self.name] = self - class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily): """ diff --git a/synapse/notifier.py b/synapse/notifier.py index 448a715e2a8..d56a7b26bbb 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -86,6 +86,24 @@ labelnames=["stream", SERVER_NAME_LABEL], ) + +notifier_listeners_gauge = LaterGauge( + name="synapse_notifier_listeners", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + +notifier_rooms_gauge = LaterGauge( + name="synapse_notifier_rooms", + desc="", + labelnames=[SERVER_NAME_LABEL], +) +notifier_users_gauge = LaterGauge( + name="synapse_notifier_users", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + T = TypeVar("T") @@ -281,28 +299,16 @@ def count_listeners() -> Mapping[Tuple[str, ...], int]: ) } - LaterGauge( - name="synapse_notifier_listeners", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=count_listeners, - ) - - LaterGauge( - name="synapse_notifier_rooms", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: { + notifier_listeners_gauge.register_hook(count_listeners) + notifier_rooms_gauge.register_hook( + lambda: { (self.server_name,): count( bool, list(self.room_to_user_streams.values()) ) - }, + } ) - LaterGauge( - name="synapse_notifier_users", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(self.user_to_user_stream)}, + notifier_users_gauge.register_hook( + lambda: {(self.server_name,): len(self.user_to_user_stream)} 
) def add_replication_callback(self, cb: Callable[[], None]) -> None: diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 0f14c7e3804..f033eaaeb57 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -106,6 +106,18 @@ "synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL] ) +tcp_resource_total_connections_gauge = LaterGauge( + name="synapse_replication_tcp_resource_total_connections", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + +tcp_command_queue_gauge = LaterGauge( + name="synapse_replication_tcp_command_queue", + desc="Number of inbound RDATA/POSITION commands queued for processing", + labelnames=["stream_name", SERVER_NAME_LABEL], +) + # the type of the entries in _command_queues_by_stream _StreamCommandQueue = Deque[ @@ -243,11 +255,8 @@ def __init__(self, hs: "HomeServer"): # outgoing replication commands to.) self._connections: List[IReplicationConnection] = [] - LaterGauge( - name="synapse_replication_tcp_resource_total_connections", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(self._connections)}, + tcp_resource_total_connections_gauge.register_hook( + lambda: {(self.server_name,): len(self._connections)} ) # When POSITION or RDATA commands arrive, we stick them in a queue and process @@ -266,14 +275,11 @@ def __init__(self, hs: "HomeServer"): # from that connection. 
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {} - LaterGauge( - name="synapse_replication_tcp_command_queue", - desc="Number of inbound RDATA/POSITION commands queued for processing", - labelnames=["stream_name", SERVER_NAME_LABEL], - caller=lambda: { + tcp_command_queue_gauge.register_hook( + lambda: { (stream_name, self.server_name): len(queue) for stream_name, queue in self._command_queues_by_stream.items() - }, + } ) self._is_master = hs.config.worker.worker_app is None diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 969f0303e05..4d8381646aa 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -527,9 +527,11 @@ def replicate(self) -> None: name="synapse_replication_tcp_protocol_pending_commands", desc="", labelnames=["name", SERVER_NAME_LABEL], - caller=lambda: { +) +pending_commands.register_hook( + lambda: { (p.name, p.server_name): len(p.pending_commands) for p in connected_connections - }, + } ) @@ -544,9 +546,11 @@ def transport_buffer_size(protocol: BaseReplicationStreamProtocol) -> int: name="synapse_replication_tcp_protocol_transport_send_buffer", desc="", labelnames=["name", SERVER_NAME_LABEL], - caller=lambda: { +) +transport_send_buffer.register_hook( + lambda: { (p.name, p.server_name): transport_buffer_size(p) for p in connected_connections - }, + } ) @@ -571,10 +575,12 @@ def transport_kernel_read_buffer_size( name="synapse_replication_tcp_protocol_transport_kernel_send_buffer", desc="", labelnames=["name", SERVER_NAME_LABEL], - caller=lambda: { +) +tcp_transport_kernel_send_buffer.register_hook( + lambda: { (p.name, p.server_name): transport_kernel_read_buffer_size(p, False) for p in connected_connections - }, + } ) @@ -582,8 +588,10 @@ def transport_kernel_read_buffer_size( name="synapse_replication_tcp_protocol_transport_kernel_read_buffer", desc="", labelnames=["name", SERVER_NAME_LABEL], - caller=lambda: { +) 
+tcp_transport_kernel_read_buffer.register_hook( + lambda: { (p.name, p.server_name): transport_kernel_read_buffer_size(p, True) for p in connected_connections - }, + } ) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index f7aec16c969..bbdc5b9d278 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -100,6 +100,12 @@ labelnames=["desc", SERVER_NAME_LABEL], ) +background_update_status = LaterGauge( + name="synapse_background_update_status", + desc="Background update status", + labelnames=[SERVER_NAME_LABEL], +) + # Unique indexes which have been added in background updates. Maps from table name # to the name of the background update which added the unique index to that table. @@ -611,11 +617,8 @@ def __init__( ) self.updates = BackgroundUpdater(hs, self) - LaterGauge( - name="synapse_background_update_status", - desc="Background update status", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): self.updates.get_status()}, + background_update_status.register_hook( + lambda: {(self.server_name,): self.updates.get_status()}, ) self._previous_txn_total_time = 0.0 diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 654250fadc5..94a1274edb0 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -84,6 +84,13 @@ _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000 +federation_known_servers_gauge = LaterGauge( + name="synapse_federation_known_servers", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + + @attr.s(frozen=True, slots=True, auto_attribs=True) class EventIdMembership: """Returned by `get_membership_from_event_ids`""" @@ -116,11 +123,8 @@ def __init__( 1, self._count_known_servers, ) - LaterGauge( - name="synapse_federation_known_servers", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): self._known_servers_count}, + 
federation_known_servers_gauge.register_hook( + lambda: {(self.server_name,): self._known_servers_count} ) @wrap_as_background_process("_count_known_servers") diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index f5e592d80ee..b3c65676c6d 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -131,27 +131,31 @@ def _get_counts_from_rate_limiter_instance( # We track the number of affected hosts per time-period so we can # differentiate one really noisy homeserver from a general # ratelimit tuning problem across the federation. -LaterGauge( +sleep_affected_hosts_gauge = LaterGauge( name="synapse_rate_limit_sleep_affected_hosts", desc="Number of hosts that had requests put to sleep", labelnames=["rate_limiter_name", SERVER_NAME_LABEL], - caller=lambda: _get_counts_from_rate_limiter_instance( +) +sleep_affected_hosts_gauge.register_hook( + lambda: _get_counts_from_rate_limiter_instance( lambda rate_limiter_instance: sum( ratelimiter.should_sleep() for ratelimiter in rate_limiter_instance.ratelimiters.values() ) - ), + ) ) -LaterGauge( +reject_affected_hosts_gauge = LaterGauge( name="synapse_rate_limit_reject_affected_hosts", desc="Number of hosts that had requests rejected", labelnames=["rate_limiter_name", SERVER_NAME_LABEL], - caller=lambda: _get_counts_from_rate_limiter_instance( +) +reject_affected_hosts_gauge.register_hook( + lambda: _get_counts_from_rate_limiter_instance( lambda rate_limiter_instance: sum( ratelimiter.should_reject() for ratelimiter in rate_limiter_instance.ratelimiters.values() ) - ), + ) ) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index fdcacdf1289..904f99fa426 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -44,6 +44,13 @@ logger = logging.getLogger(__name__) +running_tasks_gauge = LaterGauge( + name="synapse_scheduler_running_tasks", + desc="The number of concurrent running tasks handled by the TaskScheduler", + 
labelnames=[SERVER_NAME_LABEL], +) + + class TaskScheduler: """ This is a simple task scheduler designed for resumable tasks. Normally, @@ -130,11 +137,8 @@ def __init__(self, hs: "HomeServer"): TaskScheduler.SCHEDULE_INTERVAL_MS, ) - LaterGauge( - name="synapse_scheduler_running_tasks", - desc="The number of concurrent running tasks handled by the TaskScheduler", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(self._running_tasks)}, + running_tasks_gauge.register_hook( + lambda: {(self.server_name,): len(self._running_tasks)} ) def register_action( diff --git a/tests/metrics/test_metrics.py b/tests/metrics/test_metrics.py index 61874564a6b..5a3c3c1c4ed 100644 --- a/tests/metrics/test_metrics.py +++ b/tests/metrics/test_metrics.py @@ -22,7 +22,13 @@ from prometheus_client.core import Sample -from synapse.metrics import REGISTRY, InFlightGauge, generate_latest +from synapse.metrics import ( + REGISTRY, + SERVER_NAME_LABEL, + InFlightGauge, + LaterGauge, + generate_latest, +) from synapse.util.caches.deferred_cache import DeferredCache from tests import unittest @@ -285,6 +291,42 @@ def test_cache_metric_multiple_servers(self) -> None: self.assertEqual(hs2_cache_max_size_metric_value, "777.0") +class LaterGaugeTests(unittest.HomeserverTestCase): + def test_later_gauge_multiple_servers(self) -> None: + """ + Test that LaterGauge metrics are reported correctly across multiple servers. We + will have a metrics entry for each homeserver that is labeled with the + `server_name` label.
+ """ + later_gauge = LaterGauge( + name="foo", + desc="", + labelnames=[SERVER_NAME_LABEL], + ) + later_gauge.register_hook(lambda: {("hs1",): 1}) + later_gauge.register_hook(lambda: {("hs2",): 2}) + + metrics_map = get_latest_metrics() + + # Find the metrics for the caches from both homeservers + hs1_metric = 'foo{server_name="hs1"}' + hs1_metric_value = metrics_map.get(hs1_metric) + self.assertIsNotNone( + hs1_metric_value, + f"Missing metric {hs1_metric} in cache metrics {metrics_map}", + ) + hs2_metric = 'foo{server_name="hs2"}' + hs2_metric_value = metrics_map.get(hs2_metric) + self.assertIsNotNone( + hs2_metric_value, + f"Missing metric {hs2_metric} in cache metrics {metrics_map}", + ) + + # Sanity check the metric values + self.assertEqual(hs1_metric_value, "1.0") + self.assertEqual(hs2_metric_value, "2.0") + + def get_latest_metrics() -> Dict[str, str]: """ Collect the latest metrics from the registry and parse them into an easy to use map.