-
Notifications
You must be signed in to change notification settings - Fork 1
/
collectd_prometheus.py
162 lines (140 loc) · 6.04 KB
/
collectd_prometheus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# -*- coding: utf-8 -*-
# Copyright 2019-2021 Ryar Nyah
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collectd
import requests
import signal
import json
import re
from prometheus_client.parser import text_string_to_metric_families
# Value written to the `plugin` field of every dispatched collectd value.
PLUGIN_NAME = 'prometheus'
# Defaults used when the configuration does not override them.
# DEFAULT_INTERVAL is handed to collectd.register_read(); DEFAULT_TIMEOUT is
# handed to requests.get() as its `timeout` argument.
DEFAULT_INTERVAL = 30
DEFAULT_TIMEOUT = 30
# The next four are interpolated, in this order, into the scrape URL as
# "<protocol>://<host>:<port><uri>".
DEFAULT_PROTOCOL = 'http'
DEFAULT_URI = '/metrics'
DEFAULT_HOST = '127.0.0.1'
# Kept as a string; it is formatted into the URL with %s.
DEFAULT_PORT = '8080'
# Used as the collectd `plugin_instance` when no process name is configured.
DEFAULT_PROCESS = 'unknown'
# Config options
# KEY_PROCESS is matched twice: as the key of a nested block at plugin level
# (one block per endpoint) and as an option inside that block (the name).
KEY_PROCESS = 'Process'
KEY_PROTOCOL = 'Protocol'
# Only read at plugin level, not inside a Process block.
KEY_INTERVAL = 'Interval'
KEY_PORT = 'Port'
KEY_HOST = 'Host'
KEY_URI = 'Uri'
# May appear several times; every occurrence appends one regular expression.
KEY_REGEX_FILTER = 'Filter'
KEY_TIMEOUT = 'Timeout'
# TLS options, translated into the `verify` / `cert` arguments of requests.get().
KEY_SSL_IGNORE = 'SslIgnore'
KEY_SSL_CERT = 'SslCert'
KEY_SSL_KEY = 'SslKey'
KEY_SSL_CACERT = 'SslCacert'
class PrometheusProcess(object):
    """
    Settings of one Prometheus endpoint to scrape.

    Every attribute starts from the module-level default and may be
    overridden by the options of one ``Process`` configuration block.
    All attributes must stay JSON-serialisable because the instance
    ``__dict__`` is dumped to the collectd log.
    """

    # Spellings of a quoted configuration value that mean "disabled".
    _FALSE_WORDS = ('', '0', 'false', 'no', 'off')

    def __init__(self):
        self.process = DEFAULT_PROCESS
        self.host = DEFAULT_HOST
        self.port = DEFAULT_PORT
        self.uri = DEFAULT_URI
        self.protocol = DEFAULT_PROTOCOL
        self.timeout = DEFAULT_TIMEOUT
        self.regex_filters = []
        self.ssl_ignore = None
        self.ssl_cert = None
        self.ssl_key = None
        self.ssl_cacert = None

    @staticmethod
    def _to_bool(value):
        """
        Interpret a configuration value as a boolean.

        collectd hands over real booleans for unquoted ``true``/``false`` but
        plain strings for quoted ones; ``bool("false")`` would be True, so
        the usual "off" spellings are recognised explicitly. Any other value
        keeps its normal Python truthiness.
        """
        if isinstance(value, str):
            return value.strip().lower() not in PrometheusProcess._FALSE_WORDS
        return bool(value)

    @staticmethod
    def _to_port(value):
        """
        Normalise a configured port.

        Unquoted numbers arrive from collectd as floats; ``8080.0`` formatted
        into a URL gives an invalid authority, so integral floats are turned
        into ints. Strings and anything else are returned unchanged.
        """
        if isinstance(value, float) and value.is_integer():
            return int(value)
        return value

    def config(self, cfg):
        """
        Apply the options of one ``Process`` configuration block.

        :param cfg: collectd configuration node; only ``cfg.children`` is
            read, and for each recognised child only its first value.
            Unknown keys are ignored.
        """
        # Options whose first value is stored verbatim on the attribute.
        plain_options = {
            KEY_PROCESS: 'process',
            KEY_HOST: 'host',
            KEY_URI: 'uri',
            KEY_PROTOCOL: 'protocol',
            KEY_SSL_CERT: 'ssl_cert',
            KEY_SSL_KEY: 'ssl_key',
            KEY_SSL_CACERT: 'ssl_cacert',
        }
        for child in cfg.children:
            key = child.key
            if key in plain_options:
                setattr(self, plain_options[key], child.values[0])
            elif key == KEY_PORT:
                self.port = self._to_port(child.values[0])
            elif key == KEY_TIMEOUT:
                self.timeout = int(child.values[0])
            elif key == KEY_SSL_IGNORE:
                self.ssl_ignore = self._to_bool(child.values[0])
                if self.ssl_ignore:
                    # Verification is deliberately off: silence the warning
                    # urllib3 would otherwise emit on every request.
                    import urllib3
                    urllib3.disable_warnings()
            elif key == KEY_REGEX_FILTER:
                # Repeatable option: each occurrence adds one pattern.
                self.regex_filters.append(child.values[0])
        collectd.debug(json.dumps(self.__dict__))
class Prometheus(object):
    """
    Collectd plugin that scrapes Prometheus endpoints.

    Holds the configured endpoints and, on every read cycle, fetches each
    one, parses the exposition text and dispatches the selected samples to
    collectd as ``gauge`` values.
    """

    def __init__(self):
        self.interval = DEFAULT_INTERVAL
        self.process_monitored = []

    def config(self, cfg):
        """
        Collectd config callback.

        Reads the optional ``Interval`` option and one nested ``Process``
        block per endpoint, then registers ``read`` with that interval.

        :param cfg: collectd configuration node of the plugin.
        """
        for child in cfg.children:
            if child.key == KEY_INTERVAL:
                self.interval = child.values[0]
            elif child.key == KEY_PROCESS:
                endpoint = PrometheusProcess()
                endpoint.config(child)
                self.process_monitored.append(endpoint)
        collectd.register_read(self.read, interval=self.interval)

    @staticmethod
    def _request_kwargs(process):
        """Build the keyword arguments for ``requests.get`` from the endpoint settings."""
        kwargs = {}
        if process.ssl_ignore is not None:
            kwargs['verify'] = not process.ssl_ignore
        if process.ssl_cert is not None and process.ssl_key is not None:
            kwargs['cert'] = (process.ssl_cert, process.ssl_key)
        if process.ssl_cacert is not None:
            # A CA bundle, when given, takes precedence over SslIgnore.
            kwargs['verify'] = process.ssl_cacert
        if process.timeout is not None:
            kwargs['timeout'] = process.timeout
        return kwargs

    @staticmethod
    def _normalize_name(name):
        """Return ``name`` as lower snake_case containing only ``[_a-z0-9]``."""
        cleaned = re.sub(r'[^_a-zA-Z0-9]', '_', name)
        # Put an underscore in front of every capital except a leading one.
        return re.sub(r'(?<!^)(?=[A-Z])', '_', cleaned).lower()

    @staticmethod
    def _compile_filters(patterns):
        """Compile the filter expressions once per scrape instead of once per sample."""
        return [re.compile(pattern) for pattern in patterns]

    @staticmethod
    def _is_selected(metric_name, compiled_filters):
        """
        Tell whether a metric must be dispatched.

        Without any filter everything is selected; otherwise the name must
        match (anchored at its start) at least one filter.
        """
        if not compiled_filters:
            return True
        return any(regex.match(metric_name) for regex in compiled_filters)

    @staticmethod
    def _dispatch(process, metric_name, sample):
        """Send one sample to collectd as a gauge, with its labels as metadata."""
        metric = collectd.Values()
        metric.plugin = PLUGIN_NAME
        metric.plugin_instance = process.process
        metric.meta = sample.labels
        metric.type = 'gauge'
        metric.type_instance = metric_name
        metric.values = [sample.value]
        metric.dispatch()

    def read(self):
        """
        Collectd read callback: scrape every configured endpoint.

        Any failure (invalid filter expression, network error, non-2xx HTTP
        status, parse error) is logged through ``collectd.error`` and only
        aborts the endpoint it happened on; nothing is raised to collectd.
        """
        for process in self.process_monitored:
            kwargs = self._request_kwargs(process)
            try:
                # Compiled inside the try block so that a bad expression is
                # reported like any other scrape failure. The compiled
                # patterns are deliberately not stored on `process`: its
                # __dict__ has to remain JSON-serialisable for logging.
                filters = self._compile_filters(process.regex_filters)
                response = requests.get("%s://%s:%s%s" % (process.protocol, process.host, process.port, process.uri), **kwargs)
                # Do not feed an HTTP error page to the metrics parser.
                response.raise_for_status()
                metrics = response.content.decode()
                for family in text_string_to_metric_families(metrics):
                    for sample in family.samples:
                        metric_name = self._normalize_name(sample.name)
                        if self._is_selected(metric_name, filters):
                            self._dispatch(process, metric_name, sample)
                            collectd.debug("Name: {0} Labels: {1} Value: {2}".format(*sample))
                        else:
                            collectd.debug("Name: {0} not match Labels: {1} Value: {2}".format(*sample))
            except Exception as e:
                collectd.error('unable to get prometheus data %s://%s:%s%s with current configuration %s: %s' % (process.protocol, process.host, process.port, process.uri, json.dumps(process.__dict__), e))
def init():
    """
    Collectd init callback: restore the default disposition of SIGCHLD.
    """
    # NOTE(review): presumably needed so that child processes spawned from
    # inside the collectd process can be waited on normally - confirm.
    signum, disposition = signal.SIGCHLD, signal.SIG_DFL
    signal.signal(signum, disposition)
# Single module-level plugin instance; collectd is given its bound `config`
# method below.
prom = Prometheus()
# Hook the callbacks into collectd. The read callback is not registered here:
# Prometheus.config registers it itself, with the configured interval.
collectd.register_init(init)
collectd.register_config(prom.config)