25
25
26
26
import attr
27
27
from prometheus_client import Counter , Gauge , Histogram
28
- from prometheus_client .core import REGISTRY , GaugeMetricFamily
28
+ from prometheus_client .core import REGISTRY , GaugeMetricFamily , HistogramMetricFamily
29
29
30
30
from twisted .internet import reactor
31
31
40
40
41
41
42
42
class RegistryProxy (object ):
43
-
44
43
@staticmethod
45
44
def collect ():
46
45
for metric in REGISTRY .collect ():
@@ -63,10 +62,7 @@ def collect(self):
63
62
try :
64
63
calls = self .caller ()
65
64
except Exception :
66
- logger .exception (
67
- "Exception running callback for LaterGauge(%s)" ,
68
- self .name ,
69
- )
65
+ logger .exception ("Exception running callback for LaterGauge(%s)" , self .name )
70
66
yield g
71
67
return
72
68
@@ -116,9 +112,7 @@ def __init__(self, name, desc, labels, sub_metrics):
116
112
# Create a class which have the sub_metrics values as attributes, which
117
113
# default to 0 on initialization. Used to pass to registered callbacks.
118
114
self ._metrics_class = attr .make_class (
119
- "_MetricsEntry" ,
120
- attrs = {x : attr .ib (0 ) for x in sub_metrics },
121
- slots = True ,
115
+ "_MetricsEntry" , attrs = {x : attr .ib (0 ) for x in sub_metrics }, slots = True
122
116
)
123
117
124
118
# Counts number of in flight blocks for a given set of label values
@@ -157,7 +151,9 @@ def collect(self):
157
151
158
152
Note: may be called by a separate thread.
159
153
"""
160
- in_flight = GaugeMetricFamily (self .name + "_total" , self .desc , labels = self .labels )
154
+ in_flight = GaugeMetricFamily (
155
+ self .name + "_total" , self .desc , labels = self .labels
156
+ )
161
157
162
158
metrics_by_key = {}
163
159
@@ -179,7 +175,9 @@ def collect(self):
179
175
yield in_flight
180
176
181
177
for name in self .sub_metrics :
182
- gauge = GaugeMetricFamily ("_" .join ([self .name , name ]), "" , labels = self .labels )
178
+ gauge = GaugeMetricFamily (
179
+ "_" .join ([self .name , name ]), "" , labels = self .labels
180
+ )
183
181
for key , metrics in six .iteritems (metrics_by_key ):
184
182
gauge .add_metric (key , getattr (metrics , name ))
185
183
yield gauge
@@ -193,12 +191,75 @@ def _register_with_collector(self):
193
191
all_gauges [self .name ] = self
194
192
195
193
194
+ @attr .s (hash = True )
195
+ class BucketCollector (object ):
196
+ """
197
+ Like a Histogram, but allows buckets to be point-in-time instead of
198
+ incrementally added to.
199
+
200
+ Args:
201
+ name (str): Base name of metric to be exported to Prometheus.
202
+ data_collector (callable -> dict): A synchronous callable that
203
+ returns a dict mapping bucket to number of items in the
204
+ bucket. If these buckets are not the same as the buckets
205
+ given to this class, they will be remapped into them.
206
+ buckets (list[float]): List of floats/ints of the buckets to
207
+ give to Prometheus. +Inf is ignored, if given.
208
+
209
+ """
210
+
211
+ name = attr .ib ()
212
+ data_collector = attr .ib ()
213
+ buckets = attr .ib ()
214
+
215
+ def collect (self ):
216
+
217
+ # Fetch the data -- this must be synchronous!
218
+ data = self .data_collector ()
219
+
220
+ buckets = {}
221
+
222
+ res = []
223
+ for x in data .keys ():
224
+ for i , bound in enumerate (self .buckets ):
225
+ if x <= bound :
226
+ buckets [bound ] = buckets .get (bound , 0 ) + data [x ]
227
+ break
228
+
229
+ for i in self .buckets :
230
+ res .append ([i , buckets .get (i , 0 )])
231
+
232
+ res .append (["+Inf" , sum (data .values ())])
233
+
234
+ metric = HistogramMetricFamily (
235
+ self .name ,
236
+ "" ,
237
+ buckets = res ,
238
+ sum_value = sum ([x * y for x , y in data .items ()]),
239
+ )
240
+ yield metric
241
+
242
+ def __attrs_post_init__ (self ):
243
+ self .buckets = [float (x ) for x in self .buckets if x != "+Inf" ]
244
+ if self .buckets != sorted (self .buckets ):
245
+ raise ValueError ("Buckets not sorted" )
246
+
247
+ self .buckets = tuple (self .buckets )
248
+
249
+ if self .name in all_gauges .keys ():
250
+ logger .warning ("%s already registered, reregistering" % (self .name ,))
251
+ REGISTRY .unregister (all_gauges .pop (self .name ))
252
+
253
+ REGISTRY .register (self )
254
+ all_gauges [self .name ] = self
255
+
256
+
196
257
#
197
258
# Detailed CPU metrics
198
259
#
199
260
200
- class CPUMetrics (object ):
201
261
262
+ class CPUMetrics (object ):
202
263
def __init__ (self ):
203
264
ticks_per_sec = 100
204
265
try :
@@ -237,13 +298,28 @@ def collect(self):
237
298
"python_gc_time" ,
238
299
"Time taken to GC (sec)" ,
239
300
["gen" ],
240
- buckets = [0.0025 , 0.005 , 0.01 , 0.025 , 0.05 , 0.10 , 0.25 , 0.50 , 1.00 , 2.50 ,
241
- 5.00 , 7.50 , 15.00 , 30.00 , 45.00 , 60.00 ],
301
+ buckets = [
302
+ 0.0025 ,
303
+ 0.005 ,
304
+ 0.01 ,
305
+ 0.025 ,
306
+ 0.05 ,
307
+ 0.10 ,
308
+ 0.25 ,
309
+ 0.50 ,
310
+ 1.00 ,
311
+ 2.50 ,
312
+ 5.00 ,
313
+ 7.50 ,
314
+ 15.00 ,
315
+ 30.00 ,
316
+ 45.00 ,
317
+ 60.00 ,
318
+ ],
242
319
)
243
320
244
321
245
322
class GCCounts (object ):
246
-
247
323
def collect (self ):
248
324
cm = GaugeMetricFamily ("python_gc_counts" , "GC object counts" , labels = ["gen" ])
249
325
for n , m in enumerate (gc .get_count ()):
@@ -279,9 +355,7 @@ def collect(self):
279
355
events_processed_counter = Counter ("synapse_federation_client_events_processed" , "" )
280
356
281
357
event_processing_loop_counter = Counter (
282
- "synapse_event_processing_loop_count" ,
283
- "Event processing loop iterations" ,
284
- ["name" ],
358
+ "synapse_event_processing_loop_count" , "Event processing loop iterations" , ["name" ]
285
359
)
286
360
287
361
event_processing_loop_room_count = Counter (
@@ -311,7 +385,6 @@ def collect(self):
311
385
312
386
313
387
class ReactorLastSeenMetric (object ):
314
-
315
388
def collect (self ):
316
389
cm = GaugeMetricFamily (
317
390
"python_twisted_reactor_last_seen" ,
@@ -325,7 +398,6 @@ def collect(self):
325
398
326
399
327
400
def runUntilCurrentTimer (func ):
328
-
329
401
@functools .wraps (func )
330
402
def f (* args , ** kwargs ):
331
403
now = reactor .seconds ()
0 commit comments