Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
aea3f19
Fill in `synapse/notifier.py`
MadLittleMods Jul 22, 2025
26a9c88
Fill in `synapse/federation/send_queue.py`
MadLittleMods Jul 22, 2025
27e5893
Fill in `synapse/federation/sender/__init__.py`
MadLittleMods Jul 22, 2025
9e8ce08
Fill in `synapse/handlers/presence.py`
MadLittleMods Jul 22, 2025
ce3328b
Fill in `synapse/replication/tcp/handler.py`
MadLittleMods Jul 22, 2025
ea362d0
Fill in `synapse/storage/database.py`
MadLittleMods Jul 22, 2025
6836a66
Fill in `synapse/storage/databases/main/roommember.py`
MadLittleMods Jul 22, 2025
67d0f99
Fill in `synapse/util/task_scheduler.py`
MadLittleMods Jul 22, 2025
02aed32
Fix `synapse/federation/send_queue.py`
MadLittleMods Jul 22, 2025
a3a1146
Fix `synapse/notifier.py`
MadLittleMods Jul 22, 2025
9b83e99
Fill in keyword args for global `LaterGauge` (no server name label yet)
MadLittleMods Jul 22, 2025
e0db242
Merge branch 'develop' into madlittlemods/18592-later-gauge
MadLittleMods Jul 24, 2025
18345dc
Merge branch 'develop' into madlittlemods/18592-later-gauge
MadLittleMods Jul 25, 2025
4917fed
Rename `labelnames` arg to be consistent with other metrics
MadLittleMods Jul 25, 2025
739c942
Fill in `synapse/http/request_metrics.py`
MadLittleMods Jul 25, 2025
815bf60
Rename arg from `labels` -> `labelnames` to match other metrics
MadLittleMods Jul 25, 2025
762909f
Fill in `synapse/util/ratelimitutils.py`
MadLittleMods Jul 25, 2025
e12e3c3
Fill in `synapse/replication/tcp/protocol.py`
MadLittleMods Jul 25, 2025
5c00ccb
Add changelog
MadLittleMods Jul 29, 2025
90fbdee
Merge branch 'develop' into madlittlemods/18592-later-gauge
MadLittleMods Jul 29, 2025
291c3d3
Merge branch 'develop' into madlittlemods/18592-later-gauge
MadLittleMods Jul 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18714.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor `LaterGauge` metrics to be homeserver-scoped.
10 changes: 5 additions & 5 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@

from synapse.api.presence import UserPresenceState
from synapse.federation.sender import AbstractFederationSender, FederationSender
from synapse.metrics import LaterGauge
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.replication.tcp.streams.federation import FederationStream
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
from synapse.util.metrics import Measure
Expand Down Expand Up @@ -113,10 +113,10 @@ def __init__(self, hs: "HomeServer"):
# changes. ARGH.
def register(name: str, queue: Sized) -> None:
LaterGauge(
"synapse_federation_send_queue_%s_size" % (queue_name,),
"",
[],
lambda: len(queue),
name="synapse_federation_send_queue_%s_size" % (queue_name,),
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(queue)},
)

for queue_name in [
Expand Down
46 changes: 26 additions & 20 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,31 +399,37 @@ def __init__(self, hs: "HomeServer"):
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}

LaterGauge(
"synapse_federation_transaction_queue_pending_destinations",
"",
[],
lambda: sum(
1
for d in self._per_destination_queues.values()
if d.transmission_loop_running
),
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum(
1
for d in self._per_destination_queues.values()
if d.transmission_loop_running
)
},
)

LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
"",
[],
lambda: sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
),
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
)
},
)
LaterGauge(
"synapse_federation_transaction_queue_pending_edus",
"",
[],
lambda: sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
),
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
)
},
)

self._is_processing = False
Expand Down
16 changes: 8 additions & 8 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,10 +780,10 @@ def __init__(self, hs: "HomeServer"):
)

LaterGauge(
"synapse_handlers_presence_user_to_current_state_size",
"",
[],
lambda: len(self.user_to_current_state),
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
)

# The per-device presence state, maps user to devices to per-device presence state.
Expand Down Expand Up @@ -883,10 +883,10 @@ def __init__(self, hs: "HomeServer"):
)

LaterGauge(
"synapse_handlers_presence_wheel_timer_size",
"",
[],
lambda: len(self.wheel_timer),
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
)

# Used to handle sending of presence to newly joined users/servers
Expand Down
22 changes: 13 additions & 9 deletions synapse/http/request_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,27 +144,31 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
# Cast to a list to prevent it changing while the Prometheus
# thread is collecting metrics
with _in_flight_requests_lock:
reqs = list(_in_flight_requests)
request_metrics = list(_in_flight_requests)

for rm in reqs:
rm.update_metrics()
for request_metric in request_metrics:
request_metric.update_metrics()

# Map from (method, name) -> int, the number of in flight requests of that
# type. The key type is Tuple[str, str], but we leave the length unspecified
# for compatability with LaterGauge's annotations.
counts: Dict[Tuple[str, ...], int] = {}
for rm in reqs:
key = (rm.method, rm.name)
for request_metric in request_metrics:
key = (
request_metric.method,
request_metric.name,
request_metric.our_server_name,
)
counts[key] = counts.get(key, 0) + 1

return counts


LaterGauge(
"synapse_http_server_in_flight_requests_count",
"",
["method", "servlet"],
_get_in_flight_counts,
name="synapse_http_server_in_flight_requests_count",
desc="",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
caller=_get_in_flight_counts,
)


Expand Down
6 changes: 3 additions & 3 deletions synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,21 +155,21 @@ def collect() -> Iterable[Metric]:
RegistryProxy = cast(CollectorRegistry, _RegistryProxy)


@attr.s(slots=True, hash=True, auto_attribs=True)
@attr.s(slots=True, hash=True, auto_attribs=True, kw_only=True)
class LaterGauge(Collector):
"""A Gauge which periodically calls a user-provided callback to produce metrics."""

name: str
desc: str
labels: Optional[StrSequence] = attr.ib(hash=False)
labelnames: Optional[StrSequence] = attr.ib(hash=False)
# callback: should either return a value (if there are no labels for this metric),
# or dict mapping from a label tuple to a value
caller: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]

def collect(self) -> Iterable[Metric]:
g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames)

try:
calls = self.caller()
Expand Down
36 changes: 28 additions & 8 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
Iterable,
List,
Literal,
Mapping,
Optional,
Set,
Tuple,
Expand Down Expand Up @@ -263,26 +264,45 @@ def __init__(self, hs: "HomeServer"):
# This is not a very cheap test to perform, but it's only executed
# when rendering the metrics page, which is likely once per minute at
# most when scraping it.
def count_listeners() -> int:
#
# Ideally, we'd use `Mapping[Tuple[str], int]` here but mypy doesn't like it.
# This is close enough and better than a type ignore.
def count_listeners() -> Mapping[Tuple[str, ...], int]:
all_user_streams: Set[_NotifierUserStream] = set()

for streams in list(self.room_to_user_streams.values()):
all_user_streams |= streams
for stream in list(self.user_to_user_stream.values()):
all_user_streams.add(stream)

return sum(stream.count_listeners() for stream in all_user_streams)
return {
(self.server_name,): sum(
stream.count_listeners() for stream in all_user_streams
)
}

LaterGauge("synapse_notifier_listeners", "", [], count_listeners)
LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=count_listeners,
)

LaterGauge(
"synapse_notifier_rooms",
"",
[],
lambda: count(bool, list(self.room_to_user_streams.values())),
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): count(
bool, list(self.room_to_user_streams.values())
)
},
)
LaterGauge(
"synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream)
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
)

def add_replication_callback(self, cb: Callable[[], None]) -> None:
Expand Down
18 changes: 9 additions & 9 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,10 @@ def __init__(self, hs: "HomeServer"):
self._connections: List[IReplicationConnection] = []

LaterGauge(
"synapse_replication_tcp_resource_total_connections",
"",
[],
lambda: len(self._connections),
name="synapse_replication_tcp_resource_total_connections",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self._connections)},
)

# When POSITION or RDATA commands arrive, we stick them in a queue and process
Expand All @@ -267,11 +267,11 @@ def __init__(self, hs: "HomeServer"):
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}

LaterGauge(
"synapse_replication_tcp_command_queue",
"Number of inbound RDATA/POSITION commands queued for processing",
["stream_name"],
lambda: {
(stream_name,): len(queue)
name="synapse_replication_tcp_command_queue",
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
caller=lambda: {
(stream_name, self.server_name): len(queue)
for stream_name, queue in self._command_queues_by_stream.items()
},
)
Expand Down
40 changes: 22 additions & 18 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,10 +524,12 @@ def replicate(self) -> None:
# The following simply registers metrics for the replication connections

pending_commands = LaterGauge(
"synapse_replication_tcp_protocol_pending_commands",
"",
["name"],
lambda: {(p.name,): len(p.pending_commands) for p in connected_connections},
name="synapse_replication_tcp_protocol_pending_commands",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
(p.name, p.server_name): len(p.pending_commands) for p in connected_connections
},
)


Expand All @@ -539,10 +541,12 @@ def transport_buffer_size(protocol: BaseReplicationStreamProtocol) -> int:


transport_send_buffer = LaterGauge(
"synapse_replication_tcp_protocol_transport_send_buffer",
"",
["name"],
lambda: {(p.name,): transport_buffer_size(p) for p in connected_connections},
name="synapse_replication_tcp_protocol_transport_send_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
(p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
},
)


Expand All @@ -564,22 +568,22 @@ def transport_kernel_read_buffer_size(


tcp_transport_kernel_send_buffer = LaterGauge(
"synapse_replication_tcp_protocol_transport_kernel_send_buffer",
"",
["name"],
lambda: {
(p.name,): transport_kernel_read_buffer_size(p, False)
name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
},
)


tcp_transport_kernel_read_buffer = LaterGauge(
"synapse_replication_tcp_protocol_transport_kernel_read_buffer",
"",
["name"],
lambda: {
(p.name,): transport_kernel_read_buffer_size(p, True)
name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
},
)
8 changes: 4 additions & 4 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,10 +588,10 @@ def __init__(

self.updates = BackgroundUpdater(hs, self)
LaterGauge(
"synapse_background_update_status",
"Background update status",
[],
self.updates.get_status,
name="synapse_background_update_status",
desc="Background update status",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): self.updates.get_status()},
)

self._previous_txn_total_time = 0.0
Expand Down
10 changes: 5 additions & 5 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.logging.opentracing import trace
from synapse.metrics import LaterGauge
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
Expand Down Expand Up @@ -117,10 +117,10 @@ def __init__(
self._count_known_servers,
)
LaterGauge(
"synapse_federation_known_servers",
"",
[],
lambda: self._known_servers_count,
name="synapse_federation_known_servers",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): self._known_servers_count},
)

@wrap_as_background_process("_count_known_servers")
Expand Down
Loading
Loading