Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus stats are incompatible with the prometheus sdk #131

Open
ghost opened this issue Jul 21, 2021 · 2 comments
Open

Prometheus stats are incompatible with the prometheus sdk #131

ghost opened this issue Jul 21, 2021 · 2 comments

Comments

@ghost
Copy link

ghost commented Jul 21, 2021

Hi!

We are encountering a problem with integrating segmentio stats for our kafka consumers into the rest of our prometheus metrics. The issue arises due to the stats lib from segmentio not using the sdk that prometheus provides and instead rolls it's own collector and publisher. This means that the two libraries are fundamentally incompatible when it comes to serving them under the same '/metrics' path.

Is there a way that someone has worked around this? If not, could some form of adapter be added to the lib?

I don't want to be the guy that asks for a large scale rewrite, but it would be nice to see this awesome lib use the sdk provided by prometheus for golang.

@abraithwaite
Copy link
Contributor

abraithwaite commented Jul 22, 2021

Hey @jdeal-mediamath, I think it's unlikely for us to use the prometheus library in this module for various reasons. However, we don't register the metrics URL automatically so you should be able to host the metrics exporter at a separate path from the Prometheus package. It would mean that you would have to scrape 2 endpoints from the same service, but I don't really see a way around that.

What are you looking to integrate with? If it's an internal package, why not use segmentio/stats? If it's a third party library, perhaps you could suggest they add an interface for exporting stats versus using prometheus directly?

@mmorlonDeezer
Copy link

here is the wip i'm currently working on in my consumer app project :

usage :

var defaultKafkaCollector = newKafkaCollector(namespace)

func ObserveKafkaReader(reader *kafka.Reader) {
	defaultKafkaCollector.InstrumentReader(reader)
}

"glue code"

package metrics

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/segmentio/kafka-go"
)

var labelNames = []string{"topic", "partition"}

type counterAdder struct {
	counter *prometheus.CounterVec
	getter  func(*kafka.ReaderStats) float64
}

func (ca *counterAdder) Add(r *kafka.ReaderStats) {
	ca.counter.WithLabelValues(r.Topic, r.Partition).Add(ca.getter(r))
}

type gaugeSetter struct {
	gauge  *prometheus.GaugeVec
	getter func(*kafka.ReaderStats) float64
}

func (gs *gaugeSetter) Set(r *kafka.ReaderStats) {
	gs.gauge.WithLabelValues(r.Topic, r.Partition).Set(gs.getter(r))
}

type kafkaCollector struct {
	readers     []*kafka.Reader
	readersLock sync.RWMutex

	counters []*counterAdder
	gauges   []*gaugeSetter
}

func newKafkaCollector(namespace string) *kafkaCollector {
	collector := new(kafkaCollector)

	cFac := func(name string, f func(*kafka.ReaderStats) float64) {
		collector.counters = append(collector.counters, &counterAdder{
			counter: prometheus.NewCounterVec(prometheus.CounterOpts{Namespace: namespace, Subsystem: "kafka", Name: name}, labelNames),
			getter:  f,
		})
	}

	gFac := func(name string, f func(*kafka.ReaderStats) float64) {
		collector.gauges = append(collector.gauges, &gaugeSetter{
			gauge:  prometheus.NewGaugeVec(prometheus.GaugeOpts{Namespace: namespace, Subsystem: "kafka", Name: name}, labelNames),
			getter: f,
		})
	}

	cFac("dials_total", func(rs *kafka.ReaderStats) float64 { return float64(rs.Dials) })
	cFac("fetches_total", func(rs *kafka.ReaderStats) float64 { return float64(rs.Fetches) })
	cFac("messages_total", func(rs *kafka.ReaderStats) float64 { return float64(rs.Messages) })
	cFac("bytes_total", func(rs *kafka.ReaderStats) float64 { return float64(rs.Bytes) })
	cFac("rebalances_total", func(rs *kafka.ReaderStats) float64 { return float64(rs.Rebalances) })
	cFac("timeouts_total", func(rs *kafka.ReaderStats) float64 { return float64(rs.Timeouts) })
	cFac("errors_total", func(rs *kafka.ReaderStats) float64 { return float64(rs.Errors) })

	gFac("offset", func(rs *kafka.ReaderStats) float64 { return float64(rs.Offset) })
	gFac("lag", func(rs *kafka.ReaderStats) float64 { return float64(rs.Lag) })
	gFac("queue_length", func(rs *kafka.ReaderStats) float64 { return float64(rs.QueueLength) })
	gFac("queue_capacity", func(rs *kafka.ReaderStats) float64 { return float64(rs.QueueCapacity) })

	prometheus.MustRegister(collector)

	return collector
}

func (c *kafkaCollector) Collect(m chan<- prometheus.Metric) {

	c.readersLock.RLock()
	defer c.readersLock.RUnlock()

	for _, reader := range c.readers {
		stats := reader.Stats()
		for _, counter := range c.counters {
			counter.Add(&stats)
		}
		for _, gauge := range c.gauges {
			gauge.Set(&stats)
		}
	}

	for _, counter := range c.counters {
		counter.counter.Collect(m)
	}
	for _, gauge := range c.gauges {
		gauge.gauge.Collect(m)
	}
}

func (c *kafkaCollector) Describe(d chan<- *prometheus.Desc) {
	for _, counter := range c.counters {
		counter.counter.Describe(d)
	}
	for _, gauge := range c.gauges {
		gauge.gauge.Describe(d)
	}
}

func (c *kafkaCollector) InstrumentReader(r *kafka.Reader) {
	c.readersLock.Lock()
	defer c.readersLock.Unlock()
	c.readers = append(c.readers, r)
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants