From c012907655bd891d26ca6f01f0676682b4f40117 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 6 Aug 2025 16:40:15 -0500
Subject: [PATCH] Revert "Fix `LaterGauge` metrics to collect from all servers (#18751)"

This reverts commit 076db0ab49976edf82f696dcad485abb8614fe7d.

---
 changelog.d/18751.misc                       |  1 -
 synapse/federation/send_queue.py             | 42 +++++-------
 synapse/federation/sender/__init__.py        | 46 ++++++-------
 synapse/handlers/presence.py                 | 26 +++-----
 synapse/http/request_metrics.py              |  4 +-
 synapse/metrics/__init__.py                  | 68 +++++++++++---------
 synapse/notifier.py                          | 42 ++++++------
 synapse/replication/tcp/handler.py           | 28 ++++----
 synapse/replication/tcp/protocol.py          | 24 +++----
 synapse/storage/database.py                  | 13 ++--
 synapse/storage/databases/main/roommember.py | 14 ++--
 synapse/util/ratelimitutils.py               | 16 ++---
 synapse/util/task_scheduler.py               | 14 ++--
 tests/metrics/test_metrics.py                | 44 +------------
 14 files changed, 141 insertions(+), 241 deletions(-)
 delete mode 100644 changelog.d/18751.misc

diff --git a/changelog.d/18751.misc b/changelog.d/18751.misc
deleted file mode 100644
index 6ecd4982867..00000000000
--- a/changelog.d/18751.misc
+++ /dev/null
@@ -1 +0,0 @@
-Fix `LaterGauge` metrics to collect from all servers.
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 1e9722e0d41..7f511d570c0 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -37,7 +37,6 @@
 """
 
 import logging
-from enum import Enum
 from typing import (
     TYPE_CHECKING,
     Dict,
@@ -68,25 +67,6 @@
 logger = logging.getLogger(__name__)
 
 
-class QueueNames(str, Enum):
-    PRESENCE_MAP = "presence_map"
-    KEYED_EDU = "keyed_edu"
-    KEYED_EDU_CHANGED = "keyed_edu_changed"
-    EDUS = "edus"
-    POS_TIME = "pos_time"
-    PRESENCE_DESTINATIONS = "presence_destinations"
-
-
-queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
-
-for queue_name in QueueNames:
-    queue_name_to_gauge_map[queue_name] = LaterGauge(
-        name=f"synapse_federation_send_queue_{queue_name.value}_size",
-        desc="",
-        labelnames=[SERVER_NAME_LABEL],
-    )
-
-
 class FederationRemoteSendQueue(AbstractFederationSender):
     """A drop in replacement for FederationSender"""
 
@@ -131,15 +111,23 @@ def __init__(self, hs: "HomeServer"):
         # we make a new function, so we need to make a new function so the inner
         # lambda binds to the queue rather than to the name of the queue which
         # changes. ARGH.
-        def register(queue_name: QueueNames, queue: Sized) -> None:
-            queue_name_to_gauge_map[queue_name].register_hook(
-                lambda: {(self.server_name,): len(queue)}
+        def register(name: str, queue: Sized) -> None:
+            LaterGauge(
+                name="synapse_federation_send_queue_%s_size" % (queue_name,),
+                desc="",
+                labelnames=[SERVER_NAME_LABEL],
+                caller=lambda: {(self.server_name,): len(queue)},
             )
 
-        for queue_name in QueueNames:
-            queue = getattr(self, queue_name.value)
-            assert isinstance(queue, Sized)
-            register(queue_name, queue=queue)
+        for queue_name in [
+            "presence_map",
+            "keyed_edu",
+            "keyed_edu_changed",
+            "edus",
+            "pos_time",
+            "presence_destinations",
+        ]:
+            register(queue_name, getattr(self, queue_name))
 
         self.clock.looping_call(self._clear_queue, 30 * 1000)
 
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 21af1235432..8befbe37222 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -199,24 +199,6 @@
     labelnames=[SERVER_NAME_LABEL],
 )
 
-transaction_queue_pending_destinations_gauge = LaterGauge(
-    name="synapse_federation_transaction_queue_pending_destinations",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-transaction_queue_pending_pdus_gauge = LaterGauge(
-    name="synapse_federation_transaction_queue_pending_pdus",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-transaction_queue_pending_edus_gauge = LaterGauge(
-    name="synapse_federation_transaction_queue_pending_edus",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
 # Time (in s) to wait before trying to wake up destinations that have
 # catch-up outstanding.
 # Please note that rate limiting still applies, so while the loop is
@@ -416,28 +398,38 @@ def __init__(self, hs: "HomeServer"):
         # map from destination to PerDestinationQueue
         self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
 
-        transaction_queue_pending_destinations_gauge.register_hook(
-            lambda: {
+        LaterGauge(
+            name="synapse_federation_transaction_queue_pending_destinations",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {
                 (self.server_name,): sum(
                     1
                     for d in self._per_destination_queues.values()
                     if d.transmission_loop_running
                 )
-            }
+            },
         )
 
-        transaction_queue_pending_pdus_gauge.register_hook(
-            lambda: {
+
+        LaterGauge(
+            name="synapse_federation_transaction_queue_pending_pdus",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {
                 (self.server_name,): sum(
                     d.pending_pdu_count()
                     for d in self._per_destination_queues.values()
                 )
-            }
+            },
         )
-        transaction_queue_pending_edus_gauge.register_hook(
-            lambda: {
+        LaterGauge(
+            name="synapse_federation_transaction_queue_pending_edus",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {
                 (self.server_name,): sum(
                     d.pending_edu_count()
                     for d in self._per_destination_queues.values()
                 )
-            }
+            },
         )
 
         self._is_processing = False
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index fb9f962672f..b253117498a 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -173,18 +173,6 @@
     labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
 )
 
-presence_user_to_current_state_size_gauge = LaterGauge(
-    name="synapse_handlers_presence_user_to_current_state_size",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-presence_wheel_timer_size_gauge = LaterGauge(
-    name="synapse_handlers_presence_wheel_timer_size",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
 # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
 # "currently_active"
 LAST_ACTIVE_GRANULARITY = 60 * 1000
@@ -791,8 +779,11 @@ def __init__(self, hs: "HomeServer"):
             EduTypes.PRESENCE, self.incoming_presence
         )
 
-        presence_user_to_current_state_size_gauge.register_hook(
-            lambda: {(self.server_name,): len(self.user_to_current_state)}
+        LaterGauge(
+            name="synapse_handlers_presence_user_to_current_state_size",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
         )
 
         # The per-device presence state, maps user to devices to per-device presence state.
@@ -891,8 +882,11 @@ def __init__(self, hs: "HomeServer"):
             60 * 1000,
         )
 
-        presence_wheel_timer_size_gauge.register_hook(
-            lambda: {(self.server_name,): len(self.wheel_timer)}
+        LaterGauge(
+            name="synapse_handlers_presence_wheel_timer_size",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self.wheel_timer)},
         )
 
         # Used to handle sending of presence to newly joined users/servers
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index c5274c758b9..a9b049f9043 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -164,12 +164,12 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
     return counts
 
 
-in_flight_requests = LaterGauge(
+LaterGauge(
     name="synapse_http_server_in_flight_requests_count",
     desc="",
     labelnames=["method", "servlet", SERVER_NAME_LABEL],
+    caller=_get_in_flight_counts,
 )
-in_flight_requests.register_hook(_get_in_flight_counts)
 
 
 class RequestMetrics:
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 8c99d3c7700..11e2551a168 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -31,7 +31,6 @@
     Dict,
     Generic,
     Iterable,
-    List,
     Mapping,
     Optional,
     Sequence,
@@ -74,6 +73,8 @@
 
 METRICS_PREFIX = "/_synapse/metrics"
 
+all_gauges: Dict[str, Collector] = {}
+
 HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
 
 SERVER_NAME_LABEL = "server_name"
@@ -162,47 +163,42 @@ class LaterGauge(Collector):
     name: str
     desc: str
     labelnames: Optional[StrSequence] = attr.ib(hash=False)
-    # List of callbacks: each callback should either return a value (if there are no
-    # labels for this metric), or dict mapping from a label tuple to a value
-    _hooks: List[
-        Callable[
-            [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
-        ]
-    ] = attr.ib(factory=list, hash=False)
+    # callback: should either return a value (if there are no labels for this metric),
+    # or dict mapping from a label tuple to a value
+    caller: Callable[
+        [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
+    ]
 
     def collect(self) -> Iterable[Metric]:
         # The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
         # (we don't enforce it here, one level up).
         g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames)  # type: ignore[missing-server-name-label]
 
-        for hook in self._hooks:
-            try:
-                hook_result = hook()
-            except Exception:
-                logger.exception(
-                    "Exception running callback for LaterGauge(%s)", self.name
-                )
-                yield g
-                return
-
-            if isinstance(hook_result, (int, float)):
-                g.add_metric([], hook_result)
-            else:
-                for k, v in hook_result.items():
-                    g.add_metric(k, v)
-
+        try:
+            calls = self.caller()
+        except Exception:
+            logger.exception("Exception running callback for LaterGauge(%s)", self.name)
+            yield g
+            return
-        yield g
-
-    def register_hook(
-        self,
-        hook: Callable[
-            [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
-        ],
-    ) -> None:
-        self._hooks.append(hook)
+
+        if isinstance(calls, (int, float)):
+            g.add_metric([], calls)
+        else:
+            for k, v in calls.items():
+                g.add_metric(k, v)
+
+        yield g
 
     def __attrs_post_init__(self) -> None:
+        self._register()
+
+    def _register(self) -> None:
+        if self.name in all_gauges.keys():
+            logger.warning("%s already registered, reregistering", self.name)
+            REGISTRY.unregister(all_gauges.pop(self.name))
+
         REGISTRY.register(self)
+        all_gauges[self.name] = self
 
 
 # `MetricsEntry` only makes sense when it is a `Protocol`,
@@ -254,7 +250,7 @@ def __init__(
         # Protects access to _registrations
         self._lock = threading.Lock()
 
-        REGISTRY.register(self)
+        self._register_with_collector()
 
     def register(
         self,
@@ -345,6 +341,14 @@ def collect(self) -> Iterable[Metric]:
             gauge.add_metric(labels=key, value=getattr(metrics, name))
             yield gauge
 
+    def _register_with_collector(self) -> None:
+        if self.name in all_gauges.keys():
+            logger.warning("%s already registered, reregistering", self.name)
+            REGISTRY.unregister(all_gauges.pop(self.name))
+
+        REGISTRY.register(self)
+        all_gauges[self.name] = self
+
 
 class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
     """
diff --git a/synapse/notifier.py b/synapse/notifier.py
index d56a7b26bbb..448a715e2a8 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -86,24 +86,6 @@
     labelnames=["stream", SERVER_NAME_LABEL],
 )
 
-
-notifier_listeners_gauge = LaterGauge(
-    name="synapse_notifier_listeners",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-notifier_rooms_gauge = LaterGauge(
-    name="synapse_notifier_rooms",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-notifier_users_gauge = LaterGauge(
-    name="synapse_notifier_users",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
 
 T = TypeVar("T")
 
@@ -299,16 +281,28 @@ def count_listeners() -> Mapping[Tuple[str, ...], int]:
                 )
             }
 
-        notifier_listeners_gauge.register_hook(count_listeners)
-        notifier_rooms_gauge.register_hook(
-            lambda: {
+        LaterGauge(
+            name="synapse_notifier_listeners",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=count_listeners,
+        )
+
+        LaterGauge(
+            name="synapse_notifier_rooms",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {
                 (self.server_name,): count(
                     bool, list(self.room_to_user_streams.values())
                 )
-            }
+            },
         )
-        notifier_users_gauge.register_hook(
-            lambda: {(self.server_name,): len(self.user_to_user_stream)}
+        LaterGauge(
+            name="synapse_notifier_users",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
         )
 
     def add_replication_callback(self, cb: Callable[[], None]) -> None:
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index f033eaaeb57..0f14c7e3804 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -106,18 +106,6 @@
     "synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
 )
 
-tcp_resource_total_connections_gauge = LaterGauge(
-    name="synapse_replication_tcp_resource_total_connections",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-tcp_command_queue_gauge = LaterGauge(
-    name="synapse_replication_tcp_command_queue",
-    desc="Number of inbound RDATA/POSITION commands queued for processing",
-    labelnames=["stream_name", SERVER_NAME_LABEL],
-)
-
 
 # the type of the entries in _command_queues_by_stream
 _StreamCommandQueue = Deque[
@@ -255,8 +243,11 @@ def __init__(self, hs: "HomeServer"):
         # outgoing replication commands to.)
         self._connections: List[IReplicationConnection] = []
 
-        tcp_resource_total_connections_gauge.register_hook(
-            lambda: {(self.server_name,): len(self._connections)}
+        LaterGauge(
+            name="synapse_replication_tcp_resource_total_connections",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self._connections)},
         )
 
         # When POSITION or RDATA commands arrive, we stick them in a queue and process
@@ -275,11 +266,14 @@ def __init__(self, hs: "HomeServer"):
         # from that connection.
         self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
 
-        tcp_command_queue_gauge.register_hook(
-            lambda: {
+        LaterGauge(
+            name="synapse_replication_tcp_command_queue",
+            desc="Number of inbound RDATA/POSITION commands queued for processing",
+            labelnames=["stream_name", SERVER_NAME_LABEL],
+            caller=lambda: {
                 (stream_name, self.server_name): len(queue)
                 for stream_name, queue in self._command_queues_by_stream.items()
-            }
+            },
         )
 
         self._is_master = hs.config.worker.worker_app is None
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 4d8381646aa..969f0303e05 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -527,11 +527,9 @@ def replicate(self) -> None:
     name="synapse_replication_tcp_protocol_pending_commands",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-)
-pending_commands.register_hook(
-    lambda: {
+    caller=lambda: {
         (p.name, p.server_name): len(p.pending_commands)
         for p in connected_connections
-    }
+    },
 )
 
@@ -546,11 +544,9 @@ def transport_buffer_size(protocol: BaseReplicationStreamProtocol) -> int:
     name="synapse_replication_tcp_protocol_transport_send_buffer",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-)
-transport_send_buffer.register_hook(
-    lambda: {
+    caller=lambda: {
         (p.name, p.server_name): transport_buffer_size(p)
         for p in connected_connections
-    }
+    },
 )
 
@@ -575,12 +571,10 @@ def transport_kernel_read_buffer_size(
     name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-)
-tcp_transport_kernel_send_buffer.register_hook(
-    lambda: {
+    caller=lambda: {
         (p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
         for p in connected_connections
-    }
+    },
 )
 
 
@@ -588,10 +582,8 @@ def transport_kernel_read_buffer_size(
     name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-)
-tcp_transport_kernel_read_buffer.register_hook(
-    lambda: {
+    caller=lambda: {
         (p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
         for p in connected_connections
-    }
+    },
 )
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index bbdc5b9d278..f7aec16c969 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -100,12 +100,6 @@
     labelnames=["desc", SERVER_NAME_LABEL],
 )
 
-background_update_status = LaterGauge(
-    name="synapse_background_update_status",
-    desc="Background update status",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
 # Unique indexes which have been added in background updates. Maps from table name
 # to the name of the background update which added the unique index to that table.
@@ -617,8 +611,11 @@ def __init__(
         )
         self.updates = BackgroundUpdater(hs, self)
 
-        background_update_status.register_hook(
-            lambda: {(self.server_name,): self.updates.get_status()},
+        LaterGauge(
+            name="synapse_background_update_status",
+            desc="Background update status",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): self.updates.get_status()},
         )
 
         self._previous_txn_total_time = 0.0
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 94a1274edb0..654250fadc5 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -84,13 +84,6 @@
 _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000
 
 
-federation_known_servers_gauge = LaterGauge(
-    name="synapse_federation_known_servers",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-
 @attr.s(frozen=True, slots=True, auto_attribs=True)
 class EventIdMembership:
     """Returned by `get_membership_from_event_ids`"""
@@ -123,8 +116,11 @@ def __init__(
             1,
             self._count_known_servers,
         )
-        federation_known_servers_gauge.register_hook(
-            lambda: {(self.server_name,): self._known_servers_count}
+        LaterGauge(
+            name="synapse_federation_known_servers",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): self._known_servers_count},
         )
 
         @wrap_as_background_process("_count_known_servers")
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index b3c65676c6d..f5e592d80ee 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -131,31 +131,27 @@ def _get_counts_from_rate_limiter_instance(
 # We track the number of affected hosts per time-period so we can
 # differentiate one really noisy homeserver from a general
 # ratelimit tuning problem across the federation.
-sleep_affected_hosts_gauge = LaterGauge(
+LaterGauge(
     name="synapse_rate_limit_sleep_affected_hosts",
     desc="Number of hosts that had requests put to sleep",
     labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
-)
-sleep_affected_hosts_gauge.register_hook(
-    lambda: _get_counts_from_rate_limiter_instance(
+    caller=lambda: _get_counts_from_rate_limiter_instance(
         lambda rate_limiter_instance: sum(
             ratelimiter.should_sleep()
            for ratelimiter in rate_limiter_instance.ratelimiters.values()
         )
-    )
+    ),
 )
 
-reject_affected_hosts_gauge = LaterGauge(
+LaterGauge(
     name="synapse_rate_limit_reject_affected_hosts",
     desc="Number of hosts that had requests rejected",
     labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
-)
-reject_affected_hosts_gauge.register_hook(
-    lambda: _get_counts_from_rate_limiter_instance(
+    caller=lambda: _get_counts_from_rate_limiter_instance(
         lambda rate_limiter_instance: sum(
             ratelimiter.should_reject()
            for ratelimiter in rate_limiter_instance.ratelimiters.values()
         )
-    )
+    ),
 )
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index 904f99fa426..fdcacdf1289 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -44,13 +44,6 @@
 logger = logging.getLogger(__name__)
 
 
-running_tasks_gauge = LaterGauge(
-    name="synapse_scheduler_running_tasks",
-    desc="The number of concurrent running tasks handled by the TaskScheduler",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-
 class TaskScheduler:
     """
     This is a simple task scheduler designed for resumable tasks. Normally,
@@ -137,8 +130,11 @@ def __init__(self, hs: "HomeServer"):
             TaskScheduler.SCHEDULE_INTERVAL_MS,
         )
 
-        running_tasks_gauge.register_hook(
-            lambda: {(self.server_name,): len(self._running_tasks)}
+        LaterGauge(
+            name="synapse_scheduler_running_tasks",
+            desc="The number of concurrent running tasks handled by the TaskScheduler",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self._running_tasks)},
         )
 
     def register_action(
diff --git a/tests/metrics/test_metrics.py b/tests/metrics/test_metrics.py
index 5a3c3c1c4ed..61874564a6b 100644
--- a/tests/metrics/test_metrics.py
+++ b/tests/metrics/test_metrics.py
@@ -22,13 +22,7 @@
 
 from prometheus_client.core import Sample
 
-from synapse.metrics import (
-    REGISTRY,
-    SERVER_NAME_LABEL,
-    InFlightGauge,
-    LaterGauge,
-    generate_latest,
-)
+from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
 from synapse.util.caches.deferred_cache import DeferredCache
 
 from tests import unittest
@@ -291,42 +285,6 @@ def test_cache_metric_multiple_servers(self) -> None:
         self.assertEqual(hs2_cache_max_size_metric_value, "777.0")
 
 
-class LaterGaugeTests(unittest.HomeserverTestCase):
-    def test_later_gauge_multiple_servers(self) -> None:
-        """
-        Test that LaterGauge metrics are reported correctly across multiple servers. We
-        will have an metrics entry for each homeserver that is labeled with the
-        `server_name` label.
- """ - later_gauge = LaterGauge( - name="foo", - desc="", - labelnames=[SERVER_NAME_LABEL], - ) - later_gauge.register_hook(lambda: {("hs1",): 1}) - later_gauge.register_hook(lambda: {("hs2",): 2}) - - metrics_map = get_latest_metrics() - - # Find the metrics for the caches from both homeservers - hs1_metric = 'foo{server_name="hs1"}' - hs1_metric_value = metrics_map.get(hs1_metric) - self.assertIsNotNone( - hs1_metric_value, - f"Missing metric {hs1_metric} in cache metrics {metrics_map}", - ) - hs2_metric = 'foo{server_name="hs2"}' - hs2_metric_value = metrics_map.get(hs2_metric) - self.assertIsNotNone( - hs2_metric_value, - f"Missing metric {hs2_metric} in cache metrics {metrics_map}", - ) - - # Sanity check the metric values - self.assertEqual(hs1_metric_value, "1.0") - self.assertEqual(hs2_metric_value, "2.0") - - def get_latest_metrics() -> Dict[str, str]: """ Collect the latest metrics from the registry and parse them into an easy to use map.