Merged
Changes from 2 commits
39 changes: 10 additions & 29 deletions ring/lifecycler.go
@@ -14,7 +14,6 @@ import (
"github.com/pkg/errors"
perrors "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"

"github.com/grafana/dskit/flagext"
@@ -23,26 +22,6 @@ import (
dstime "github.com/grafana/dskit/time"
)

var (
consulHeartbeats = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "member_consul_heartbeats_total",
Help: "The total number of heartbeats sent to consul.",
}, []string{"name"})
tokensOwned = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "member_ring_tokens_owned",
Help: "The number of tokens owned in the ring.",
}, []string{"name"})
tokensToOwn = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "member_ring_tokens_to_own",
Help: "The number of tokens to own in the ring.",
}, []string{"name"})
shutdownDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "shutdown_duration_seconds",
Help: "Duration (in seconds) of shutdown procedure (ie transfer or flush).",
Buckets: prometheus.ExponentialBuckets(10, 2, 8), // Biggest bucket is 10*2^(9-1) = 2560, or 42 mins.
}, []string{"op", "status", "name"})
)

// LifecyclerConfig is the config to build a Lifecycler.
type LifecyclerConfig struct {
RingConfig Config `yaml:"ring"`
@@ -145,7 +124,8 @@ type Lifecycler struct {
healthyInstancesCount int
zonesCount int

logger log.Logger
lifecyclerMetrics *LifecyclerMetrics
logger log.Logger
}

// NewLifecycler creates new Lifecycler. It must be started via StartAsync.
@@ -191,10 +171,11 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
Zone: zone,
actorChan: make(chan func()),
state: PENDING,
lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg),
logger: logger,
}

tokensToOwn.WithLabelValues(l.RingName).Set(float64(cfg.NumTokens))
l.lifecyclerMetrics.tokensToOwn.Set(float64(cfg.NumTokens))

l.BasicService = services.
NewBasicService(nil, l.loop, l.stopping).
@@ -322,7 +303,7 @@ func (i *Lifecycler) getTokens() Tokens {
}

func (i *Lifecycler) setTokens(tokens Tokens) {
tokensOwned.WithLabelValues(i.RingName).Set(float64(len(tokens)))
i.lifecyclerMetrics.tokensOwned.Set(float64(len(tokens)))

i.stateMtx.Lock()
defer i.stateMtx.Unlock()
@@ -473,7 +454,7 @@ func (i *Lifecycler) loop(ctx context.Context) error {
}

case <-heartbeatTickerChan:
consulHeartbeats.WithLabelValues(i.RingName).Inc()
i.lifecyclerMetrics.consulHeartbeats.Inc()
if err := i.updateConsul(context.Background()); err != nil {
level.Error(i.logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err)
}
@@ -520,7 +501,7 @@ heartbeatLoop:
for {
select {
case <-heartbeatTickerChan:
consulHeartbeats.WithLabelValues(i.RingName).Inc()
i.lifecyclerMetrics.consulHeartbeats.Inc()
if err := i.updateConsul(context.Background()); err != nil {
level.Error(i.logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err)
}
@@ -851,17 +832,17 @@ func (i *Lifecycler) processShutdown(ctx context.Context) {
level.Info(i.logger).Log("msg", "transfers are disabled")
} else {
level.Error(i.logger).Log("msg", "failed to transfer chunks to another instance", "ring", i.RingName, "err", err)
shutdownDuration.WithLabelValues("transfer", "fail", i.RingName).Observe(time.Since(transferStart).Seconds())
i.lifecyclerMetrics.shutdownDuration.WithLabelValues("transfer", "fail", i.RingName).Observe(time.Since(transferStart).Seconds())
}
} else {
flushRequired = false
shutdownDuration.WithLabelValues("transfer", "success", i.RingName).Observe(time.Since(transferStart).Seconds())
i.lifecyclerMetrics.shutdownDuration.WithLabelValues("transfer", "success", i.RingName).Observe(time.Since(transferStart).Seconds())
}

if flushRequired {
flushStart := time.Now()
i.flushTransferer.Flush()
shutdownDuration.WithLabelValues("flush", "success", i.RingName).Observe(time.Since(flushStart).Seconds())
i.lifecyclerMetrics.shutdownDuration.WithLabelValues("flush", "success", i.RingName).Observe(time.Since(flushStart).Seconds())
}

// Sleep so the shutdownDuration metric can be collected.
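Aside, not part of the diff: because the ring name now rides along as a const label on per-lifecycler metrics instead of a label on package-level vectors, two lifecyclers with different ring names can register against the same registerer; Prometheus accepts same-named metrics as long as the help text and label dimensions match and the const label values differ. A rough sketch, from inside package ring (the fields are unexported), with illustrative ring names:

	reg := prometheus.NewRegistry()
	ingesterMetrics := NewLifecyclerMetrics("ingester", reg)
	compactorMetrics := NewLifecyclerMetrics("compactor", reg)
	// Same metric names, told apart by the const label name="ingester" / name="compactor".
	ingesterMetrics.consulHeartbeats.Inc()
	compactorMetrics.consulHeartbeats.Inc()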
40 changes: 40 additions & 0 deletions ring/lifecycler_metrics.go
@@ -0,0 +1,40 @@
package ring

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type LifecyclerMetrics struct {
consulHeartbeats prometheus.Counter
tokensOwned prometheus.Gauge
tokensToOwn prometheus.Gauge
shutdownDuration *prometheus.HistogramVec
}

func NewLifecyclerMetrics(ringName string, reg prometheus.Registerer) *LifecyclerMetrics {
return &LifecyclerMetrics{
consulHeartbeats: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "member_consul_heartbeats_total",
Help: "The total number of heartbeats sent to consul.",
ConstLabels: prometheus.Labels{"name": ringName},
}),
tokensOwned: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "member_ring_tokens_owned",
Help: "The number of tokens owned in the ring.",
ConstLabels: prometheus.Labels{"name": ringName},
}),
tokensToOwn: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "member_ring_tokens_to_own",
Help: "The number of tokens to own in the ring.",
ConstLabels: prometheus.Labels{"name": ringName},
}),
shutdownDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "shutdown_duration_seconds",
Help: "Duration (in seconds) of shutdown procedure (ie transfer or flush).",
Buckets: prometheus.ExponentialBuckets(10, 2, 8), // Biggest bucket is 10*2^(8-1) = 1280s, or ~21 mins.
ConstLabels: prometheus.Labels{"name": ringName},
}, []string{"op", "status", "name"}),
}

}
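For illustration only, not part of this change: pointing NewLifecyclerMetrics at an isolated registry keeps a test inside package ring away from the global registerer. The test name, ring name, and expected value are assumptions; the sketch uses "testing" and "github.com/prometheus/client_golang/prometheus/testutil":

	func TestLifecyclerMetricsSketch(t *testing.T) {
		reg := prometheus.NewRegistry() // private registry, nothing registered globally
		m := NewLifecyclerMetrics("ingester", reg)

		m.tokensToOwn.Set(128) // recorded with const label name="ingester"
		if got := testutil.ToFloat64(m.tokensToOwn); got != 128 {
			t.Fatalf("expected 128 tokens to own, got %v", got)
		}
	}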
139 changes: 58 additions & 81 deletions ring/ring.go
@@ -15,6 +15,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/dskit/kv"
shardUtil "github.com/grafana/dskit/ring/shard"
@@ -47,7 +48,6 @@ const (

// ReadRing represents the read interface to the ring.
type ReadRing interface {
prometheus.Collector

// Get returns n (or more) instances which form the replicas for the given key.
// bufDescs, bufHosts and bufZones are slices to be overwritten for the return value
@@ -193,11 +193,12 @@ type Ring struct {
// If set to nil, no caching is done (used by tests, and subrings).
shuffledSubringCache map[subringCacheKey]*Ring

memberOwnershipDesc *prometheus.Desc
numMembersDesc *prometheus.Desc
totalTokensDesc *prometheus.Desc
numTokensDesc *prometheus.Desc
oldestTimestampDesc *prometheus.Desc
memberOwnershipDesc *prometheus.GaugeVec
numMembersDesc *prometheus.GaugeVec
totalTokensDesc prometheus.Gauge
numTokensDesc *prometheus.GaugeVec
oldestTimestampDesc *prometheus.GaugeVec
metricsUpdateTicker *time.Ticker

logger log.Logger
}
@@ -221,10 +222,10 @@ func New(cfg Config, name, key string, logger log.Logger, reg prometheus.Registe
return nil, err
}

return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(), logger)
return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(), reg, logger)
}

func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy, logger log.Logger) (*Ring, error) {
func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy, reg prometheus.Registerer, logger log.Logger) (*Ring, error) {
if cfg.ReplicationFactor <= 0 {
return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
}
@@ -236,40 +237,34 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client
strategy: strategy,
ringDesc: &Desc{},
shuffledSubringCache: map[subringCacheKey]*Ring{},
memberOwnershipDesc: prometheus.NewDesc(
"ring_member_ownership_percent",
"The percent ownership of the ring by member",
[]string{"member"},
map[string]string{"name": name},
),
numMembersDesc: prometheus.NewDesc(
"ring_members",
"Number of members in the ring",
[]string{"state"},
map[string]string{"name": name},
),
totalTokensDesc: prometheus.NewDesc(
"ring_tokens_total",
"Number of tokens in the ring",
nil,
map[string]string{"name": name},
),
numTokensDesc: prometheus.NewDesc(
"ring_tokens_owned",
"The number of tokens in the ring owned by the member",
[]string{"member"},
map[string]string{"name": name},
),
oldestTimestampDesc: prometheus.NewDesc(
"ring_oldest_member_timestamp",
"Timestamp of the oldest member in the ring.",
[]string{"state"},
map[string]string{"name": name},
),
memberOwnershipDesc: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ring_member_ownership_percent",
Help: "The percent ownership of the ring by member",
ConstLabels: map[string]string{"name": name}},
[]string{"member"}),
numMembersDesc: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ring_members",
Help: "Number of members in the ring",
ConstLabels: map[string]string{"name": name}},
[]string{"state"}),
totalTokensDesc: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "ring_tokens_total",
Help: "Number of tokens in the ring",
ConstLabels: map[string]string{"name": name}}),
numTokensDesc: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ring_tokens_owned",
Help: "The number of tokens in the ring owned by the member",
ConstLabels: map[string]string{"name": name}},
[]string{"member"}),
oldestTimestampDesc: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ring_oldest_member_timestamp",
Help: "Timestamp of the oldest member in the ring.",
ConstLabels: map[string]string{"name": name}},
[]string{"state"}),
logger: logger,
}

r.Service = services.NewBasicService(r.starting, r.loop, nil).WithName(fmt.Sprintf("%s ring client", name))
r.Service = services.NewBasicService(r.starting, r.loop, r.stopping).WithName(fmt.Sprintf("%s ring client", name))
return r, nil
}

@@ -287,6 +282,14 @@ func (r *Ring) starting(ctx context.Context) error {
}

r.updateRingState(value.(*Desc))

// Start a ticker and a goroutine that refresh the ring metrics every 10 seconds.
r.metricsUpdateTicker = time.NewTicker(10 * time.Second)
go func() {
for range r.metricsUpdateTicker.C {
r.updateRingMetrics()
}
}()
return nil
}

Expand All @@ -303,6 +306,14 @@ func (r *Ring) loop(ctx context.Context) error {
return nil
}

func (r *Ring) stopping(_ error) error {
// Stop the metrics update ticker.
if r.metricsUpdateTicker != nil {
r.metricsUpdateTicker.Stop()
}
return nil
}

func (r *Ring) updateRingState(ringDesc *Desc) {
r.mtx.RLock()
prevRing := r.ringDesc
@@ -523,15 +534,6 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro
}, nil
}

// Describe implements prometheus.Collector.
func (r *Ring) Describe(ch chan<- *prometheus.Desc) {
ch <- r.memberOwnershipDesc
ch <- r.numMembersDesc
ch <- r.totalTokensDesc
ch <- r.oldestTimestampDesc
ch <- r.numTokensDesc
}

// countTokens returns the number of tokens and tokens within the range for each instance.
// The ring read lock must be already taken when calling this function.
func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) {
@@ -563,25 +565,15 @@ func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) {
return numTokens, owned
}

// Collect implements prometheus.Collector.
func (r *Ring) Collect(ch chan<- prometheus.Metric) {
// updateRingMetrics updates the ring metrics. It is called every 10 seconds by the ticker started in starting().
func (r *Ring) updateRingMetrics() {
r.mtx.RLock()
defer r.mtx.RUnlock()

numTokens, ownedRange := r.countTokens()
for id, totalOwned := range ownedRange {
ch <- prometheus.MustNewConstMetric(
r.memberOwnershipDesc,
prometheus.GaugeValue,
float64(totalOwned)/float64(math.MaxUint32),
id,
)
ch <- prometheus.MustNewConstMetric(
r.numTokensDesc,
prometheus.GaugeValue,
float64(numTokens[id]),
id,
)
r.memberOwnershipDesc.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32))
r.numTokensDesc.WithLabelValues(id).Set(float64(numTokens[id]))
}

numByState := map[string]int{}
@@ -605,27 +597,12 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) {
}

for state, count := range numByState {
ch <- prometheus.MustNewConstMetric(
r.numMembersDesc,
prometheus.GaugeValue,
float64(count),
state,
)
r.numMembersDesc.WithLabelValues(state).Set(float64(count))
}
for state, timestamp := range oldestTimestampByState {
ch <- prometheus.MustNewConstMetric(
r.oldestTimestampDesc,
prometheus.GaugeValue,
float64(timestamp),
state,
)
}

ch <- prometheus.MustNewConstMetric(
r.totalTokensDesc,
prometheus.GaugeValue,
float64(len(r.ringTokens)),
)
r.oldestTimestampDesc.WithLabelValues(state).Set(float64(timestamp))
}
r.totalTokensDesc.Set(float64(len(r.ringTokens)))
}

// ShuffleShard returns a subring for the provided identifier (eg. a tenant ID)
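For context, not part of this PR: a caller-side sketch of the updated constructor, which now threads a prometheus.Registerer through to the ring gauges. cfg and store stand in for an already-populated Config and kv.Client, and the ring name and key are illustrative:

	// Sketch: build the ring, handing it a registerer for its gauges.
	r, err := NewWithStoreClientAndStrategy(
		cfg, "ingester", "ring", store,
		NewDefaultReplicationStrategy(),
		prometheus.DefaultRegisterer, // or a private registry in tests
		log.NewNopLogger(),
	)
	if err != nil {
		return err
	}
	// The ring is a dskit service; once running, the ticker added in this PR
	// refreshes the ring_* gauges every 10 seconds until the service stops.
	err = services.StartAndAwaitRunning(context.Background(), r)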