Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 9efcd2d

Browse files
committed
Expose statistics on forward extremities to Prometheus (#5384)
2 parents 8ea4191 + 6312d6c commit 9efcd2d

File tree

7 files changed

+331
-114
lines changed

7 files changed

+331
-114
lines changed

changelog.d/5384.feature

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Statistics on forward extremities per room are now exposed via Prometheus.

scripts/generate_signing_key.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import argparse
1717
import sys
1818

19-
from signedjson.key import write_signing_keys, generate_signing_key
19+
from signedjson.key import generate_signing_key, write_signing_keys
2020

2121
from synapse.util.stringutils import random_string
2222

synapse/metrics/__init__.py

+92-20
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
import attr
2727
from prometheus_client import Counter, Gauge, Histogram
28-
from prometheus_client.core import REGISTRY, GaugeMetricFamily
28+
from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily
2929

3030
from twisted.internet import reactor
3131

@@ -40,7 +40,6 @@
4040

4141

4242
class RegistryProxy(object):
43-
4443
@staticmethod
4544
def collect():
4645
for metric in REGISTRY.collect():
@@ -63,10 +62,7 @@ def collect(self):
6362
try:
6463
calls = self.caller()
6564
except Exception:
66-
logger.exception(
67-
"Exception running callback for LaterGauge(%s)",
68-
self.name,
69-
)
65+
logger.exception("Exception running callback for LaterGauge(%s)", self.name)
7066
yield g
7167
return
7268

@@ -116,9 +112,7 @@ def __init__(self, name, desc, labels, sub_metrics):
116112
# Create a class which have the sub_metrics values as attributes, which
117113
# default to 0 on initialization. Used to pass to registered callbacks.
118114
self._metrics_class = attr.make_class(
119-
"_MetricsEntry",
120-
attrs={x: attr.ib(0) for x in sub_metrics},
121-
slots=True,
115+
"_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
122116
)
123117

124118
# Counts number of in flight blocks for a given set of label values
@@ -157,7 +151,9 @@ def collect(self):
157151
158152
Note: may be called by a separate thread.
159153
"""
160-
in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
154+
in_flight = GaugeMetricFamily(
155+
self.name + "_total", self.desc, labels=self.labels
156+
)
161157

162158
metrics_by_key = {}
163159

@@ -179,7 +175,9 @@ def collect(self):
179175
yield in_flight
180176

181177
for name in self.sub_metrics:
182-
gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
178+
gauge = GaugeMetricFamily(
179+
"_".join([self.name, name]), "", labels=self.labels
180+
)
183181
for key, metrics in six.iteritems(metrics_by_key):
184182
gauge.add_metric(key, getattr(metrics, name))
185183
yield gauge
@@ -193,12 +191,75 @@ def _register_with_collector(self):
193191
all_gauges[self.name] = self
194192

195193

@attr.s(hash=True)
class BucketCollector(object):
    """
    Like a Histogram, but allows buckets to be point-in-time instead of
    incrementally added to.

    Args:
        name (str): Base name of metric to be exported to Prometheus.
        data_collector (callable -> dict): A synchronous callable that
            returns a dict mapping bucket to number of items in the
            bucket. If these buckets are not the same as the buckets
            given to this class, they will be remapped into them.
        buckets (list[float]): List of floats/ints of the buckets to
            give to Prometheus. +Inf is ignored, if given.

    """

    name = attr.ib()
    data_collector = attr.ib()
    buckets = attr.ib()

    def collect(self):
        # Fetch the data -- this must be synchronous!
        data = self.data_collector()

        # Remap each observed value into the first configured bucket whose
        # upper bound is >= the value; values above every bound only appear
        # in the +Inf bucket below.
        per_bucket = {}
        for x in data.keys():
            for bound in self.buckets:
                if x <= bound:
                    per_bucket[bound] = per_bucket.get(bound, 0) + data[x]
                    break

        # Prometheus histogram buckets are cumulative: the value for le=b
        # must count every observation <= b, and +Inf must equal the total
        # observation count. Emit a running total, with string bounds as
        # required by the exposition format.
        res = []
        running = 0
        for bound in self.buckets:
            running += per_bucket.get(bound, 0)
            res.append([str(bound), running])

        res.append(["+Inf", sum(data.values())])

        metric = HistogramMetricFamily(
            self.name,
            "",
            buckets=res,
            sum_value=sum(x * y for x, y in data.items()),
        )
        yield metric

    def __attrs_post_init__(self):
        # Normalise bucket bounds to floats, dropping any explicit "+Inf"
        # (we always append it ourselves in collect()).
        self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
        if self.buckets != sorted(self.buckets):
            raise ValueError("Buckets not sorted")

        self.buckets = tuple(self.buckets)

        # Replace any previous registration under the same name so the
        # collector can be re-created (e.g. in tests) without erroring.
        if self.name in all_gauges.keys():
            logger.warning("%s already registered, reregistering" % (self.name,))
            REGISTRY.unregister(all_gauges.pop(self.name))

        REGISTRY.register(self)
        all_gauges[self.name] = self
255+
256+
196257
#
197258
# Detailed CPU metrics
198259
#
199260

200-
class CPUMetrics(object):
201261

262+
class CPUMetrics(object):
202263
def __init__(self):
203264
ticks_per_sec = 100
204265
try:
@@ -237,13 +298,28 @@ def collect(self):
237298
"python_gc_time",
238299
"Time taken to GC (sec)",
239300
["gen"],
240-
buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
241-
5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
301+
buckets=[
302+
0.0025,
303+
0.005,
304+
0.01,
305+
0.025,
306+
0.05,
307+
0.10,
308+
0.25,
309+
0.50,
310+
1.00,
311+
2.50,
312+
5.00,
313+
7.50,
314+
15.00,
315+
30.00,
316+
45.00,
317+
60.00,
318+
],
242319
)
243320

244321

245322
class GCCounts(object):
246-
247323
def collect(self):
248324
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
249325
for n, m in enumerate(gc.get_count()):
@@ -279,9 +355,7 @@ def collect(self):
279355
events_processed_counter = Counter("synapse_federation_client_events_processed", "")
280356

281357
event_processing_loop_counter = Counter(
282-
"synapse_event_processing_loop_count",
283-
"Event processing loop iterations",
284-
["name"],
358+
"synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
285359
)
286360

287361
event_processing_loop_room_count = Counter(
@@ -311,7 +385,6 @@ def collect(self):
311385

312386

313387
class ReactorLastSeenMetric(object):
314-
315388
def collect(self):
316389
cm = GaugeMetricFamily(
317390
"python_twisted_reactor_last_seen",
@@ -325,7 +398,6 @@ def collect(self):
325398

326399

327400
def runUntilCurrentTimer(func):
328-
329401
@functools.wraps(func)
330402
def f(*args, **kwargs):
331403
now = reactor.seconds()

synapse/storage/events.py

+31-13
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import itertools
1919
import logging
20-
from collections import OrderedDict, deque, namedtuple
20+
from collections import Counter as c_counter, OrderedDict, deque, namedtuple
2121
from functools import wraps
2222

2323
from six import iteritems, text_type
@@ -33,6 +33,7 @@
3333
from synapse.api.errors import SynapseError
3434
from synapse.events import EventBase # noqa: F401
3535
from synapse.events.snapshot import EventContext # noqa: F401
36+
from synapse.metrics import BucketCollector
3637
from synapse.metrics.background_process_metrics import run_as_background_process
3738
from synapse.state import StateResolutionStore
3839
from synapse.storage.background_updates import BackgroundUpdateStore
@@ -220,13 +221,38 @@ class EventsStore(
220221
EventsWorkerStore,
221222
BackgroundUpdateStore,
222223
):
223-
224224
def __init__(self, db_conn, hs):
    """Set up the events store, including forward-extremity metrics.

    Registers a BucketCollector exporting per-room forward-extremity
    counts to Prometheus, and schedules an hourly refresh of the data
    backing it.
    """
    super(EventsStore, self).__init__(db_conn, hs)

    self._event_persist_queue = _EventPeristenceQueue()
    self._state_resolution_handler = hs.get_state_resolution_handler()

    # Point-in-time map of bucket -> count, refreshed periodically by
    # _read_forward_extremities.
    self._current_forward_extremities_amount = {}

    extremity_buckets = [1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"]
    BucketCollector(
        "synapse_forward_extremities",
        lambda: self._current_forward_extremities_amount,
        buckets=extremity_buckets,
    )

    # Refresh the forward-extremity statistics once an hour.
    one_hour_ms = 60 * 60 * 1000
    hs.get_clock().looping_call(self._read_forward_extremities, one_hour_ms)
241+
242+
@defer.inlineCallbacks
def _read_forward_extremities(self):
    """Count forward extremities per room and cache the distribution.

    Scheduled periodically from __init__; the cached Counter (mapping
    "number of extremities" -> "number of rooms with that many") backs
    the synapse_forward_extremities Prometheus histogram.
    """

    def fetch(txn):
        # One row per room: how many forward extremities it has.
        txn.execute(
            """
            select count(*) c from event_forward_extremities
            group by room_id
            """
        )
        return txn.fetchall()

    res = yield self.runInteraction("read_forward_extremities", fetch)
    # Counter consumes the generator directly; no need to materialize a
    # throwaway list first.
    self._current_forward_extremities_amount = c_counter(x[0] for x in res)
255+
230256
@defer.inlineCallbacks
231257
def persist_events(self, events_and_contexts, backfilled=False):
232258
"""
@@ -568,17 +594,11 @@ def _get_events_which_are_prevs_txn(txn, batch):
568594
)
569595

570596
txn.execute(sql, batch)
571-
results.extend(
572-
r[0]
573-
for r in txn
574-
if not json.loads(r[1]).get("soft_failed")
575-
)
597+
results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
576598

577599
for chunk in batch_iter(event_ids, 100):
578600
yield self.runInteraction(
579-
"_get_events_which_are_prevs",
580-
_get_events_which_are_prevs_txn,
581-
chunk,
601+
"_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
582602
)
583603

584604
defer.returnValue(results)
@@ -640,9 +660,7 @@ def _get_prevs_before_rejected_txn(txn, batch):
640660

641661
for chunk in batch_iter(event_ids, 100):
642662
yield self.runInteraction(
643-
"_get_prevs_before_rejected",
644-
_get_prevs_before_rejected_txn,
645-
chunk,
663+
"_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
646664
)
647665

648666
defer.returnValue(existing_prevs)

0 commit comments

Comments
 (0)