Skip to content

Commit

Permalink
NETOBSERV-1748: restart prom registry on metrics breaking change (#686)
Browse files Browse the repository at this point in the history
- Use dedicated prom registries per prom-encode stage
- restart registry when an "illegal" change is detected (change in
  labels), since this is not allowed by the prom client, even if
  unregistering the metric first
- There's also a small perf improvement by using utils.ConvertToString
  instead of fmt.Sprintf
  • Loading branch information
jotak committed Jul 31, 2024
1 parent 5729c2c commit b010e85
Show file tree
Hide file tree
Showing 13 changed files with 209 additions and 177 deletions.
2 changes: 1 addition & 1 deletion pkg/pipeline/aggregate_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ parameters:
for _, aa := range actualAggs {
promEncode.Encode(aa)
}
exposed := test.ReadExposedMetrics(t)
exposed := test.ReadExposedMetrics(t, promEncode.(*encode.EncodeProm).Gatherer())

for _, expected := range tt.expectedEncode {
require.Contains(t, exposed, expected)
Expand Down
238 changes: 117 additions & 121 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
promserver "github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"
)

var plog = logrus.WithField("component", "encode.Prometheus")

const defaultExpiryTime = time.Duration(2 * time.Minute)

// nolint:revive
Expand All @@ -38,11 +40,17 @@ type EncodeProm struct {
registerer prometheus.Registerer
metricCommon *MetricsCommonStruct
updateChan chan config.StageParam
server *promserver.PromServer
regName string
}

// Gatherer returns the prom server attached to this encode stage, exposed
// through the prometheus.Gatherer interface so that callers (tests in
// particular) can read back the metrics produced by this stage.
func (e *EncodeProm) Gatherer() prometheus.Gatherer {
return e.server
}

// Encode encodes a metric before being stored; the heavy work is done by the MetricCommonEncode
func (e *EncodeProm) Encode(metricRecord config.GenericMap) {
log.Tracef("entering EncodeMetric. metricRecord = %v", metricRecord)
plog.Tracef("entering EncodeMetric. metricRecord = %v", metricRecord)
e.metricCommon.MetricCommonEncode(e, metricRecord)
e.checkConfUpdate()
}
Expand Down Expand Up @@ -106,45 +114,34 @@ func (e *EncodeProm) Cleanup(cleanupFunc interface{}) {
cleanupFunc.(func())()
}

func (e *EncodeProm) addCounter(fullMetricName string, mInfo *MetricInfo) {
func (e *EncodeProm) addCounter(fullMetricName string, mInfo *MetricInfo) prometheus.Collector {
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
err := e.registerer.Register(counter)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
}
e.metricCommon.AddCounter(fullMetricName, counter, mInfo)
return counter
}

func (e *EncodeProm) addGauge(fullMetricName string, mInfo *MetricInfo) {
func (e *EncodeProm) addGauge(fullMetricName string, mInfo *MetricInfo) prometheus.Collector {
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
err := e.registerer.Register(gauge)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
}
e.metricCommon.AddGauge(fullMetricName, gauge, mInfo)
return gauge
}
func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *MetricInfo) {

func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *MetricInfo) prometheus.Collector {
histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
err := e.registerer.Register(histogram)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
}
e.metricCommon.AddHist(fullMetricName, histogram, mInfo)
return histogram
}
func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *MetricInfo) {

func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *MetricInfo) prometheus.Collector {
agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
err := e.registerer.Register(agghistogram)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
}
e.metricCommon.AddAggHist(fullMetricName, agghistogram, mInfo)
return agghistogram
}

// unregisterMetric removes c from the stage registerer when c implements
// prometheus.Collector. Values of any other type are ignored.
func (e *EncodeProm) unregisterMetric(c interface{}) {
	collector, isCollector := c.(prometheus.Collector)
	if !isCollector {
		return
	}
	e.registerer.Unregister(collector)
}

func (e *EncodeProm) cleanDeletedGeneric(newCfg api.PromEncode, metrics map[string]mInfoStruct) {
Expand Down Expand Up @@ -178,72 +175,115 @@ func (e *EncodeProm) cleanDeletedMetrics(newCfg api.PromEncode) {
e.cleanDeletedGeneric(newCfg, e.metricCommon.aggHistos)
}

// checkMetricUpdate compares the metric described by apiItem with the one
// currently held in store under the same full name (prefix + item name).
//
//   - unknown metric: it is created through createMetric and registered;
//   - same definition: nothing happens;
//   - definition changed but labels untouched: the previous collector is
//     unregistered, then a new one is created and registered;
//   - labels changed: nothing is touched and true is returned.
//
// It returns true if a registry restart is needed, false otherwise.
func (e *EncodeProm) checkMetricUpdate(prefix string, apiItem *api.MetricsItem, store map[string]mInfoStruct, createMetric func(string, *MetricInfo) prometheus.Collector) bool {
	name := prefix + apiItem.Name
	plog.Debugf("Checking metric: %s", name)
	candidate := CreateMetricInfo(apiItem)

	previous, known := store[name]
	switch {
	case !known:
		plog.Debug("New metric")
	case !reflect.DeepEqual(candidate.MetricsItem.Labels, previous.info.MetricsItem.Labels):
		// A label change cannot be handled on the current registry: let the caller reset it.
		plog.Debug("Changes detected in labels")
		return true
	case reflect.DeepEqual(candidate.MetricsItem, previous.info.MetricsItem):
		plog.Debug("No changes found")
		return false
	default:
		plog.Debug("Changes detected: unregistering and replacing")
		e.unregisterMetric(previous.genericMetric)
	}

	// Reached for new metrics and for replaced ones.
	if err := e.registerer.Register(createMetric(name, candidate)); err != nil {
		plog.Errorf("error in prometheus.Register: %v", err)
	}
	return false
}

func (e *EncodeProm) checkConfUpdate() {
select {
case stage := <-e.updateChan:
cfg := api.PromEncode{}
if stage.Encode != nil && stage.Encode.Prom != nil {
cfg = *stage.Encode.Prom
}
plog.Infof("Received config update: %v", cfg)

e.cleanDeletedMetrics(cfg)

needNewRegistry := false
for i := range cfg.Metrics {
fullMetricName := cfg.Prefix + cfg.Metrics[i].Name
mInfo := CreateMetricInfo(&cfg.Metrics[i])
switch cfg.Metrics[i].Type {
case api.MetricCounter:
if oldMetric, ok := e.metricCommon.counters[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
e.unregisterMetric(oldMetric.genericMetric)
e.addCounter(fullMetricName, mInfo)
}
} else {
// New metric
e.addCounter(fullMetricName, mInfo)
}
needNewRegistry = e.checkMetricUpdate(cfg.Prefix, &cfg.Metrics[i], e.metricCommon.counters, e.addCounter)
case api.MetricGauge:
if oldMetric, ok := e.metricCommon.gauges[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
e.unregisterMetric(oldMetric.genericMetric)
e.addGauge(fullMetricName, mInfo)
}
} else {
// New metric
e.addGauge(fullMetricName, mInfo)
}
needNewRegistry = e.checkMetricUpdate(cfg.Prefix, &cfg.Metrics[i], e.metricCommon.gauges, e.addGauge)
case api.MetricHistogram:
if oldMetric, ok := e.metricCommon.histos[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
e.unregisterMetric(oldMetric.genericMetric)
e.addHistogram(fullMetricName, mInfo)
}
} else {
// New metric
e.addHistogram(fullMetricName, mInfo)
}
needNewRegistry = e.checkMetricUpdate(cfg.Prefix, &cfg.Metrics[i], e.metricCommon.histos, e.addHistogram)
case api.MetricAggHistogram:
if oldMetric, ok := e.metricCommon.aggHistos[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
e.unregisterMetric(oldMetric.genericMetric)
e.addAgghistogram(fullMetricName, mInfo)
}
} else {
// New metric
e.addAgghistogram(fullMetricName, mInfo)
}
needNewRegistry = e.checkMetricUpdate(cfg.Prefix, &cfg.Metrics[i], e.metricCommon.aggHistos, e.addAgghistogram)
case "default":
log.Errorf("invalid metric type = %v, skipping", cfg.Metrics[i].Type)
plog.Errorf("invalid metric type = %v, skipping", cfg.Metrics[i].Type)
continue
}

if needNewRegistry {
break
}
}
e.cfg = &cfg
if needNewRegistry {
// cf https://pkg.go.dev/github.com/prometheus/[email protected]/prometheus#Registerer.Unregister
plog.Info("Changes detected on labels: need registry reset.")
e.resetRegistry()
break
}
default:
//Nothing to do
return
}
}

// resetRegistry rebuilds all the metrics of the current configuration
// (e.cfg) on a brand-new Prometheus registry.
//
// It first clears the metric info structs held by metricCommon, then makes
// the new registry the stage registerer, creates and registers one collector
// per configured metric, and finally hands the registry to the prom server
// under the stage's registry name.
//
// Metrics with an unknown type are skipped with an error log. Registration
// failures are logged and do not interrupt the processing of the remaining
// metrics.
func (e *EncodeProm) resetRegistry() {
	e.metricCommon.cleanupInfoStructs()
	reg := prometheus.NewRegistry()
	e.registerer = reg
	for i := range e.cfg.Metrics {
		mCfg := &e.cfg.Metrics[i]
		fullMetricName := e.cfg.Prefix + mCfg.Name
		labels := mCfg.Labels
		plog.Debugf("Create metric: %s, Labels: %v", fullMetricName, labels)
		mInfo := CreateMetricInfo(mCfg)
		var m prometheus.Collector
		switch mCfg.Type {
		case api.MetricCounter:
			m = e.addCounter(fullMetricName, mInfo)
		case api.MetricGauge:
			m = e.addGauge(fullMetricName, mInfo)
		case api.MetricHistogram:
			m = e.addHistogram(fullMetricName, mInfo)
		case api.MetricAggHistogram:
			m = e.addAgghistogram(fullMetricName, mInfo)
		default:
			// A real default clause: `case "default":` would only match a type
			// literally named "default", leaving unknown types unreported.
			plog.Errorf("invalid metric type = %v, skipping", mCfg.Type)
			continue
		}
		if err := e.registerer.Register(m); err != nil {
			plog.Errorf("error in prometheus.Register: %v", err)
		}
	}
	e.server.SetRegistry(e.regName, reg)
}

func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) {
cfg := api.PromEncode{}
if params.Encode != nil && params.Encode.Prom != nil {
Expand All @@ -254,73 +294,29 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
if expiryTime.Duration == 0 {
expiryTime.Duration = defaultExpiryTime
}
log.Debugf("expiryTime = %v", expiryTime)
plog.Debugf("expiryTime = %v", expiryTime)

var registerer prometheus.Registerer
registry := prometheus.NewRegistry()

if cfg.PromConnectionInfo != nil {
registry := prometheus.NewRegistry()
registerer = registry
promserver.StartServerAsync(cfg.PromConnectionInfo, nil)
} else {
registerer = prometheus.DefaultRegisterer
}
w := &EncodeProm{
cfg: params.Encode.Prom,
registerer: registerer,
cfg: &cfg,
registerer: registry,
updateChan: make(chan config.StageParam),
server: promserver.SharedServer,
regName: params.Name,
}

if cfg.PromConnectionInfo != nil {
// Start new server
w.server = promserver.StartServerAsync(cfg.PromConnectionInfo, params.Name, registry)
}

metricCommon := NewMetricsCommonStruct(opMetrics, cfg.MaxMetrics, params.Name, expiryTime, w.Cleanup)
w.metricCommon = metricCommon

for i := range cfg.Metrics {
mCfg := &cfg.Metrics[i]
fullMetricName := cfg.Prefix + mCfg.Name
labels := mCfg.Labels
log.Debugf("fullMetricName = %v", fullMetricName)
log.Debugf("Labels = %v", labels)
mInfo := CreateMetricInfo(mCfg)
switch mCfg.Type {
case api.MetricCounter:
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, labels)
err := registerer.Register(counter)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
metricCommon.AddCounter(fullMetricName, counter, mInfo)
case api.MetricGauge:
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels)
err := registerer.Register(gauge)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
metricCommon.AddGauge(fullMetricName, gauge, mInfo)
case api.MetricHistogram:
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
err := registerer.Register(hist)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
metricCommon.AddHist(fullMetricName, hist, mInfo)
case api.MetricAggHistogram:
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
err := registerer.Register(hist)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
metricCommon.AddAggHist(fullMetricName, hist, mInfo)
case "default":
log.Errorf("invalid metric type = %v, skipping", mCfg.Type)
continue
}
}
// Init metrics
w.resetRegistry()

return w, nil
}

Expand Down
Loading

0 comments on commit b010e85

Please sign in to comment.