Skip to content

Commit

Permalink
Refactor metrics (#590)
Browse files Browse the repository at this point in the history
* refactor metrics code to be common for prometheus and otlp

* handle otlp histograms

* moved otlp files to separate directory

* added metrics_common file

* Update pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go

Co-authored-by: Julien Pinsonneau <[email protected]>

---------

Co-authored-by: Julien Pinsonneau <[email protected]>
  • Loading branch information
KalmanMeth and jpinsonneau committed Feb 26, 2024
1 parent ba7ad1f commit 45bf277
Show file tree
Hide file tree
Showing 6 changed files with 437 additions and 434 deletions.
300 changes: 53 additions & 247 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,249 +18,89 @@
package encode

import (
"fmt"
"strings"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
promserver "github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
"github.com/netobserv/flowlogs-pipeline/pkg/utils"

"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)

// defaultExpiryTime is how long a generated metric series may stay without
// updates in the timed cache before its labeled series is deleted.
const defaultExpiryTime = time.Duration(2 * time.Minute)

// gaugeInfo couples a registered prometheus gauge vector with the metric
// definition it was generated from.
type gaugeInfo struct {
gauge *prometheus.GaugeVec
info *MetricInfo
}

// counterInfo couples a registered prometheus counter vector with the metric
// definition it was generated from.
type counterInfo struct {
counter *prometheus.CounterVec
info *MetricInfo
}

// histoInfo couples a registered prometheus histogram vector with the metric
// definition it was generated from.
type histoInfo struct {
histo *prometheus.HistogramVec
info *MetricInfo
}

type EncodeProm struct {
cfg *api.PromEncode
registerer prometheus.Registerer
gauges []gaugeInfo
counters []counterInfo
histos []histoInfo
aggHistos []histoInfo
expiryTime time.Duration
mCache *putils.TimedCache
mChacheLenMetric prometheus.Gauge
exitChan <-chan struct{}
metricsProcessed prometheus.Counter
metricsDropped prometheus.Counter
errorsCounter *prometheus.CounterVec
cfg *api.PromEncode
registerer prometheus.Registerer
metricCommon *MetricsCommonStruct
}

// Operational (self-monitoring) metrics reported by this encode stage.
var (
// MetricsProcessed counts the metric records successfully encoded,
// labeled by pipeline stage.
MetricsProcessed = operational.DefineMetric(
"metrics_processed",
"Number of metrics processed",
operational.TypeCounter,
"stage",
)
// MetricsDropped counts the metric records dropped when their cache entry
// cannot be created or updated, labeled by pipeline stage.
MetricsDropped = operational.DefineMetric(
"metrics_dropped",
"Number of metrics dropped",
operational.TypeCounter,
"stage",
)
// EncodePromErrors counts errors raised while generating prometheus
// metrics, labeled by error kind, metric name and record key.
EncodePromErrors = operational.DefineMetric(
"encode_prom_errors",
"Total errors during metrics generation",
operational.TypeCounter,
"error", "metric", "key",
)
// mChacheLen tracks the number of metric series currently held in the
// timed cache. NOTE(review): "Chache" is a pre-existing identifier typo.
mChacheLen = operational.DefineMetric(
"encode_prom_metrics_reported",
"Total number of prometheus metrics reported by this stage",
operational.TypeGauge,
"stage",
)
)

// Encode encodes a metric before being stored
// Encode encodes a metric before being stored; the heavy work is done by the MetricCommonEncode
func (e *EncodeProm) Encode(metricRecord config.GenericMap) {
log.Tracef("entering EncodeMetric. metricRecord = %v", metricRecord)

// Process counters
for _, mInfo := range e.counters {
labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.counter.MetricVec)
if labels == nil {
continue
}
m, err := mInfo.counter.GetMetricWith(labels)
if err != nil {
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
continue
}
m.Add(value)
e.metricsProcessed.Inc()
}

// Process gauges
for _, mInfo := range e.gauges {
labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.gauge.MetricVec)
if labels == nil {
continue
}
m, err := mInfo.gauge.GetMetricWith(labels)
if err != nil {
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
continue
}
m.Set(value)
e.metricsProcessed.Inc()
}

// Process histograms
for _, mInfo := range e.histos {
labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.histo.MetricVec)
if labels == nil {
continue
}
m, err := mInfo.histo.GetMetricWith(labels)
if err != nil {
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
continue
}
m.Observe(value)
e.metricsProcessed.Inc()
}

// Process pre-aggregated histograms
for _, mInfo := range e.aggHistos {
labels, values := e.prepareAggHisto(metricRecord, mInfo.info, mInfo.histo.MetricVec)
if labels == nil {
continue
}
m, err := mInfo.histo.GetMetricWith(labels)
if err != nil {
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
continue
}
for _, v := range values {
m.Observe(v)
}
e.metricsProcessed.Inc()
}
e.metricCommon.MetricCommonEncode(e, metricRecord)
}

func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *MetricInfo, m *prometheus.MetricVec) (map[string]string, float64) {
val := e.extractGenericValue(flow, info)
if val == nil {
return nil, 0
}
floatVal, err := utils.ConvertToFloat64(val)
func (e *EncodeProm) ProcessCounter(m interface{}, labels map[string]string, value float64) error {
counter := m.(*prometheus.CounterVec)
mm, err := counter.GetMetricWith(labels)
if err != nil {
e.errorsCounter.WithLabelValues("ValueConversionError", info.Name, info.ValueKey).Inc()
return nil, 0
}
if info.ValueScale != 0 {
floatVal = floatVal / info.ValueScale
return err
}

entryLabels, key := ExtractLabelsAndKey(flow, &info.MetricsItem)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
_, ok := e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) })
if !ok {
e.metricsDropped.Inc()
return nil, 0
}
return entryLabels, floatVal
mm.Add(value)
return nil
}

func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *MetricInfo, m *prometheus.MetricVec) (map[string]string, []float64) {
val := e.extractGenericValue(flow, info)
if val == nil {
return nil, nil
}
values, ok := val.([]float64)
if !ok {
e.errorsCounter.WithLabelValues("HistoValueConversionError", info.Name, info.ValueKey).Inc()
return nil, nil
// ProcessGauge looks up the child gauge matching the given label set and
// sets it to the provided value. The key argument is unused here; presumably
// it is required by the common metrics-encoder callback contract — verify
// against MetricsCommonStruct.
func (e *EncodeProm) ProcessGauge(m interface{}, labels map[string]string, value float64, key string) error {
	gaugeVec := m.(*prometheus.GaugeVec)
	metric, lookupErr := gaugeVec.GetMetricWith(labels)
	if lookupErr != nil {
		return lookupErr
	}
	metric.Set(value)
	return nil
}

entryLabels, key := ExtractLabelsAndKey(flow, &info.MetricsItem)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
_, ok = e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) })
if !ok {
e.metricsDropped.Inc()
return nil, nil
func (e *EncodeProm) ProcessHist(m interface{}, labels map[string]string, value float64) error {
hist := m.(*prometheus.HistogramVec)
mm, err := hist.GetMetricWith(labels)
if err != nil {
return err
}
return entryLabels, values
mm.Observe(value)
return nil
}

func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *MetricInfo) interface{} {
for _, pred := range info.FilterPredicates {
if !pred(flow) {
return nil
}
}
if info.ValueKey == "" {
// No value key means it's a records / flows counter (1 flow = 1 increment), so just return 1
return 1
func (e *EncodeProm) ProcessAggHist(m interface{}, labels map[string]string, values []float64) error {
hist := m.(*prometheus.HistogramVec)
mm, err := hist.GetMetricWith(labels)
if err != nil {
return err
}
val, found := flow[info.ValueKey]
if !found {
e.errorsCounter.WithLabelValues("RecordKeyMissing", info.Name, info.ValueKey).Inc()
return nil
for _, v := range values {
mm.Observe(v)
}
return val
return nil
}

func ExtractLabelsAndKey(flow config.GenericMap, info *api.MetricsItem) (map[string]string, string) {
entryLabels := make(map[string]string, len(info.Labels))
key := strings.Builder{}
key.WriteString(info.Name)
key.WriteRune('|')
for _, t := range info.Labels {
entryLabels[t] = ""
if v, ok := flow[t]; ok {
entryLabels[t] = fmt.Sprintf("%v", v)
}
key.WriteString(entryLabels[t])
key.WriteRune('|')
func (e *EncodeProm) GetChacheEntry(entryLabels map[string]string, m interface{}) interface{} {
switch mv := m.(type) {
case *prometheus.CounterVec:
return func() { mv.Delete(entryLabels) }
case *prometheus.GaugeVec:
return func() { mv.Delete(entryLabels) }
case *prometheus.HistogramVec:
return func() { mv.Delete(entryLabels) }
}
return entryLabels, key.String()
return nil
}

// Cleanup is the callback invoked by the timed-cache expiry mechanism. Each
// cached entry stores its own cleanup closure (see GetChacheEntry), so this
// simply asserts the stored value back to func() and runs it.
func (e *EncodeProm) Cleanup(cleanupFunc interface{}) {
	fn := cleanupFunc.(func())
	fn()
}

// cleanupExpiredEntriesLoop periodically purges expired entries from the
// metrics cache (running each entry's stored cleanup closure via e.Cleanup)
// until the stage is signaled to exit through e.exitChan.
func (e *EncodeProm) cleanupExpiredEntriesLoop() {
	ticker := time.NewTicker(e.expiryTime)
	// Release the ticker's resources when this goroutine exits; the original
	// code leaked the ticker on shutdown.
	defer ticker.Stop()
	for {
		select {
		case <-e.exitChan:
			log.Debugf("exiting cleanupExpiredEntriesLoop because of signal")
			return
		case <-ticker.C:
			e.mCache.CleanupExpiredEntries(e.expiryTime, e.Cleanup)
		}
	}
}

func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) {
cfg := api.PromEncode{}
if params.Encode != nil && params.Encode.Prom != nil {
Expand All @@ -282,11 +122,13 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
} else {
registerer = prometheus.DefaultRegisterer
}
w := &EncodeProm{
cfg: params.Encode.Prom,
registerer: registerer,
}

counters := []counterInfo{}
gauges := []gaugeInfo{}
histos := []histoInfo{}
aggHistos := []histoInfo{}
metricCommon := NewMetricsCommonStruct(opMetrics, cfg.MaxMetrics, params.Name, expiryTime, w.Cleanup)
w.metricCommon = metricCommon

for _, mCfg := range cfg.Metrics {
fullMetricName := cfg.Prefix + mCfg.Name
Expand All @@ -302,21 +144,15 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
counters = append(counters, counterInfo{
counter: counter,
info: mInfo,
})
metricCommon.AddCounter(counter, mInfo)
case api.MetricEncodeOperationName("Gauge"):
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels)
err := registerer.Register(gauge)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
gauges = append(gauges, gaugeInfo{
gauge: gauge,
info: mInfo,
})
metricCommon.AddGauge(gauge, mInfo)
case api.MetricEncodeOperationName("Histogram"):
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
Expand All @@ -325,10 +161,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
histos = append(histos, histoInfo{
histo: hist,
info: mInfo,
})
metricCommon.AddHist(hist, mInfo)
case api.MetricEncodeOperationName("AggHistogram"):
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
Expand All @@ -337,38 +170,11 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
aggHistos = append(aggHistos, histoInfo{
histo: hist,
info: mInfo,
})
metricCommon.AddAggHist(hist, mInfo)
case "default":
log.Errorf("invalid metric type = %v, skipping", mCfg.Type)
continue
}
}

log.Debugf("counters = %v", counters)
log.Debugf("gauges = %v", gauges)
log.Debugf("histos = %v", histos)
log.Debugf("aggHistos = %v", aggHistos)

mChacheLenMetric := opMetrics.NewGauge(&mChacheLen, params.Name)

w := &EncodeProm{
cfg: params.Encode.Prom,
registerer: registerer,
counters: counters,
gauges: gauges,
histos: histos,
aggHistos: aggHistos,
expiryTime: expiryTime.Duration,
mCache: putils.NewTimedCache(cfg.MaxMetrics, mChacheLenMetric),
mChacheLenMetric: mChacheLenMetric,
exitChan: putils.ExitChannel(),
metricsProcessed: opMetrics.NewCounter(&MetricsProcessed, params.Name),
metricsDropped: opMetrics.NewCounter(&MetricsDropped, params.Name),
errorsCounter: opMetrics.NewCounterVec(&EncodePromErrors),
}
go w.cleanupExpiredEntriesLoop()
return w, nil
}
Loading

0 comments on commit 45bf277

Please sign in to comment.