diff --git a/CHANGELOG.md b/CHANGELOG.md
index cc91c4c20c2..02c7a07af1e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,7 @@
 ## master / unreleased
 
+* [CHANGE] Metric `cortex_kv_request_duration_seconds` now includes a `kv_name` label to denote which client is being used, as well as a `type` label to denote the KV backend implementation in use. #2648
 * [CHANGE] Experimental Ruler: Rule groups persisted to object storage using the experimental API have an updated object key encoding to better handle special characters. Rule groups previously-stored using object storage must be renamed to the new format. #2646
 * [CHANGE] Query Frontend now uses Round Robin to choose a tenant queue to service next. #2553
 * [CHANGE] `-promql.lookback-delta` is now deprecated and has been replaced by `-querier.lookback-delta` along with `lookback_delta` entry under `querier` in the config file. `-promql.lookback-delta` will be removed in v1.4.0. #2604
diff --git a/integration/kv_test.go b/integration/kv_test.go
index 5114908d8f2..c74185de2dc 100644
--- a/integration/kv_test.go
+++ b/integration/kv_test.go
@@ -4,10 +4,13 @@ package main
 
 import (
 	"context"
+	"errors"
 	"sort"
 	"testing"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
+	dto "github.com/prometheus/client_model/go"
 	"github.com/stretchr/testify/require"
 
 	"github.com/cortexproject/cortex/integration/e2e"
@@ -28,6 +31,8 @@ func TestKV_List_Delete(t *testing.T) {
 
 	require.NoError(t, s.StartAndWaitReady(etcdSvc, consulSvc))
 
+	reg := prometheus.NewRegistry()
+
 	etcdKv, err := kv.NewClient(kv.Config{
 		Store:  "etcd",
 		Prefix: "keys/",
@@ -38,7 +43,7 @@ func TestKV_List_Delete(t *testing.T) {
 			MaxRetries: 5,
 		},
 	},
-	}, stringCodec{})
+	}, stringCodec{}, reg)
 	require.NoError(t, err)
 
 	consulKv, err := kv.NewClient(kv.Config{
@@ -52,7 +57,7 @@ func TestKV_List_Delete(t *testing.T) {
 				WatchKeyRateLimit: 1,
 			},
 		},
-	}, stringCodec{})
+	}, stringCodec{}, reg)
 	require.NoError(t, err)
 
 	kvs := []struct {
@@ -98,6 +103,34 @@ func TestKV_List_Delete(t *testing.T) {
 			require.Nil(t, v, "object was not deleted")
 		})
 	}
+
+	// Ensure the proper histogram metrics are reported
+	metrics, err := reg.Gather()
+	require.NoError(t, err)
+
+	require.Len(t, metrics, 1)
+	require.Equal(t, "cortex_kv_request_duration_seconds", metrics[0].GetName())
+	require.Equal(t, dto.MetricType_HISTOGRAM, metrics[0].GetType())
+	require.Len(t, metrics[0].GetMetric(), 8)
+
+	getMetricOperation := func(labels []*dto.LabelPair) (string, error) {
+		for _, l := range labels {
+			if l.GetName() == "operation" {
+				return l.GetValue(), nil
+			}
+		}
+		return "", errors.New("no operation")
+	}
+
+	for _, metric := range metrics[0].GetMetric() {
+		op, err := getMetricOperation(metric.Label)
+		require.NoErrorf(t, err, "No operation label found in metric %v", metric.String())
+		if op == "CAS" {
+			require.Equal(t, uint64(4), metric.GetHistogram().GetSampleCount())
+		} else {
+			require.Equal(t, uint64(1), metric.GetHistogram().GetSampleCount())
+		}
+	}
 }
 
 type stringCodec struct{}
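
Note for reviewers: the new assertions above walk the raw `dto.MetricFamily` protobufs returned by `Registry.Gather()`. Here is a minimal, self-contained sketch of that gather-and-inspect pattern (not part of this patch; the observed duration and label values are fabricated):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "cortex",
		Name:      "kv_request_duration_seconds",
		Help:      "Time spent on kv store requests.",
	}, []string{"operation", "status_code"})
	reg.MustRegister(hist)

	// Fabricated observation, standing in for an instrumented KV call.
	hist.WithLabelValues("CAS", "200").Observe(0.05)

	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		fmt.Println(mf.GetName(), mf.GetType()) // cortex_kv_request_duration_seconds HISTOGRAM
		for _, m := range mf.GetMetric() {
			for _, lp := range m.GetLabel() {
				fmt.Printf("%s=%q ", lp.GetName(), lp.GetValue())
			}
			// The per-series cumulative count asserted by the test.
			fmt.Println("count:", m.GetHistogram().GetSampleCount())
		}
	}
}
```
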
diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go
index 902120532eb..ea682547d29 100644
--- a/pkg/compactor/compactor.go
+++ b/pkg/compactor/compactor.go
@@ -195,12 +195,12 @@ func (c *Compactor) starting(ctx context.Context) error {
 	// Initialize the compactors ring if sharding is enabled.
 	if c.compactorCfg.ShardingEnabled {
 		lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig()
-		c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ring.CompactorRingKey, false)
+		c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ring.CompactorRingKey, false, c.registerer)
 		if err != nil {
 			return errors.Wrap(err, "unable to initialize compactor ring lifecycler")
 		}
 
-		c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ring.CompactorRingKey)
+		c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ring.CompactorRingKey, c.registerer)
 		if err != nil {
 			return errors.Wrap(err, "unable to initialize compactor ring")
 		}
diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go
index ab50a693257..a82de089990 100644
--- a/pkg/cortex/modules.go
+++ b/pkg/cortex/modules.go
@@ -107,7 +107,7 @@ func (t *Cortex) initServer() (services.Service, error) {
 func (t *Cortex) initRing() (serv services.Service, err error) {
 	t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig)
 	t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
-	t.Ring, err = ring.New(t.Cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey)
+	t.Ring, err = ring.New(t.Cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey, prometheus.DefaultRegisterer)
 	if err != nil {
 		return nil, err
 	}
@@ -149,7 +149,7 @@ func (t *Cortex) initDistributor() (serv services.Service, err error) {
 	// ruler's dependency)
 	canJoinDistributorsRing := (t.Cfg.Target == All || t.Cfg.Target == Distributor)
 
-	t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing)
+	t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing, prometheus.DefaultRegisterer)
 	if err != nil {
 		return
 	}
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 28e66213c5a..165887c7fbd 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -174,7 +174,7 @@ func (cfg *Config) Validate() error {
 }
 
 // New constructs a new Distributor
-func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool) (*Distributor, error) {
+func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool, reg prometheus.Registerer) (*Distributor, error) {
 	if cfg.ingesterClientFactory == nil {
 		cfg.ingesterClientFactory = func(addr string) (ring_client.PoolClient, error) {
 			return ingester_client.MakeIngesterClient(addr, clientConfig)
@@ -184,7 +184,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
 	replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
 	cfg.PoolConfig.RemoteTimeout = cfg.RemoteTimeout
 
-	replicas, err := newClusterTracker(cfg.HATrackerConfig)
+	replicas, err := newClusterTracker(cfg.HATrackerConfig, reg)
 	if err != nil {
 		return nil, err
 	}
@@ -201,7 +201,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
 	if !canJoinDistributorsRing {
 		ingestionRateStrategy = newInfiniteIngestionRateStrategy()
 	} else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
-		distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, true)
+		distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, true, reg)
 		if err != nil {
 			return nil, err
 		}
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 972eb0d59c2..8aa2ed240cf 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -369,7 +369,7 @@ func TestDistributor_PushHAInstances(t *testing.T) {
 				KVStore:         kv.Config{Mock: mock},
 				UpdateTimeout:   100 * time.Millisecond,
 				FailoverTimeout: time.Second,
-			})
+			}, nil)
 			require.NoError(t, err)
 			require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
 			d.HATracker = r
@@ -936,7 +936,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin
 		},
 		HeartbeatTimeout:  60 * time.Minute,
 		ReplicationFactor: 3,
-	}, ring.IngesterRingKey, ring.IngesterRingKey)
+	}, ring.IngesterRingKey, ring.IngesterRingKey, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing))
 
@@ -969,7 +969,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin
 	overrides, err := validation.NewOverrides(*cfg.limits, nil)
 	require.NoError(t, err)
 
-	d, err := New(distributorCfg, clientConfig, overrides, ingestersRing, true)
+	d, err := New(distributorCfg, clientConfig, overrides, ingestersRing, true, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
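
Note: the convention across these call sites is dependency-injected registration — production wiring passes `prometheus.DefaultRegisterer`, while tests pass `nil`. A hedged sketch of the pattern, with a hypothetical `newTracker` standing in for constructors like `newClusterTracker`; the kv package below additionally short-circuits on a nil Registerer and skips instrumentation entirely:

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// newTracker is a hypothetical constructor mirroring the shape of
// newClusterTracker and distributor.New after this change.
func newTracker(reg prometheus.Registerer) *prometheus.CounterVec {
	// promauto.With(nil) yields metrics that are never registered, so a nil
	// Registerer silently disables exposition without nil checks at each use.
	return promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "example_tracker_ops_total",
		Help: "Hypothetical counter, for illustration only.",
	}, []string{"op"})
}

func main() {
	prod := newTracker(prometheus.DefaultRegisterer) // production wiring
	prod.WithLabelValues("update").Inc()

	test := newTracker(nil) // test wiring: usable, but not exposed
	test.WithLabelValues("update").Inc()
}
```
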
diff --git a/pkg/distributor/ha_tracker.go b/pkg/distributor/ha_tracker.go
index fce5e72ed94..5bfbd8a1eb8 100644
--- a/pkg/distributor/ha_tracker.go
+++ b/pkg/distributor/ha_tracker.go
@@ -143,7 +143,7 @@ func GetReplicaDescCodec() codec.Proto {
 
 // NewClusterTracker returns a new HA cluster tracker using either Consul
 // or in-memory KV store. Tracker must be started via StartAsync().
-func newClusterTracker(cfg HATrackerConfig) (*haTracker, error) {
+func newClusterTracker(cfg HATrackerConfig, reg prometheus.Registerer) (*haTracker, error) {
 	var jitter time.Duration
 	if cfg.UpdateTimeoutJitterMax > 0 {
 		jitter = time.Duration(rand.Int63n(int64(2*cfg.UpdateTimeoutJitterMax))) - cfg.UpdateTimeoutJitterMax
@@ -157,7 +157,11 @@ func newClusterTracker(cfg HATrackerConfig) (*haTracker, error) {
 	}
 
 	if cfg.EnableHATracker {
-		client, err := kv.NewClient(cfg.KVStore, GetReplicaDescCodec())
+		client, err := kv.NewClient(
+			cfg.KVStore,
+			GetReplicaDescCodec(),
+			kv.RegistererWithKVName(reg, "distributor-hatracker"),
+		)
 		if err != nil {
 			return nil, err
 		}
diff --git a/pkg/distributor/ha_tracker_test.go b/pkg/distributor/ha_tracker_test.go
index 4767650a56a..c4a3c78bd46 100644
--- a/pkg/distributor/ha_tracker_test.go
+++ b/pkg/distributor/ha_tracker_test.go
@@ -135,7 +135,7 @@ func TestWatchPrefixAssignment(t *testing.T) {
 		UpdateTimeout:          time.Millisecond,
 		UpdateTimeoutJitterMax: 0,
 		FailoverTimeout:        time.Millisecond * 2,
-	})
+	}, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
 	defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
@@ -165,7 +165,7 @@ func TestCheckReplicaOverwriteTimeout(t *testing.T) {
 		UpdateTimeout:          100 * time.Millisecond,
 		UpdateTimeoutJitterMax: 0,
 		FailoverTimeout:        time.Second,
-	})
+	}, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
 	defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
@@ -200,7 +200,7 @@ func TestCheckReplicaMultiCluster(t *testing.T) {
 		UpdateTimeout:          100 * time.Millisecond,
 		UpdateTimeoutJitterMax: 0,
 		FailoverTimeout:        time.Second,
-	})
+	}, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
 	defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
@@ -235,7 +235,7 @@ func TestCheckReplicaMultiClusterTimeout(t *testing.T) {
 		UpdateTimeout:          100 * time.Millisecond,
 		UpdateTimeoutJitterMax: 0,
 		FailoverTimeout:        time.Second,
-	})
+	}, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
 	defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
@@ -286,7 +286,7 @@ func TestCheckReplicaUpdateTimeout(t *testing.T) {
 		UpdateTimeout:          time.Second,
 		UpdateTimeoutJitterMax: 0,
 		FailoverTimeout:        time.Second,
-	})
+	}, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
 	defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
@@ -347,7 +347,7 @@ func TestCheckReplicaMultiUser(t *testing.T) {
 		UpdateTimeout:          100 * time.Millisecond,
 		UpdateTimeoutJitterMax: 0,
 		FailoverTimeout:        time.Second,
-	})
+	}, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
 	defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
@@ -430,7 +430,7 @@ func TestCheckReplicaUpdateTimeoutJitter(t *testing.T) {
 				UpdateTimeout:          testData.updateTimeout,
 				UpdateTimeoutJitterMax: 0,
 				FailoverTimeout:        time.Second,
-			})
+			}, nil)
 			require.NoError(t, err)
 			require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
 			defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 5bc1a1c35e1..a80126a52bb 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -199,7 +199,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 	// During WAL recovery, it will create new user states which requires the limiter.
 	// Hence initialise the limiter before creating the WAL.
 	// The '!cfg.WALConfig.WALEnabled' argument says don't flush on shutdown if the WAL is enabled.
-	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.WALEnabled)
+	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.WALEnabled, registerer)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go
index 32ce43c7a6f..25b7e1faf7d 100644
--- a/pkg/ingester/ingester_v2.go
+++ b/pkg/ingester/ingester_v2.go
@@ -146,7 +146,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
 		}, i.numSeriesInTSDB)
 	}
 
-	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true)
+	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true, registerer)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go
index 04cc8ade4fe..c052d8134a0 100644
--- a/pkg/querier/blocks_store_queryable.go
+++ b/pkg/querier/blocks_store_queryable.go
@@ -150,7 +150,11 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
 	if gatewayCfg.ShardingEnabled {
 		storesRingCfg := gatewayCfg.ShardingRing.ToRingConfig()
 
-		storesRingBackend, err := kv.NewClient(storesRingCfg.KVStore, ring.GetCodec())
+		storesRingBackend, err := kv.NewClient(
+			storesRingCfg.KVStore,
+			ring.GetCodec(),
+			kv.RegistererWithKVName(reg, "querier-store-gateway"),
+		)
 		if err != nil {
 			return nil, errors.Wrap(err, "failed to create store-gateway ring backend")
 		}
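
Note: the `kv_name`, `role`, and `type` labels compose through registerer wrapping rather than through extra label dimensions on the histogram itself. A standalone sketch of how the wrapping in the kv changes below stacks (label values here are examples from this diff, the observation is fabricated):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Outer wrap: what kv.RegistererWithKVName(reg, "ingester-ring") produces.
	named := prometheus.WrapRegistererWith(prometheus.Labels{"kv_name": "ingester-ring"}, reg)
	// Inner wrap: what buildMultiClient adds for its primary store.
	primary := prometheus.WrapRegistererWith(prometheus.Labels{"role": "primary"}, named)

	promauto.With(primary).NewHistogramVec(prometheus.HistogramOpts{
		Namespace:   "cortex",
		Name:        "kv_request_duration_seconds",
		Help:        "Time spent on kv store requests.",
		ConstLabels: prometheus.Labels{"type": "consul"}, // what newMetricsClient sets
	}, []string{"operation", "status_code"}).WithLabelValues("CAS", "200").Observe(0.01)

	families, _ := reg.Gather()
	for _, mf := range families {
		for _, m := range mf.GetMetric() {
			for _, lp := range m.GetLabel() {
				fmt.Printf("%s=%q ", lp.GetName(), lp.GetValue())
			}
			fmt.Println()
		}
	}
	// Prints (labels sorted alphabetically):
	// kv_name="ingester-ring" operation="CAS" role="primary" status_code="200" type="consul"
}
```
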
-func NewClient(cfg Config, codec codec.Codec) (Client, error) {
+func NewClient(cfg Config, codec codec.Codec, reg prometheus.Registerer) (Client, error) {
 	if cfg.Mock != nil {
 		return cfg.Mock, nil
 	}
 
-	return createClient(cfg.Store, cfg.Prefix, cfg.StoreConfig, codec)
+	return createClient(cfg.Store, cfg.Prefix, cfg.StoreConfig, codec, reg)
 }
 
-func createClient(name string, prefix string, cfg StoreConfig, codec codec.Codec) (Client, error) {
+func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Codec, reg prometheus.Registerer) (Client, error) {
 	var client Client
 	var err error
 
-	switch name {
+	switch backend {
 	case "consul":
 		client, err = consul.NewClient(cfg.Consul, codec)
 
@@ -135,10 +137,10 @@ func createClient(name string, prefix string, cfg StoreConfig, codec codec.Codec
 	}
 
 	case "multi":
-		client, err = buildMultiClient(cfg, codec)
+		client, err = buildMultiClient(cfg, codec, reg)
 
 	default:
-		return nil, fmt.Errorf("invalid KV store type: %s", name)
+		return nil, fmt.Errorf("invalid KV store type: %s", backend)
 	}
 
 	if err != nil {
@@ -149,10 +151,20 @@ func createClient(name string, prefix string, cfg StoreConfig, codec codec.Codec
 		client = PrefixClient(client, prefix)
 	}
 
-	return metrics{client}, nil
+	// If no Registerer is provided, return the raw client.
+	if reg == nil {
+		return client, nil
+	}
+
+	return newMetricsClient(backend, client, reg), nil
 }
 
-func buildMultiClient(cfg StoreConfig, codec codec.Codec) (Client, error) {
+func buildMultiClient(cfg StoreConfig, codec codec.Codec, reg prometheus.Registerer) (Client, error) {
+	var (
+		primaryLabel   = prometheus.Labels{"role": "primary"}
+		secondaryLabel = prometheus.Labels{"role": "secondary"}
+	)
+
 	if cfg.Multi.Primary == "" || cfg.Multi.Secondary == "" {
 		return nil, fmt.Errorf("primary or secondary store not set")
 	}
@@ -163,12 +175,12 @@ func buildMultiClient(cfg StoreConfig, codec codec.Codec) (Client, error) {
 		return nil, fmt.Errorf("primary and secondary stores must be different")
 	}
 
-	primary, err := createClient(cfg.Multi.Primary, "", cfg, codec)
+	primary, err := createClient(cfg.Multi.Primary, "", cfg, codec, prometheus.WrapRegistererWith(primaryLabel, reg))
 	if err != nil {
 		return nil, err
 	}
 
-	secondary, err := createClient(cfg.Multi.Secondary, "", cfg, codec)
+	secondary, err := createClient(cfg.Multi.Secondary, "", cfg, codec, prometheus.WrapRegistererWith(secondaryLabel, reg))
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/ring/kv/metrics.go b/pkg/ring/kv/metrics.go
index 1e905c93551..6fa84503648 100644
--- a/pkg/ring/kv/metrics.go
+++ b/pkg/ring/kv/metrics.go
@@ -5,19 +5,19 @@ import (
 	"strconv"
 
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/weaveworks/common/httpgrpc"
 	"github.com/weaveworks/common/instrument"
 )
 
-var requestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{
-	Namespace: "cortex",
-	Name:      "kv_request_duration_seconds",
-	Help:      "Time spent on consul requests.",
-	Buckets:   prometheus.DefBuckets,
-}, []string{"operation", "status_code"}))
+// RegistererWithKVName wraps the provided Registerer with the KV name label. If a nil reg
+// is provided, a nil Registerer is returned.
+func RegistererWithKVName(reg prometheus.Registerer, name string) prometheus.Registerer {
+	if reg == nil {
+		return nil
+	}
 
-func init() {
-	requestDuration.Register()
+	return prometheus.WrapRegistererWith(prometheus.Labels{"kv_name": name}, reg)
 }
 
 // errorCode converts an error into an HTTP status code, modified from weaveworks/common/instrument
@@ -32,12 +32,30 @@ func errorCode(err error) string {
 }
 
 type metrics struct {
-	c Client
+	c               Client
+	requestDuration *instrument.HistogramCollector
+}
+
+func newMetricsClient(backend string, c Client, reg prometheus.Registerer) Client {
+	return &metrics{
+		c: c,
+		requestDuration: instrument.NewHistogramCollector(
+			promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
+				Namespace: "cortex",
+				Name:      "kv_request_duration_seconds",
+				Help:      "Time spent on kv store requests.",
+				Buckets:   prometheus.DefBuckets,
+				ConstLabels: prometheus.Labels{
+					"type": backend,
+				},
+			}, []string{"operation", "status_code"}),
+		),
+	}
 }
 
 func (m metrics) List(ctx context.Context, prefix string) ([]string, error) {
 	var result []string
-	err := instrument.CollectedRequest(ctx, "List", requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
+	err := instrument.CollectedRequest(ctx, "List", m.requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
 		var err error
 		result, err = m.c.List(ctx, prefix)
 		return err
@@ -47,7 +65,7 @@ func (m metrics) List(ctx context.Context, prefix string) ([]string, error) {
 
 func (m metrics) Get(ctx context.Context, key string) (interface{}, error) {
 	var result interface{}
-	err := instrument.CollectedRequest(ctx, "GET", requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
+	err := instrument.CollectedRequest(ctx, "GET", m.requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
 		var err error
 		result, err = m.c.Get(ctx, key)
 		return err
@@ -56,27 +74,27 @@ func (m metrics) Get(ctx context.Context, key string) (interface{}, error) {
 }
 
 func (m metrics) Delete(ctx context.Context, key string) error {
-	err := instrument.CollectedRequest(ctx, "Delete", requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
+	err := instrument.CollectedRequest(ctx, "Delete", m.requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
 		return m.c.Delete(ctx, key)
 	})
 	return err
 }
 
 func (m metrics) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
-	return instrument.CollectedRequest(ctx, "CAS", requestDuration, errorCode, func(ctx context.Context) error {
+	return instrument.CollectedRequest(ctx, "CAS", m.requestDuration, errorCode, func(ctx context.Context) error {
 		return m.c.CAS(ctx, key, f)
 	})
 }
 
 func (m metrics) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {
-	_ = instrument.CollectedRequest(ctx, "WatchKey", requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
+	_ = instrument.CollectedRequest(ctx, "WatchKey", m.requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
 		m.c.WatchKey(ctx, key, f)
 		return nil
 	})
 }
 
 func (m metrics) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) {
-	_ = instrument.CollectedRequest(ctx, "WatchPrefix", requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
+	_ = instrument.CollectedRequest(ctx, "WatchPrefix", m.requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
 		m.c.WatchPrefix(ctx, prefix, f)
 		return nil
 	})
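
Note: `newMetricsClient` is a decorator — every call is forwarded to the wrapped client inside `instrument.CollectedRequest`, which times it and maps the outcome to a `status_code` label. A reduced, hand-rolled sketch of the same idea (the real code delegates the timing to weaveworks' `instrument` package; `kvGetter`, `mapKV`, and the status mapping here are illustrative):

```go
package main

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// kvGetter is a stand-in for the kv.Client interface.
type kvGetter interface {
	Get(ctx context.Context, key string) (interface{}, error)
}

type instrumented struct {
	inner           kvGetter
	requestDuration *prometheus.HistogramVec
}

func newInstrumented(inner kvGetter, backend string, reg prometheus.Registerer) *instrumented {
	return &instrumented{
		inner: inner,
		requestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Namespace:   "cortex",
			Name:        "kv_request_duration_seconds",
			Help:        "Time spent on kv store requests.",
			ConstLabels: prometheus.Labels{"type": backend},
		}, []string{"operation", "status_code"}),
	}
}

func (i *instrumented) Get(ctx context.Context, key string) (interface{}, error) {
	start := time.Now()
	v, err := i.inner.Get(ctx, key)
	status := "200"
	if err != nil {
		status = "500" // the real code maps errors via errorCode()/instrument.ErrorCode
	}
	i.requestDuration.WithLabelValues("GET", status).Observe(time.Since(start).Seconds())
	return v, err
}

// mapKV is a trivial in-memory stand-in for a KV backend.
type mapKV map[string]interface{}

func (m mapKV) Get(_ context.Context, key string) (interface{}, error) { return m[key], nil }

func main() {
	c := newInstrumented(mapKV{"k": "v"}, "inmemory", prometheus.NewRegistry())
	_, _ = c.Get(context.Background(), "k")
}
```
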
diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go
index e68aed9ed14..6e9e4e4b347 100644
--- a/pkg/ring/lifecycler.go
+++ b/pkg/ring/lifecycler.go
@@ -140,14 +140,19 @@ type Lifecycler struct {
 }
 
 // NewLifecycler creates new Lifecycler. It must be started via StartAsync.
-func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool) (*Lifecycler, error) {
+func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool, reg prometheus.Registerer) (*Lifecycler, error) {
 	addr, err := GetInstanceAddr(cfg.Addr, cfg.InfNames)
 	if err != nil {
 		return nil, err
 	}
 	port := GetInstancePort(cfg.Port, cfg.ListenPort)
 	codec := GetCodec()
-	store, err := kv.NewClient(cfg.RingConfig.KVStore, codec)
+	// Suffix all client names with "-lifecycler" to denote that this KV client is used by the lifecycler.
+	store, err := kv.NewClient(
+		cfg.RingConfig.KVStore,
+		codec,
+		kv.RegistererWithKVName(reg, ringName+"-lifecycler"),
+	)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go
index ed9187c9149..179340d3b69 100644
--- a/pkg/ring/lifecycler_test.go
+++ b/pkg/ring/lifecycler_test.go
@@ -58,7 +58,7 @@ func TestLifecycler_HealthyInstancesCount(t *testing.T) {
 	flagext.DefaultValues(&ringConfig)
 	ringConfig.KVStore.Mock = consul.NewInMemoryClient(GetCodec())
 
-	r, err := New(ringConfig, "ingester", IngesterRingKey)
+	r, err := New(ringConfig, "ingester", IngesterRingKey, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
 	defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
@@ -68,7 +68,7 @@ func TestLifecycler_HealthyInstancesCount(t *testing.T) {
 	lifecyclerConfig1.HeartbeatPeriod = 100 * time.Millisecond
 	lifecyclerConfig1.JoinAfter = 100 * time.Millisecond
 
-	lifecycler1, err := NewLifecycler(lifecyclerConfig1, &flushTransferer{}, "ingester", IngesterRingKey, true)
+	lifecycler1, err := NewLifecycler(lifecyclerConfig1, &flushTransferer{}, "ingester", IngesterRingKey, true, nil)
 	require.NoError(t, err)
 	assert.Equal(t, 0, lifecycler1.HealthyInstancesCount())
 
@@ -84,7 +84,7 @@ func TestLifecycler_HealthyInstancesCount(t *testing.T) {
 	lifecyclerConfig2.HeartbeatPeriod = 100 * time.Millisecond
 	lifecyclerConfig2.JoinAfter = 100 * time.Millisecond
 
-	lifecycler2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{}, "ingester", IngesterRingKey, true)
+	lifecycler2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{}, "ingester", IngesterRingKey, true, nil)
 	require.NoError(t, err)
 	assert.Equal(t, 0, lifecycler2.HealthyInstancesCount())
 
@@ -108,7 +108,7 @@ func TestLifecycler_NilFlushTransferer(t *testing.T) {
 	lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
 
 	// Create a lifecycler with nil FlushTransferer to make sure it operates correctly
-	lifecycler, err := NewLifecycler(lifecyclerConfig, nil, "ingester", IngesterRingKey, true)
+	lifecycler, err := NewLifecycler(lifecyclerConfig, nil, "ingester", IngesterRingKey, true, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler))
 
@@ -132,12 +132,12 @@ func TestLifecycler_TwoRingsWithDifferentKeysOnTheSameKVStore(t *testing.T) {
 	lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "instance-1")
 	lifecyclerConfig2 := testLifecyclerConfig(ringConfig, "instance-2")
 
-	lifecycler1, err := NewLifecycler(lifecyclerConfig1, nil, "service-1", "ring-1", true)
+	lifecycler1, err := NewLifecycler(lifecyclerConfig1, nil, "service-1", "ring-1", true, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler1))
 	defer services.StopAndAwaitTerminated(context.Background(), lifecycler1) //nolint:errcheck
 
-	lifecycler2, err := NewLifecycler(lifecyclerConfig2, nil, "service-2", "ring-2", true)
+	lifecycler2, err := NewLifecycler(lifecyclerConfig2, nil, "service-2", "ring-2", true, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler2))
 	defer services.StopAndAwaitTerminated(context.Background(), lifecycler2) //nolint:errcheck
@@ -166,14 +166,14 @@ func TestRingRestart(t *testing.T) {
 	c := GetCodec()
 	ringConfig.KVStore.Mock = consul.NewInMemoryClient(c)
 
-	r, err := New(ringConfig, "ingester", IngesterRingKey)
+	r, err := New(ringConfig, "ingester", IngesterRingKey, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
 	defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
 
 	// Add an 'ingester' with normalised tokens.
 	lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1")
-	l1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey, true)
+	l1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))
 
@@ -187,7 +187,7 @@ func TestRingRestart(t *testing.T) {
 	token := l1.tokens[0]
 
 	// Add a second ingester with the same settings, so it will think it has restarted
-	l2, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey, true)
+	l2, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2))
 
@@ -261,7 +261,7 @@ func TestCheckReady(t *testing.T) {
 	flagext.DefaultValues(&ringConfig)
 	ringConfig.KVStore.Mock = &MockClient{}
 
-	r, err := New(ringConfig, "ingester", IngesterRingKey)
+	r, err := New(ringConfig, "ingester", IngesterRingKey, nil)
 	require.NoError(t, err)
 	require.NoError(t, r.StartAsync(context.Background()))
 	// This is very atypical, but if we used AwaitRunning, that would fail, because of how quickly service terminates ...
@@ -271,7 +271,7 @@ func TestCheckReady(t *testing.T) {
 	cfg := testLifecyclerConfig(ringConfig, "ring1")
 	cfg.MinReadyDuration = 1 * time.Nanosecond
 
-	l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true)
+	l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))
 
@@ -293,7 +293,7 @@ func TestTokensOnDisk(t *testing.T) {
 	flagext.DefaultValues(&ringConfig)
 	ringConfig.KVStore.Mock = consul.NewInMemoryClient(GetCodec())
 
-	r, err := New(ringConfig, "ingester", IngesterRingKey)
+	r, err := New(ringConfig, "ingester", IngesterRingKey, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
 	defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
@@ -309,7 +309,7 @@ func TestTokensOnDisk(t *testing.T) {
 	lifecyclerConfig.TokensFilePath = tokenDir + "/tokens"
 
 	// Start first ingester.
-	l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true)
+	l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))
 
@@ -333,7 +333,7 @@ func TestTokensOnDisk(t *testing.T) {
 
 	// Start new ingester at same token directory.
 	lifecyclerConfig.ID = "ing2"
-	l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true)
+	l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2))
 	defer services.StopAndAwaitTerminated(context.Background(), l2) //nolint:errcheck
@@ -368,7 +368,7 @@ func TestJoinInLeavingState(t *testing.T) {
 	c := GetCodec()
 	ringConfig.KVStore.Mock = consul.NewInMemoryClient(c)
 
-	r, err := New(ringConfig, "ingester", IngesterRingKey)
+	r, err := New(ringConfig, "ingester", IngesterRingKey, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
 	defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
@@ -395,7 +395,7 @@ func TestJoinInLeavingState(t *testing.T) {
 	})
 	require.NoError(t, err)
 
-	l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true)
+	l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))
 
@@ -420,7 +420,7 @@ func TestJoinInJoiningState(t *testing.T) {
 	c := GetCodec()
 	ringConfig.KVStore.Mock = consul.NewInMemoryClient(c)
 
-	r, err := New(ringConfig, "ingester", IngesterRingKey)
+	r, err := New(ringConfig, "ingester", IngesterRingKey, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
 	defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
@@ -447,7 +447,7 @@ func TestJoinInJoiningState(t *testing.T) {
 	})
 	require.NoError(t, err)
 
-	l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true)
+	l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))
 
@@ -476,7 +476,7 @@ func TestRestoreOfZoneWhenOverwritten(t *testing.T) {
 	codec := GetCodec()
 	ringConfig.KVStore.Mock = consul.NewInMemoryClient(codec)
 
-	r, err := New(ringConfig, "ingester", IngesterRingKey)
+	r, err := New(ringConfig, "ingester", IngesterRingKey, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
 	defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
@@ -502,7 +502,7 @@ func TestRestoreOfZoneWhenOverwritten(t *testing.T) {
 	})
 	require.NoError(t, err)
 
-	l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true)
+	l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))
diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go
index 10a113b6a2e..0b52c4a09a0 100644
--- a/pkg/ring/ring.go
+++ b/pkg/ring/ring.go
@@ -116,9 +116,14 @@ type Ring struct {
 }
 
 // New creates a new Ring. Being a service, Ring needs to be started to do anything.
-func New(cfg Config, name, key string) (*Ring, error) {
+func New(cfg Config, name, key string, reg prometheus.Registerer) (*Ring, error) {
 	codec := GetCodec()
-	store, err := kv.NewClient(cfg.KVStore, codec)
+	// Suffix all client names with "-ring" to denote that this KV client is used by the ring.
+	store, err := kv.NewClient(
+		cfg.KVStore,
+		codec,
+		kv.RegistererWithKVName(reg, name+"-ring"),
+	)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go
index a5192a4738a..faec4731a2d 100644
--- a/pkg/ruler/ruler.go
+++ b/pkg/ruler/ruler.go
@@ -190,7 +190,11 @@ func NewRuler(cfg Config, engine *promql.Engine, queryable promStorage.Queryable
 	}
 
 	if cfg.EnableSharding {
-		ringStore, err := kv.NewClient(cfg.Ring.KVStore, ring.GetCodec())
+		ringStore, err := kv.NewClient(
+			cfg.Ring.KVStore,
+			ring.GetCodec(),
+			kv.RegistererWithKVName(reg, "ruler"),
+		)
 		if err != nil {
 			return nil, errors.Wrap(err, "create KV store client")
 		}
diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go
index 02ec9be63d0..49609b0af81 100644
--- a/pkg/storegateway/gateway.go
+++ b/pkg/storegateway/gateway.go
@@ -81,7 +81,11 @@ func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.Config, logLevel
 	}
 
 	if gatewayCfg.ShardingEnabled {
-		ringStore, err = kv.NewClient(gatewayCfg.ShardingRing.KVStore, ring.GetCodec())
+		ringStore, err = kv.NewClient(
+			gatewayCfg.ShardingRing.KVStore,
+			ring.GetCodec(),
+			kv.RegistererWithKVName(reg, "store-gateway"),
+		)
 		if err != nil {
 			return nil, errors.Wrap(err, "create KV store client")
 		}
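
Note: the `-ring`/`-lifecycler` suffixes, and per-component names like `"ruler"` and `"store-gateway"`, matter because one process may now register `cortex_kv_request_duration_seconds` several times against the same registry; registration only succeeds while the wrapped label sets stay distinct. A sketch of that registry behavior (identifiers here are illustrative, not PR code):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func newKVHistogram() *prometheus.HistogramVec {
	return prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "cortex",
		Name:      "kv_request_duration_seconds",
		Help:      "Time spent on kv store requests.",
	}, []string{"operation", "status_code"})
}

func main() {
	reg := prometheus.NewRegistry()

	ringReg := prometheus.WrapRegistererWith(prometheus.Labels{"kv_name": "ingester-ring"}, reg)
	lifecyclerReg := prometheus.WrapRegistererWith(prometheus.Labels{"kv_name": "ingester-lifecycler"}, reg)

	fmt.Println(ringReg.Register(newKVHistogram()))       // <nil>: first registration
	fmt.Println(lifecyclerReg.Register(newKVHistogram())) // <nil>: distinct kv_name, no clash
	fmt.Println(ringReg.Register(newKVHistogram()))       // error: duplicate metrics collector registration attempted
}
```

All of these series stay in a single metric family, so dashboards can aggregate across clients or fan out by `kv_name` as needed.
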