diff --git a/ring/lifecycler.go b/ring/lifecycler.go index 05e1a6b4e..b13776042 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -14,7 +14,6 @@ import ( "github.com/pkg/errors" perrors "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/atomic" "github.com/grafana/dskit/flagext" @@ -23,26 +22,6 @@ import ( dstime "github.com/grafana/dskit/time" ) -var ( - consulHeartbeats = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "member_consul_heartbeats_total", - Help: "The total number of heartbeats sent to consul.", - }, []string{"name"}) - tokensOwned = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "member_ring_tokens_owned", - Help: "The number of tokens owned in the ring.", - }, []string{"name"}) - tokensToOwn = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "member_ring_tokens_to_own", - Help: "The number of tokens to own in the ring.", - }, []string{"name"}) - shutdownDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Name: "shutdown_duration_seconds", - Help: "Duration (in seconds) of shutdown procedure (ie transfer or flush).", - Buckets: prometheus.ExponentialBuckets(10, 2, 8), // Biggest bucket is 10*2^(9-1) = 2560, or 42 mins. - }, []string{"op", "status", "name"}) -) - // LifecyclerConfig is the config to build a Lifecycler. type LifecyclerConfig struct { RingConfig Config `yaml:"ring"` @@ -145,7 +124,8 @@ type Lifecycler struct { healthyInstancesCount int zonesCount int - logger log.Logger + lifecyclerMetrics *LifecyclerMetrics + logger log.Logger } // NewLifecycler creates new Lifecycler. It must be started via StartAsync. @@ -191,10 +171,11 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa Zone: zone, actorChan: make(chan func()), state: PENDING, + lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg), logger: logger, } - tokensToOwn.WithLabelValues(l.RingName).Set(float64(cfg.NumTokens)) + l.lifecyclerMetrics.tokensToOwn.Set(float64(cfg.NumTokens)) l.BasicService = services. NewBasicService(nil, l.loop, l.stopping). @@ -322,7 +303,7 @@ func (i *Lifecycler) getTokens() Tokens { } func (i *Lifecycler) setTokens(tokens Tokens) { - tokensOwned.WithLabelValues(i.RingName).Set(float64(len(tokens))) + i.lifecyclerMetrics.tokensOwned.Set(float64(len(tokens))) i.stateMtx.Lock() defer i.stateMtx.Unlock() @@ -473,7 +454,7 @@ func (i *Lifecycler) loop(ctx context.Context) error { } case <-heartbeatTickerChan: - consulHeartbeats.WithLabelValues(i.RingName).Inc() + i.lifecyclerMetrics.consulHeartbeats.Inc() if err := i.updateConsul(context.Background()); err != nil { level.Error(i.logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err) } @@ -520,7 +501,7 @@ heartbeatLoop: for { select { case <-heartbeatTickerChan: - consulHeartbeats.WithLabelValues(i.RingName).Inc() + i.lifecyclerMetrics.consulHeartbeats.Inc() if err := i.updateConsul(context.Background()); err != nil { level.Error(i.logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err) } @@ -851,17 +832,17 @@ func (i *Lifecycler) processShutdown(ctx context.Context) { level.Info(i.logger).Log("msg", "transfers are disabled") } else { level.Error(i.logger).Log("msg", "failed to transfer chunks to another instance", "ring", i.RingName, "err", err) - shutdownDuration.WithLabelValues("transfer", "fail", i.RingName).Observe(time.Since(transferStart).Seconds()) + i.lifecyclerMetrics.shutdownDuration.WithLabelValues("transfer", "fail").Observe(time.Since(transferStart).Seconds()) } } else { flushRequired = false - shutdownDuration.WithLabelValues("transfer", "success", i.RingName).Observe(time.Since(transferStart).Seconds()) + i.lifecyclerMetrics.shutdownDuration.WithLabelValues("transfer", "success").Observe(time.Since(transferStart).Seconds()) } if flushRequired { flushStart := time.Now() i.flushTransferer.Flush() - shutdownDuration.WithLabelValues("flush", "success", i.RingName).Observe(time.Since(flushStart).Seconds()) + i.lifecyclerMetrics.shutdownDuration.WithLabelValues("flush", "success").Observe(time.Since(flushStart).Seconds()) } // Sleep so the shutdownDuration metric can be collected. diff --git a/ring/lifecycler_metrics.go b/ring/lifecycler_metrics.go new file mode 100644 index 000000000..422a564c1 --- /dev/null +++ b/ring/lifecycler_metrics.go @@ -0,0 +1,40 @@ +package ring + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type LifecyclerMetrics struct { + consulHeartbeats prometheus.Counter + tokensOwned prometheus.Gauge + tokensToOwn prometheus.Gauge + shutdownDuration *prometheus.HistogramVec +} + +func NewLifecyclerMetrics(ringName string, reg prometheus.Registerer) *LifecyclerMetrics { + return &LifecyclerMetrics{ + consulHeartbeats: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "member_consul_heartbeats_total", + Help: "The total number of heartbeats sent to consul.", + ConstLabels: prometheus.Labels{"name": ringName}, + }), + tokensOwned: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "member_ring_tokens_owned", + Help: "The number of tokens owned in the ring.", + ConstLabels: prometheus.Labels{"name": ringName}, + }), + tokensToOwn: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "member_ring_tokens_to_own", + Help: "The number of tokens to own in the ring.", + ConstLabels: prometheus.Labels{"name": ringName}, + }), + shutdownDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "shutdown_duration_seconds", + Help: "Duration (in seconds) of shutdown procedure (ie transfer or flush).", + Buckets: prometheus.ExponentialBuckets(10, 2, 8), // Biggest bucket is 10*2^(9-1) = 2560, or 42 mins. + ConstLabels: prometheus.Labels{"name": ringName}, + }, []string{"op", "status"}), + } + +} diff --git a/ring/ring.go b/ring/ring.go index 4f609517e..01b8d82ca 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/grafana/dskit/kv" shardUtil "github.com/grafana/dskit/ring/shard" @@ -47,7 +48,6 @@ const ( // ReadRing represents the read interface to the ring. type ReadRing interface { - prometheus.Collector // Get returns n (or more) instances which form the replicas for the given key. // bufDescs, bufHosts and bufZones are slices to be overwritten for the return value @@ -193,11 +193,12 @@ type Ring struct { // If set to nil, no caching is done (used by tests, and subrings). shuffledSubringCache map[subringCacheKey]*Ring - memberOwnershipDesc *prometheus.Desc - numMembersDesc *prometheus.Desc - totalTokensDesc *prometheus.Desc - numTokensDesc *prometheus.Desc - oldestTimestampDesc *prometheus.Desc + memberOwnershipGaugeVec *prometheus.GaugeVec + numMembersGaugeVec *prometheus.GaugeVec + totalTokensGauge prometheus.Gauge + numTokensGaugeVec *prometheus.GaugeVec + oldestTimestampGaugeVec *prometheus.GaugeVec + metricsUpdateTicker *time.Ticker logger log.Logger } @@ -221,10 +222,10 @@ func New(cfg Config, name, key string, logger log.Logger, reg prometheus.Registe return nil, err } - return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(), logger) + return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(), reg, logger) } -func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy, logger log.Logger) (*Ring, error) { +func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy, reg prometheus.Registerer, logger log.Logger) (*Ring, error) { if cfg.ReplicationFactor <= 0 { return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor) } @@ -236,40 +237,34 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client strategy: strategy, ringDesc: &Desc{}, shuffledSubringCache: map[subringCacheKey]*Ring{}, - memberOwnershipDesc: prometheus.NewDesc( - "ring_member_ownership_percent", - "The percent ownership of the ring by member", - []string{"member"}, - map[string]string{"name": name}, - ), - numMembersDesc: prometheus.NewDesc( - "ring_members", - "Number of members in the ring", - []string{"state"}, - map[string]string{"name": name}, - ), - totalTokensDesc: prometheus.NewDesc( - "ring_tokens_total", - "Number of tokens in the ring", - nil, - map[string]string{"name": name}, - ), - numTokensDesc: prometheus.NewDesc( - "ring_tokens_owned", - "The number of tokens in the ring owned by the member", - []string{"member"}, - map[string]string{"name": name}, - ), - oldestTimestampDesc: prometheus.NewDesc( - "ring_oldest_member_timestamp", - "Timestamp of the oldest member in the ring.", - []string{"state"}, - map[string]string{"name": name}, - ), + memberOwnershipGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "ring_member_ownership_percent", + Help: "The percent ownership of the ring by member", + ConstLabels: map[string]string{"name": name}}, + []string{"member"}), + numMembersGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "ring_members", + Help: "Number of members in the ring", + ConstLabels: map[string]string{"name": name}}, + []string{"state"}), + totalTokensGauge: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "ring_tokens_total", + Help: "Number of tokens in the ring", + ConstLabels: map[string]string{"name": name}}), + numTokensGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "ring_tokens_owned", + Help: "The number of tokens in the ring owned by the member", + ConstLabels: map[string]string{"name": name}}, + []string{"member"}), + oldestTimestampGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "ring_oldest_member_timestamp", + Help: "Timestamp of the oldest member in the ring.", + ConstLabels: map[string]string{"name": name}}, + []string{"state"}), logger: logger, } - r.Service = services.NewBasicService(r.starting, r.loop, nil).WithName(fmt.Sprintf("%s ring client", name)) + r.Service = services.NewBasicService(r.starting, r.loop, r.stopping).WithName(fmt.Sprintf("%s ring client", name)) return r, nil } @@ -287,6 +282,14 @@ func (r *Ring) starting(ctx context.Context) error { } r.updateRingState(value.(*Desc)) + + // Start metrics update ticker, and give it a function to update the ring metrics. + r.metricsUpdateTicker = time.NewTicker(10 * time.Second) + go func() { + for range r.metricsUpdateTicker.C { + r.updateRingMetrics() + } + }() return nil } @@ -303,6 +306,14 @@ func (r *Ring) loop(ctx context.Context) error { return nil } +func (r *Ring) stopping(_ error) error { + // Stop Metrics ticker. + if r.metricsUpdateTicker != nil { + r.metricsUpdateTicker.Stop() + } + return nil +} + func (r *Ring) updateRingState(ringDesc *Desc) { r.mtx.RLock() prevRing := r.ringDesc @@ -523,15 +534,6 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro }, nil } -// Describe implements prometheus.Collector. -func (r *Ring) Describe(ch chan<- *prometheus.Desc) { - ch <- r.memberOwnershipDesc - ch <- r.numMembersDesc - ch <- r.totalTokensDesc - ch <- r.oldestTimestampDesc - ch <- r.numTokensDesc -} - // countTokens returns the number of tokens and tokens within the range for each instance. // The ring read lock must be already taken when calling this function. func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) { @@ -563,25 +565,15 @@ func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) { return numTokens, owned } -// Collect implements prometheus.Collector. -func (r *Ring) Collect(ch chan<- prometheus.Metric) { +// updateRingMetrics updates ring metrics. +func (r *Ring) updateRingMetrics() { r.mtx.RLock() defer r.mtx.RUnlock() numTokens, ownedRange := r.countTokens() for id, totalOwned := range ownedRange { - ch <- prometheus.MustNewConstMetric( - r.memberOwnershipDesc, - prometheus.GaugeValue, - float64(totalOwned)/float64(math.MaxUint32), - id, - ) - ch <- prometheus.MustNewConstMetric( - r.numTokensDesc, - prometheus.GaugeValue, - float64(numTokens[id]), - id, - ) + r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32)) + r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id])) } numByState := map[string]int{} @@ -605,27 +597,12 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) { } for state, count := range numByState { - ch <- prometheus.MustNewConstMetric( - r.numMembersDesc, - prometheus.GaugeValue, - float64(count), - state, - ) + r.numMembersGaugeVec.WithLabelValues(state).Set(float64(count)) } for state, timestamp := range oldestTimestampByState { - ch <- prometheus.MustNewConstMetric( - r.oldestTimestampDesc, - prometheus.GaugeValue, - float64(timestamp), - state, - ) - } - - ch <- prometheus.MustNewConstMetric( - r.totalTokensDesc, - prometheus.GaugeValue, - float64(len(r.ringTokens)), - ) + r.oldestTimestampGaugeVec.WithLabelValues(state).Set(float64(timestamp)) + } + r.totalTokensGauge.Set(float64(len(r.ringTokens))) } // ShuffleShard returns a subring for the provided identifier (eg. a tenant ID)