Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18751.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix `LaterGauge` metrics to collect from all servers.
Copy link
Contributor Author

@MadLittleMods MadLittleMods Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the changelog is already underway in https://github.com/element-hq/synapse/blob/release-v1.136/CHANGES.md, I'm going to leave this as-is: it will require manual tinkering anyway to pull into v1.136.0, and we don't need to re-run CI.

The desire is for this changelog to merge with #18714

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me. I'm already manually tinkering away in there

42 changes: 27 additions & 15 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"""

import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
Dict,
Expand Down Expand Up @@ -67,6 +68,25 @@
logger = logging.getLogger(__name__)


class QueueNames(str, Enum):
    """Names of the federation send queues that are exposed as size gauges.

    Each member's value is used in two places in this module: it is
    interpolated into the metric name
    (`synapse_federation_send_queue_<value>_size`), and it is the attribute
    name looked up with `getattr(self, queue_name.value)` when the per-instance
    hooks are registered.
    """

    PRESENCE_MAP = "presence_map"
    KEYED_EDU = "keyed_edu"
    KEYED_EDU_CHANGED = "keyed_edu_changed"
    EDUS = "edus"
    POS_TIME = "pos_time"
    PRESENCE_DESTINATIONS = "presence_destinations"


# One module-level `LaterGauge` per queue name, created once at import time.
# The gauges start without any hooks; the `register` helper in `__init__` below
# attaches a per-instance hook via `register_hook`.
queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {
    queue_name: LaterGauge(
        name=f"synapse_federation_send_queue_{queue_name.value}_size",
        desc="",
        labelnames=[SERVER_NAME_LABEL],
    )
    for queue_name in QueueNames
}


class FederationRemoteSendQueue(AbstractFederationSender):
"""A drop in replacement for FederationSender"""

Expand Down Expand Up @@ -111,23 +131,15 @@ def __init__(self, hs: "HomeServer"):
# we make a new function, so we need to make a new function so the inner
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(name: str, queue: Sized) -> None:
LaterGauge(
name="synapse_federation_send_queue_%s_size" % (queue_name,),
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(queue)},
def register(queue_name: QueueNames, queue: Sized) -> None:
queue_name_to_gauge_map[queue_name].register_hook(
lambda: {(self.server_name,): len(queue)}
)

for queue_name in [
"presence_map",
"keyed_edu",
"keyed_edu_changed",
"edus",
"pos_time",
"presence_destinations",
]:
register(queue_name, getattr(self, queue_name))
for queue_name in QueueNames:
queue = getattr(self, queue_name.value)
assert isinstance(queue, Sized)
register(queue_name, queue=queue)

self.clock.looping_call(self._clear_queue, 30 * 1000)

Expand Down
46 changes: 27 additions & 19 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,24 @@
labelnames=[SERVER_NAME_LABEL],
)

# Module-level gauges for the transaction queue. They are created once here;
# the per-instance hooks that supply the samples are attached with
# `register_hook(...)` in `__init__` further down, each keyed by
# `(self.server_name,)`.

# Number of per-destination queues whose `transmission_loop_running` is truthy.
transaction_queue_pending_destinations_gauge = LaterGauge(
    name="synapse_federation_transaction_queue_pending_destinations",
    desc="",
    labelnames=[SERVER_NAME_LABEL],
)

# Sum of `pending_pdu_count()` over all per-destination queues.
transaction_queue_pending_pdus_gauge = LaterGauge(
    name="synapse_federation_transaction_queue_pending_pdus",
    desc="",
    labelnames=[SERVER_NAME_LABEL],
)

# Sum of `pending_edu_count()` over all per-destination queues.
transaction_queue_pending_edus_gauge = LaterGauge(
    name="synapse_federation_transaction_queue_pending_edus",
    desc="",
    labelnames=[SERVER_NAME_LABEL],
)

# Time (in s) to wait before trying to wake up destinations that have
# catch-up outstanding.
# Please note that rate limiting still applies, so while the loop is
Expand Down Expand Up @@ -398,38 +416,28 @@ def __init__(self, hs: "HomeServer"):
# map from destination to PerDestinationQueue
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}

LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_destinations_gauge.register_hook(
lambda: {
(self.server_name,): sum(
1
for d in self._per_destination_queues.values()
if d.transmission_loop_running
)
},
}
)

LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_pdus_gauge.register_hook(
lambda: {
(self.server_name,): sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
)
},
}
)
LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_edus_gauge.register_hook(
lambda: {
(self.server_name,): sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
)
},
}
)

self._is_processing = False
Expand Down
26 changes: 16 additions & 10 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,18 @@
labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
)

# Module-level presence gauges. The hooks that supply the samples are attached
# with `register_hook(...)` in `__init__` further down, each keyed by
# `(self.server_name,)`.

# Reports `len(self.user_to_current_state)`.
presence_user_to_current_state_size_gauge = LaterGauge(
    name="synapse_handlers_presence_user_to_current_state_size",
    desc="",
    labelnames=[SERVER_NAME_LABEL],
)

# Reports `len(self.wheel_timer)`.
presence_wheel_timer_size_gauge = LaterGauge(
    name="synapse_handlers_presence_wheel_timer_size",
    desc="",
    labelnames=[SERVER_NAME_LABEL],
)

# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
LAST_ACTIVE_GRANULARITY = 60 * 1000
Expand Down Expand Up @@ -779,11 +791,8 @@ def __init__(self, hs: "HomeServer"):
EduTypes.PRESENCE, self.incoming_presence
)

LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
presence_user_to_current_state_size_gauge.register_hook(
lambda: {(self.server_name,): len(self.user_to_current_state)}
)

# The per-device presence state, maps user to devices to per-device presence state.
Expand Down Expand Up @@ -882,11 +891,8 @@ def __init__(self, hs: "HomeServer"):
60 * 1000,
)

LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
presence_wheel_timer_size_gauge.register_hook(
lambda: {(self.server_name,): len(self.wheel_timer)}
)

# Used to handle sending of presence to newly joined users/servers
Expand Down
4 changes: 2 additions & 2 deletions synapse/http/request_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,12 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
return counts


LaterGauge(
in_flight_requests = LaterGauge(
name="synapse_http_server_in_flight_requests_count",
desc="",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
caller=_get_in_flight_counts,
)
in_flight_requests.register_hook(_get_in_flight_counts)


class RequestMetrics:
Expand Down
68 changes: 32 additions & 36 deletions synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
Dict,
Generic,
Iterable,
List,
Mapping,
Optional,
Sequence,
Expand Down Expand Up @@ -73,8 +74,6 @@

METRICS_PREFIX = "/_synapse/metrics"

all_gauges: Dict[str, Collector] = {}

HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")

SERVER_NAME_LABEL = "server_name"
Expand Down Expand Up @@ -163,42 +162,47 @@ class LaterGauge(Collector):
name: str
desc: str
labelnames: Optional[StrSequence] = attr.ib(hash=False)
# callback: should either return a value (if there are no labels for this metric),
# or dict mapping from a label tuple to a value
caller: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
# List of callbacks: each callback should either return a value (if there are no
# labels for this metric), or dict mapping from a label tuple to a value
_hooks: List[
Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
] = attr.ib(factory=list, hash=False)

def collect(self) -> Iterable[Metric]:
    """Build this gauge's metric family by polling every registered hook.

    Each hook returns either a bare number (for a metric without labels) or a
    mapping from a label-value tuple to a number. All results are merged into
    a single `GaugeMetricFamily`, which is yielded exactly once.

    A hook that raises is logged and skipped, so one failing hook cannot
    suppress the samples contributed by the hooks after it.
    """
    # The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
    # (we don't enforce it here, one level up).
    g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames)  # type: ignore[missing-server-name-label]

    for hook in self._hooks:
        try:
            hook_result = hook()
        except Exception:
            logger.exception(
                "Exception running callback for LaterGauge(%s)", self.name
            )
            # Keep going rather than bailing out: the remaining hooks may
            # belong to other servers whose metrics are still perfectly good.
            continue

        if isinstance(hook_result, (int, float)):
            g.add_metric([], hook_result)
        else:
            for k, v in hook_result.items():
                g.add_metric(k, v)

    yield g

def register_hook(
    self,
    hook: Callable[
        [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
    ],
) -> None:
    """Add a callback that `collect` will invoke to obtain samples.

    Args:
        hook: Zero-argument callable returning either a single number (for a
            metric without labels) or a mapping from a label-value tuple to
            a number.

    Hooks are appended to `self._hooks` and never removed here, so the
    caller controls how many accumulate on a given gauge.
    """
    self._hooks.append(hook)

def __attrs_post_init__(self) -> None:
self._register()

def _register(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))

REGISTRY.register(self)
all_gauges[self.name] = self
Comment on lines -195 to -201
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh - that also solves it.

@erikjohnston it looks like you last touched this "reregistering" stuff 7 years ago.
Do you know if it's safe to blow it away like this? Or is there a reason for it being here that I'm unaware of?

Logically, it makes sense to me that we shouldn't be reregistering the same metrics multiple times within Synapse. Anything that does must surely be a bug.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine. I wonder if we should explicitly error if we try and reregister the same metric name then?

Copy link
Contributor Author

@MadLittleMods MadLittleMods Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

REGISTRY.register(...) will complain about duplicate metrics already ⏩

Copy link
Contributor Author

@MadLittleMods MadLittleMods Aug 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on our experience after merging this PR (see #18789, where this PR was reverted), I'm guessing this might have been added because we create a great many homeservers in our trial tests (HomeserverTestCase). All of these metrics/hooks can stack up enough to cause havoc in CI.



# `MetricsEntry` only makes sense when it is a `Protocol`,
Expand Down Expand Up @@ -250,7 +254,7 @@ def __init__(
# Protects access to _registrations
self._lock = threading.Lock()

self._register_with_collector()
REGISTRY.register(self)

def register(
self,
Expand Down Expand Up @@ -341,14 +345,6 @@ def collect(self) -> Iterable[Metric]:
gauge.add_metric(labels=key, value=getattr(metrics, name))
yield gauge

def _register_with_collector(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))

REGISTRY.register(self)
all_gauges[self.name] = self


class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
"""
Expand Down
42 changes: 24 additions & 18 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,24 @@
labelnames=["stream", SERVER_NAME_LABEL],
)


# Module-level notifier gauges. The hooks that supply the samples are attached
# with `register_hook(...)` further down.

# Fed by the `count_listeners` hook.
notifier_listeners_gauge = LaterGauge(
    name="synapse_notifier_listeners",
    desc="",
    labelnames=[SERVER_NAME_LABEL],
)

# Fed by `count(bool, list(self.room_to_user_streams.values()))` --
# presumably the number of non-empty entries; confirm against `count`.
notifier_rooms_gauge = LaterGauge(
    name="synapse_notifier_rooms",
    desc="",
    labelnames=[SERVER_NAME_LABEL],
)
# Reports `len(self.user_to_user_stream)`.
notifier_users_gauge = LaterGauge(
    name="synapse_notifier_users",
    desc="",
    labelnames=[SERVER_NAME_LABEL],
)

T = TypeVar("T")


Expand Down Expand Up @@ -281,28 +299,16 @@ def count_listeners() -> Mapping[Tuple[str, ...], int]:
)
}

LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=count_listeners,
)

LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
notifier_listeners_gauge.register_hook(count_listeners)
notifier_rooms_gauge.register_hook(
lambda: {
(self.server_name,): count(
bool, list(self.room_to_user_streams.values())
)
},
}
)
LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
notifier_users_gauge.register_hook(
lambda: {(self.server_name,): len(self.user_to_user_stream)}
)

def add_replication_callback(self, cb: Callable[[], None]) -> None:
Expand Down
Loading
Loading