Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion changelog.d/18751.misc

This file was deleted.

42 changes: 15 additions & 27 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
"""

import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
Dict,
Expand Down Expand Up @@ -68,25 +67,6 @@
logger = logging.getLogger(__name__)


class QueueNames(str, Enum):
PRESENCE_MAP = "presence_map"
KEYED_EDU = "keyed_edu"
KEYED_EDU_CHANGED = "keyed_edu_changed"
EDUS = "edus"
POS_TIME = "pos_time"
PRESENCE_DESTINATIONS = "presence_destinations"


queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}

for queue_name in QueueNames:
queue_name_to_gauge_map[queue_name] = LaterGauge(
name=f"synapse_federation_send_queue_{queue_name.value}_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)


class FederationRemoteSendQueue(AbstractFederationSender):
"""A drop in replacement for FederationSender"""

Expand Down Expand Up @@ -131,15 +111,23 @@ def __init__(self, hs: "HomeServer"):
# we make a new function, so we need to make a new function so the inner
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(queue_name: QueueNames, queue: Sized) -> None:
queue_name_to_gauge_map[queue_name].register_hook(
lambda: {(self.server_name,): len(queue)}
def register(name: str, queue: Sized) -> None:
LaterGauge(
name="synapse_federation_send_queue_%s_size" % (queue_name,),
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(queue)},
)

for queue_name in QueueNames:
queue = getattr(self, queue_name.value)
assert isinstance(queue, Sized)
register(queue_name, queue=queue)
for queue_name in [
"presence_map",
"keyed_edu",
"keyed_edu_changed",
"edus",
"pos_time",
"presence_destinations",
]:
register(queue_name, getattr(self, queue_name))

self.clock.looping_call(self._clear_queue, 30 * 1000)

Expand Down
46 changes: 19 additions & 27 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,24 +199,6 @@
labelnames=[SERVER_NAME_LABEL],
)

transaction_queue_pending_destinations_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

transaction_queue_pending_pdus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

transaction_queue_pending_edus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

# Time (in s) to wait before trying to wake up destinations that have
# catch-up outstanding.
# Please note that rate limiting still applies, so while the loop is
Expand Down Expand Up @@ -416,28 +398,38 @@ def __init__(self, hs: "HomeServer"):
# map from destination to PerDestinationQueue
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}

transaction_queue_pending_destinations_gauge.register_hook(
lambda: {
LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum(
1
for d in self._per_destination_queues.values()
if d.transmission_loop_running
)
}
},
)
transaction_queue_pending_pdus_gauge.register_hook(
lambda: {

LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
)
}
},
)
transaction_queue_pending_edus_gauge.register_hook(
lambda: {
LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
)
}
},
)

self._is_processing = False
Expand Down
26 changes: 10 additions & 16 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,18 +173,6 @@
labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
)

presence_user_to_current_state_size_gauge = LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

presence_wheel_timer_size_gauge = LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
LAST_ACTIVE_GRANULARITY = 60 * 1000
Expand Down Expand Up @@ -791,8 +779,11 @@ def __init__(self, hs: "HomeServer"):
EduTypes.PRESENCE, self.incoming_presence
)

presence_user_to_current_state_size_gauge.register_hook(
lambda: {(self.server_name,): len(self.user_to_current_state)}
LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
)

# The per-device presence state, maps user to devices to per-device presence state.
Expand Down Expand Up @@ -891,8 +882,11 @@ def __init__(self, hs: "HomeServer"):
60 * 1000,
)

presence_wheel_timer_size_gauge.register_hook(
lambda: {(self.server_name,): len(self.wheel_timer)}
LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
)

# Used to handle sending of presence to newly joined users/servers
Expand Down
4 changes: 2 additions & 2 deletions synapse/http/request_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,12 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
return counts


in_flight_requests = LaterGauge(
LaterGauge(
name="synapse_http_server_in_flight_requests_count",
desc="",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
caller=_get_in_flight_counts,
)
in_flight_requests.register_hook(_get_in_flight_counts)


class RequestMetrics:
Expand Down
68 changes: 36 additions & 32 deletions synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
Dict,
Generic,
Iterable,
List,
Mapping,
Optional,
Sequence,
Expand Down Expand Up @@ -74,6 +73,8 @@

METRICS_PREFIX = "/_synapse/metrics"

all_gauges: Dict[str, Collector] = {}

HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")

SERVER_NAME_LABEL = "server_name"
Expand Down Expand Up @@ -162,47 +163,42 @@ class LaterGauge(Collector):
name: str
desc: str
labelnames: Optional[StrSequence] = attr.ib(hash=False)
# List of callbacks: each callback should either return a value (if there are no
# labels for this metric), or dict mapping from a label tuple to a value
_hooks: List[
Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
] = attr.ib(factory=list, hash=False)
# callback: should either return a value (if there are no labels for this metric),
# or dict mapping from a label tuple to a value
caller: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]

def collect(self) -> Iterable[Metric]:
# The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
# (we don't enforce it here, one level up).
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label]

for hook in self._hooks:
try:
hook_result = hook()
except Exception:
logger.exception(
"Exception running callback for LaterGauge(%s)", self.name
)
yield g
return

if isinstance(hook_result, (int, float)):
g.add_metric([], hook_result)
else:
for k, v in hook_result.items():
g.add_metric(k, v)

try:
calls = self.caller()
except Exception:
logger.exception("Exception running callback for LaterGauge(%s)", self.name)
yield g
return

def register_hook(
self,
hook: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
],
) -> None:
self._hooks.append(hook)
if isinstance(calls, (int, float)):
g.add_metric([], calls)
else:
for k, v in calls.items():
g.add_metric(k, v)

yield g

def __attrs_post_init__(self) -> None:
self._register()

def _register(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))

REGISTRY.register(self)
all_gauges[self.name] = self


# `MetricsEntry` only makes sense when it is a `Protocol`,
Expand Down Expand Up @@ -254,7 +250,7 @@ def __init__(
# Protects access to _registrations
self._lock = threading.Lock()

REGISTRY.register(self)
self._register_with_collector()

def register(
self,
Expand Down Expand Up @@ -345,6 +341,14 @@ def collect(self) -> Iterable[Metric]:
gauge.add_metric(labels=key, value=getattr(metrics, name))
yield gauge

def _register_with_collector(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))

REGISTRY.register(self)
all_gauges[self.name] = self


class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
"""
Expand Down
42 changes: 18 additions & 24 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,24 +86,6 @@
labelnames=["stream", SERVER_NAME_LABEL],
)


notifier_listeners_gauge = LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

notifier_rooms_gauge = LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
notifier_users_gauge = LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

T = TypeVar("T")


Expand Down Expand Up @@ -299,16 +281,28 @@ def count_listeners() -> Mapping[Tuple[str, ...], int]:
)
}

notifier_listeners_gauge.register_hook(count_listeners)
notifier_rooms_gauge.register_hook(
lambda: {
LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=count_listeners,
)

LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): count(
bool, list(self.room_to_user_streams.values())
)
}
},
)
notifier_users_gauge.register_hook(
lambda: {(self.server_name,): len(self.user_to_user_stream)}
LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
)

def add_replication_callback(self, cb: Callable[[], None]) -> None:
Expand Down
Loading
Loading