diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c1455eea15..507adf7c5bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ * [CHANGE] Experimental TSDB: the store-gateway service is required in a Cortex cluster running with the experimental blocks storage. Removed the `-experimental.tsdb.store-gateway-enabled` CLI flag and `store_gateway_enabled` YAML config option. The store-gateway is now always enabled when the storage engine is `tsdb`. #2822 * [CHANGE] Ingester: Chunks flushed via /flush stay in memory until retention period is reached. This affects `cortex_ingester_memory_chunks` metric. #2778 * [CHANGE] Querier: the error message returned when the query time range exceeds `-store.max-query-length` has changed from `invalid query, length > limit (X > Y)` to `the query time range exceeds the limit (query length: X, limit: Y)`. #2826 +* [CHANGE] Add `component` label to metrics exposed by chunk, delete and index store clients. #2774 * [CHANGE] KV: The `role` label which was a label of `multi` KV store client only has been added to metrics of every KV store client. If KV store client is not `multi`, then the value of `role` label is `primary`. #2837 * [FEATURE] Introduced `ruler.for-outage-tolerance`, Max time to tolerate outage for restoring "for" state of alert. #2783 * [FEATURE] Introduced `ruler.for-grace-period`, Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. #2783 @@ -59,6 +60,7 @@ * [BUGFIX] Ingester: Flushing chunks via `/flush` endpoint could previously lead to panic, if chunks were already flushed before and then removed from memory during the flush caused by `/flush` handler. Immediate flush now doesn't cause chunks to be flushed again. Samples received during flush triggered via `/flush` handler are no longer discarded. #2778 * [BUGFIX] Prometheus upgraded. #2849 * Fixed unknown symbol error during head compaction +* [BUGFIX] Fix panic when using cassandra as store for both index and delete requests. #2774 * [BUGFIX] Experimental Delete Series: Fixed a data race in Purger. #2817 * [BUGFIX] KV: Fixed a bug that triggered a panic due to metrics being registered with the same name but different labels when using a `multi` configured KV client. #2837 diff --git a/pkg/chunk/aws/dynamodb_metrics.go b/pkg/chunk/aws/dynamodb_metrics.go new file mode 100644 index 00000000000..58533b44146 --- /dev/null +++ b/pkg/chunk/aws/dynamodb_metrics.go @@ -0,0 +1,60 @@ +package aws + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/instrument" +) + +type dynamoDBMetrics struct { + dynamoRequestDuration *instrument.HistogramCollector + dynamoConsumedCapacity *prometheus.CounterVec + dynamoThrottled *prometheus.CounterVec + dynamoFailures *prometheus.CounterVec + dynamoDroppedRequests *prometheus.CounterVec + dynamoQueryPagesCount prometheus.Histogram +} + +func newMetrics(r prometheus.Registerer) *dynamoDBMetrics { + m := dynamoDBMetrics{} + + m.dynamoRequestDuration = instrument.NewHistogramCollector(promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "dynamo_request_duration_seconds", + Help: "Time spent doing DynamoDB requests.", + + // DynamoDB latency seems to range from a few ms to a several seconds and is + // important. So use 9 buckets from 1ms to just over 1 minute (65s). 
+ Buckets: prometheus.ExponentialBuckets(0.001, 4, 9), + }, []string{"operation", "status_code"})) + m.dynamoConsumedCapacity = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "dynamo_consumed_capacity_total", + Help: "The capacity units consumed by operation.", + }, []string{"operation", tableNameLabel}) + m.dynamoThrottled = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "dynamo_throttled_total", + Help: "The total number of throttled events.", + }, []string{"operation", tableNameLabel}) + m.dynamoFailures = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "dynamo_failures_total", + Help: "The total number of errors while storing chunks to the chunk store.", + }, []string{tableNameLabel, errorReasonLabel, "operation"}) + m.dynamoDroppedRequests = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "dynamo_dropped_requests_total", + Help: "The total number of requests which were dropped due to errors encountered from dynamo.", + }, []string{tableNameLabel, errorReasonLabel, "operation"}) + m.dynamoQueryPagesCount = promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "dynamo_query_pages_count", + Help: "Number of pages per query.", + // Most queries will have one page, however this may increase with fuzzy + // metric names. + Buckets: prometheus.ExponentialBuckets(1, 4, 6), + }) + + return &m +} diff --git a/pkg/chunk/aws/dynamodb_storage_client.go b/pkg/chunk/aws/dynamodb_storage_client.go index 126de4e209d..b1e25ec8c7e 100644 --- a/pkg/chunk/aws/dynamodb_storage_client.go +++ b/pkg/chunk/aws/dynamodb_storage_client.go @@ -50,55 +50,6 @@ const ( validationException = "ValidationException" ) -var ( - dynamoRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "cortex", - Name: "dynamo_request_duration_seconds", - Help: "Time spent doing DynamoDB requests.", - - // DynamoDB latency seems to range from a few ms to a several seconds and is - // important. So use 9 buckets from 1ms to just over 1 minute (65s). 
- Buckets: prometheus.ExponentialBuckets(0.001, 4, 9), - }, []string{"operation", "status_code"})) - dynamoConsumedCapacity = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "dynamo_consumed_capacity_total", - Help: "The capacity units consumed by operation.", - }, []string{"operation", tableNameLabel}) - dynamoThrottled = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "dynamo_throttled_total", - Help: "The total number of throttled events.", - }, []string{"operation", tableNameLabel}) - dynamoFailures = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "dynamo_failures_total", - Help: "The total number of errors while storing chunks to the chunk store.", - }, []string{tableNameLabel, errorReasonLabel, "operation"}) - dynamoDroppedRequests = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "dynamo_dropped_requests_total", - Help: "The total number of requests which were dropped due to errors encountered from dynamo.", - }, []string{tableNameLabel, errorReasonLabel, "operation"}) - dynamoQueryPagesCount = prometheus.NewHistogram(prometheus.HistogramOpts{ - Namespace: "cortex", - Name: "dynamo_query_pages_count", - Help: "Number of pages per query.", - // Most queries will have one page, however this may increase with fuzzy - // metric names. - Buckets: prometheus.ExponentialBuckets(1, 4, 6), - }) -) - -func init() { - dynamoRequestDuration.Register() - prometheus.MustRegister(dynamoConsumedCapacity) - prometheus.MustRegister(dynamoThrottled) - prometheus.MustRegister(dynamoFailures) - prometheus.MustRegister(dynamoQueryPagesCount) - prometheus.MustRegister(dynamoDroppedRequests) -} - // DynamoDBConfig specifies config for a DynamoDB database. type DynamoDBConfig struct { DynamoDB flagext.URLValue `yaml:"dynamodb_url"` @@ -148,20 +99,22 @@ type dynamoDBStorageClient struct { // of boilerplate. batchGetItemRequestFn func(ctx context.Context, input *dynamodb.BatchGetItemInput) dynamoDBRequest batchWriteItemRequestFn func(ctx context.Context, input *dynamodb.BatchWriteItemInput) dynamoDBRequest + + metrics *dynamoDBMetrics } // NewDynamoDBIndexClient makes a new DynamoDB-backed IndexClient. -func NewDynamoDBIndexClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) { - return newDynamoDBStorageClient(cfg, schemaCfg) +func NewDynamoDBIndexClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (chunk.IndexClient, error) { + return newDynamoDBStorageClient(cfg, schemaCfg, reg) } // NewDynamoDBChunkClient makes a new DynamoDB-backed chunk.Client. -func NewDynamoDBChunkClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig) (chunk.Client, error) { - return newDynamoDBStorageClient(cfg, schemaCfg) +func NewDynamoDBChunkClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (chunk.Client, error) { + return newDynamoDBStorageClient(cfg, schemaCfg, reg) } // newDynamoDBStorageClient makes a new DynamoDB-backed IndexClient and chunk.Client. 
-func newDynamoDBStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig) (*dynamoDBStorageClient, error) { +func newDynamoDBStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (*dynamoDBStorageClient, error) { dynamoDB, err := dynamoClientFromURL(cfg.DynamoDB.URL) if err != nil { return nil, err @@ -172,6 +125,7 @@ func newDynamoDBStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig) schemaCfg: schemaCfg, DynamoDB: dynamoDB, writeThrottle: rate.NewLimiter(rate.Limit(cfg.ThrottleLimit), dynamoDBMaxWriteBatchSize), + metrics: newMetrics(reg), } client.batchGetItemRequestFn = client.batchGetItemRequest client.batchWriteItemRequestFn = client.batchWriteItemRequest @@ -187,9 +141,9 @@ func (a dynamoDBStorageClient) NewWriteBatch() chunk.WriteBatch { return dynamoDBWriteBatch(map[string][]*dynamodb.WriteRequest{}) } -func logWriteRetry(ctx context.Context, unprocessed dynamoDBWriteBatch) { +func logWriteRetry(unprocessed dynamoDBWriteBatch, metrics *dynamoDBMetrics) { for table, reqs := range unprocessed { - dynamoThrottled.WithLabelValues("DynamoDB.BatchWriteItem", table).Add(float64(len(reqs))) + metrics.dynamoThrottled.WithLabelValues("DynamoDB.BatchWriteItem", table).Add(float64(len(reqs))) for _, req := range reqs { item := req.PutRequest.Item var hash, rnge string @@ -225,25 +179,25 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), }) - err := instrument.CollectedRequest(ctx, "DynamoDB.BatchWriteItem", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(ctx, "DynamoDB.BatchWriteItem", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { return request.Send() }) resp := request.Data().(*dynamodb.BatchWriteItemOutput) for _, cc := range resp.ConsumedCapacity { - dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchWriteItem", *cc.TableName). + a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchWriteItem", *cc.TableName). Add(float64(*cc.CapacityUnits)) } if err != nil { for tableName := range requests { - recordDynamoError(tableName, err, "DynamoDB.BatchWriteItem") + recordDynamoError(tableName, err, "DynamoDB.BatchWriteItem", a.metrics) } // If we get provisionedThroughputExceededException, then no items were processed, // so back off and retry all. if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) { - logWriteRetry(ctx, requests) + logWriteRetry(requests, a.metrics) unprocessed.TakeReqs(requests, -1) _ = a.writeThrottle.WaitN(ctx, len(requests)) backoff.Wait() @@ -256,7 +210,7 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write // recording the drop counter separately from recordDynamoError(), as the error code alone may not provide enough context // to determine if a request was dropped (or not) for tableName := range requests { - dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchWriteItem").Inc() + a.metrics.dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchWriteItem").Inc() } continue } @@ -268,7 +222,7 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write // If there are unprocessed items, retry those items. 
unprocessedItems := dynamoDBWriteBatch(resp.UnprocessedItems) if len(unprocessedItems) > 0 { - logWriteRetry(ctx, unprocessedItems) + logWriteRetry(unprocessedItems, a.metrics) _ = a.writeThrottle.WaitN(ctx, unprocessedItems.Len()) unprocessed.TakeReqs(unprocessedItems, -1) } @@ -329,11 +283,11 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery pageCount := 0 defer func() { - dynamoQueryPagesCount.Observe(float64(pageCount)) + a.metrics.dynamoQueryPagesCount.Observe(float64(pageCount)) }() retryer := newRetryer(ctx, a.cfg.backoffConfig) - err := instrument.CollectedRequest(ctx, "DynamoDB.QueryPages", dynamoRequestDuration, instrument.ErrorCode, func(innerCtx context.Context) error { + err := instrument.CollectedRequest(ctx, "DynamoDB.QueryPages", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(innerCtx context.Context) error { if sp := ot.SpanFromContext(innerCtx); sp != nil { sp.SetTag("tableName", query.TableName) sp.SetTag("hashValue", query.HashValue) @@ -345,12 +299,12 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery } if cc := output.ConsumedCapacity; cc != nil { - dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages", *cc.TableName). + a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages", *cc.TableName). Add(float64(*cc.CapacityUnits)) } return callback(query, &dynamoDBReadResponse{items: output.Items}) - }, retryer.withRetries, withErrorHandler(query.TableName, "DynamoDB.QueryPages")) + }, retryer.withRetries, withErrorHandler(query.TableName, "DynamoDB.QueryPages", a.metrics)) }) if err != nil { return errors.Wrapf(err, "QueryPages error: table=%v", query.TableName) @@ -481,19 +435,19 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), }) - err := instrument.CollectedRequest(ctx, "DynamoDB.BatchGetItemPages", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(ctx, "DynamoDB.BatchGetItemPages", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { return request.Send() }) response := request.Data().(*dynamodb.BatchGetItemOutput) for _, cc := range response.ConsumedCapacity { - dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchGetItemPages", *cc.TableName). + a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchGetItemPages", *cc.TableName). 
Add(float64(*cc.CapacityUnits)) } if err != nil { for tableName := range requests { - recordDynamoError(tableName, err, "DynamoDB.BatchGetItemPages") + recordDynamoError(tableName, err, "DynamoDB.BatchGetItemPages", a.metrics) } // If we get provisionedThroughputExceededException, then no items were processed, @@ -509,7 +463,7 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c // recording the drop counter separately from recordDynamoError(), as the error code alone may not provide enough context // to determine if a request was dropped (or not) for tableName := range requests { - dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchGetItemPages").Inc() + a.metrics.dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchGetItemPages").Inc() } continue } @@ -792,21 +746,21 @@ func (b dynamoDBReadRequest) TakeReqs(from dynamoDBReadRequest, max int) { } } -func withErrorHandler(tableName, operation string) func(req *request.Request) { +func withErrorHandler(tableName, operation string, metrics *dynamoDBMetrics) func(req *request.Request) { return func(req *request.Request) { req.Handlers.CompleteAttempt.PushBack(func(req *request.Request) { if req.Error != nil { - recordDynamoError(tableName, req.Error, operation) + recordDynamoError(tableName, req.Error, operation, metrics) } }) } } -func recordDynamoError(tableName string, err error, operation string) { +func recordDynamoError(tableName string, err error, operation string, metrics *dynamoDBMetrics) { if awsErr, ok := err.(awserr.Error); ok { - dynamoFailures.WithLabelValues(tableName, awsErr.Code(), operation).Add(float64(1)) + metrics.dynamoFailures.WithLabelValues(tableName, awsErr.Code(), operation).Add(float64(1)) } else { - dynamoFailures.WithLabelValues(tableName, otherError, operation).Add(float64(1)) + metrics.dynamoFailures.WithLabelValues(tableName, otherError, operation).Add(float64(1)) } } diff --git a/pkg/chunk/aws/dynamodb_table_client.go b/pkg/chunk/aws/dynamodb_table_client.go index 8d8571f7adf..a2ab3b8096b 100644 --- a/pkg/chunk/aws/dynamodb_table_client.go +++ b/pkg/chunk/aws/dynamodb_table_client.go @@ -10,6 +10,7 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/weaveworks/common/instrument" "golang.org/x/time/rate" @@ -35,10 +36,11 @@ type dynamoTableClient struct { DynamoDB dynamodbiface.DynamoDBAPI callManager callManager autoscale autoscale + metrics *dynamoDBMetrics } // NewDynamoDBTableClient makes a new DynamoTableClient. 
-func NewDynamoDBTableClient(cfg DynamoDBConfig) (chunk.TableClient, error) { +func NewDynamoDBTableClient(cfg DynamoDBConfig, reg prometheus.Registerer) (chunk.TableClient, error) { dynamoDB, err := dynamoClientFromURL(cfg.DynamoDB.URL) if err != nil { return nil, err @@ -51,7 +53,7 @@ func NewDynamoDBTableClient(cfg DynamoDBConfig) (chunk.TableClient, error) { var autoscale autoscale if cfg.Metrics.URL != "" { - autoscale, err = newMetrics(cfg) + autoscale, err = newMetricsAutoScaling(cfg) if err != nil { return nil, err } @@ -61,6 +63,7 @@ func NewDynamoDBTableClient(cfg DynamoDBConfig) (chunk.TableClient, error) { DynamoDB: dynamoDB, callManager: callManager, autoscale: autoscale, + metrics: newMetrics(reg), }, nil } @@ -95,7 +98,7 @@ func (d callManager) backoffAndRetry(ctx context.Context, fn func(context.Contex func (d dynamoTableClient) ListTables(ctx context.Context) ([]string, error) { table := []string{} err := d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.ListTablesPages", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.ListTablesPages", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { return d.DynamoDB.ListTablesPagesWithContext(ctx, &dynamodb.ListTablesInput{}, func(resp *dynamodb.ListTablesOutput, _ bool) bool { for _, s := range resp.TableNames { table = append(table, *s) @@ -121,7 +124,7 @@ func chunkTagsToDynamoDB(ts chunk.Tags) []*dynamodb.Tag { func (d dynamoTableClient) CreateTable(ctx context.Context, desc chunk.TableDesc) error { var tableARN *string if err := d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.CreateTable", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.CreateTable", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { input := &dynamodb.CreateTableInput{ TableName: aws.String(desc.Name), AttributeDefinitions: []*dynamodb.AttributeDefinition{ @@ -179,7 +182,7 @@ func (d dynamoTableClient) CreateTable(ctx context.Context, desc chunk.TableDesc tags := chunkTagsToDynamoDB(desc.Tags) if len(tags) > 0 { return d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.TagResource", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.TagResource", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { _, err := d.DynamoDB.TagResourceWithContext(ctx, &dynamodb.TagResourceInput{ ResourceArn: tableARN, Tags: tags, @@ -196,7 +199,7 @@ func (d dynamoTableClient) CreateTable(ctx context.Context, desc chunk.TableDesc func (d dynamoTableClient) DeleteTable(ctx context.Context, name string) error { if err := d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.DeleteTable", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.DeleteTable", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { input := &dynamodb.DeleteTableInput{TableName: aws.String(name)} _, err := d.DynamoDB.DeleteTableWithContext(ctx, input) if err != nil { @@ -215,7 +218,7 @@ func (d dynamoTableClient) 
DeleteTable(ctx context.Context, name string) error { func (d dynamoTableClient) DescribeTable(ctx context.Context, name string) (desc chunk.TableDesc, isActive bool, err error) { var tableARN *string err = d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.DescribeTable", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.DescribeTable", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { out, err := d.DynamoDB.DescribeTableWithContext(ctx, &dynamodb.DescribeTableInput{ TableName: aws.String(name), }) @@ -248,7 +251,7 @@ func (d dynamoTableClient) DescribeTable(ctx context.Context, name string) (desc } err = d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.ListTagsOfResource", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.ListTagsOfResource", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { out, err := d.DynamoDB.ListTagsOfResourceWithContext(ctx, &dynamodb.ListTagsOfResourceInput{ ResourceArn: tableARN, }) @@ -300,7 +303,7 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected ch !expected.UseOnDemandIOMode { level.Info(util.Logger).Log("msg", "updating provisioned throughput on table", "table", expected.Name, "old_read", current.ProvisionedRead, "old_write", current.ProvisionedWrite, "new_read", expected.ProvisionedRead, "new_write", expected.ProvisionedWrite) if err := d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.UpdateTable", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.UpdateTable", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { var dynamoBillingMode string updateTableInput := &dynamodb.UpdateTableInput{TableName: aws.String(expected.Name), ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ @@ -320,7 +323,7 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected ch return err }) }); err != nil { - recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable") + recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable", d.metrics) if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "LimitExceededException" { level.Warn(util.Logger).Log("msg", "update limit exceeded", "err", err) } else { @@ -331,14 +334,14 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected ch // moved the enabling of OnDemand mode to it's own block to reduce complexities & interactions with the various // settings used in provisioned mode. Unfortunately the boilerplate wrappers for retry and tracking needed to be copied. 
if err := d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.UpdateTable", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.UpdateTable", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { level.Info(util.Logger).Log("msg", "updating billing mode on table", "table", expected.Name, "old_mode", current.UseOnDemandIOMode, "new_mode", expected.UseOnDemandIOMode) updateTableInput := &dynamodb.UpdateTableInput{TableName: aws.String(expected.Name), BillingMode: aws.String(dynamodb.BillingModePayPerRequest)} _, err := d.DynamoDB.UpdateTableWithContext(ctx, updateTableInput) return err }) }); err != nil { - recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable") + recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable", d.metrics) if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "LimitExceededException" { level.Warn(util.Logger).Log("msg", "update limit exceeded", "err", err) } else { @@ -350,7 +353,7 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected ch if !current.Tags.Equals(expected.Tags) { var tableARN *string if err := d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.DescribeTable", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.DescribeTable", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { out, err := d.DynamoDB.DescribeTableWithContext(ctx, &dynamodb.DescribeTableInput{ TableName: aws.String(expected.Name), }) @@ -367,7 +370,7 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected ch } return d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.TagResource", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.TagResource", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { _, err := d.DynamoDB.TagResourceWithContext(ctx, &dynamodb.TagResourceInput{ ResourceArn: tableARN, Tags: chunkTagsToDynamoDB(expected.Tags), diff --git a/pkg/chunk/aws/fixtures.go b/pkg/chunk/aws/fixtures.go index 0d23d9ee31e..24ab8cde736 100644 --- a/pkg/chunk/aws/fixtures.go +++ b/pkg/chunk/aws/fixtures.go @@ -35,12 +35,14 @@ var Fixtures = []testutils.Fixture{ dynamoDB := newMockDynamoDB(0, 0) table := &dynamoTableClient{ DynamoDB: dynamoDB, + metrics: newMetrics(nil), } index := &dynamoDBStorageClient{ DynamoDB: dynamoDB, batchGetItemRequestFn: dynamoDB.batchGetItemRequest, batchWriteItemRequestFn: dynamoDB.batchWriteItemRequest, schemaCfg: schemaConfig, + metrics: newMetrics(nil), } object := objectclient.NewClient(&S3ObjectClient{ S3: newMockS3(), @@ -68,6 +70,7 @@ func dynamoDBFixture(provisionedErr, gangsize, maxParallelism int) testutils.Fix schemaCfg := testutils.DefaultSchemaConfig("aws") table := &dynamoTableClient{ DynamoDB: dynamoDB, + metrics: newMetrics(nil), } storage := &dynamoDBStorageClient{ cfg: DynamoDBConfig{ @@ -84,6 +87,7 @@ func dynamoDBFixture(provisionedErr, gangsize, maxParallelism int) testutils.Fix batchGetItemRequestFn: dynamoDB.batchGetItemRequest, batchWriteItemRequestFn: dynamoDB.batchWriteItemRequest, schemaCfg: schemaCfg, + metrics: newMetrics(nil), } return storage, storage, 
table, schemaCfg, testutils.CloserFunc(func() error { table.Stop() diff --git a/pkg/chunk/aws/metrics_autoscaling.go b/pkg/chunk/aws/metrics_autoscaling.go index 3df859cbdcf..fea098c8233 100644 --- a/pkg/chunk/aws/metrics_autoscaling.go +++ b/pkg/chunk/aws/metrics_autoscaling.go @@ -78,7 +78,7 @@ type metricsData struct { readErrorRates map[string]float64 } -func newMetrics(cfg DynamoDBConfig) (*metricsData, error) { +func newMetricsAutoScaling(cfg DynamoDBConfig) (*metricsData, error) { client, err := promApi.NewClient(promApi.Config{Address: cfg.Metrics.URL}) if err != nil { return nil, err diff --git a/pkg/chunk/aws/metrics_autoscaling_test.go b/pkg/chunk/aws/metrics_autoscaling_test.go index 3ebf8745694..992024e2776 100644 --- a/pkg/chunk/aws/metrics_autoscaling_test.go +++ b/pkg/chunk/aws/metrics_autoscaling_test.go @@ -139,6 +139,7 @@ func TestTableManagerMetricsAutoScaling(t *testing.T) { }, tableLastUpdated: make(map[string]time.Time), }, + metrics: newMetrics(nil), } indexWriteScale := fixtureWriteScale() @@ -299,6 +300,7 @@ func TestTableManagerMetricsReadAutoScaling(t *testing.T) { tableLastUpdated: make(map[string]time.Time), tableReadLastUpdated: make(map[string]time.Time), }, + metrics: newMetrics(nil), } indexReadScale := fixtureReadScale() diff --git a/pkg/chunk/cassandra/fixtures.go b/pkg/chunk/cassandra/fixtures.go index feb2df25219..b55cb16af5f 100644 --- a/pkg/chunk/cassandra/fixtures.go +++ b/pkg/chunk/cassandra/fixtures.go @@ -35,17 +35,17 @@ func (f *fixture) Clients() (chunk.IndexClient, chunk.Client, chunk.TableClient, // Get a SchemaConfig with the defaults. schemaConfig := testutils.DefaultSchemaConfig("cassandra") - storageClient, err := NewStorageClient(cfg, schemaConfig) + storageClient, err := NewStorageClient(cfg, schemaConfig, nil) if err != nil { return nil, nil, nil, schemaConfig, nil, err } - objectClient, err := NewObjectClient(cfg, schemaConfig) + objectClient, err := NewObjectClient(cfg, schemaConfig, nil) if err != nil { return nil, nil, nil, schemaConfig, nil, err } - tableClient, err := NewTableClient(context.Background(), cfg) + tableClient, err := NewTableClient(context.Background(), cfg, nil) if err != nil { return nil, nil, nil, schemaConfig, nil, err } diff --git a/pkg/chunk/cassandra/storage_client.go b/pkg/chunk/cassandra/storage_client.go index c4a0fa56f29..f6fe99a449b 100644 --- a/pkg/chunk/cassandra/storage_client.go +++ b/pkg/chunk/cassandra/storage_client.go @@ -89,7 +89,7 @@ func (cfg *Config) Validate() error { return nil } -func (cfg *Config) session(name string) (*gocql.Session, error) { +func (cfg *Config) session(name string, reg prometheus.Registerer) (*gocql.Session, error) { consistency, err := gocql.ParseConsistencyWrapper(cfg.Consistency) if err != nil { return nil, errors.WithStack(err) @@ -107,7 +107,7 @@ func (cfg *Config) session(name string) (*gocql.Session, error) { cluster.NumConns = cfg.NumConnections cluster.Logger = log.With(pkgutil.Logger, "module", "gocql", "client", name) cluster.Registerer = prometheus.WrapRegistererWith( - prometheus.Labels{"client": name}, prometheus.DefaultRegisterer) + prometheus.Labels{"client": name}, reg) if cfg.Retries > 0 { cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{ NumRetries: cfg.Retries, @@ -222,15 +222,15 @@ type StorageClient struct { } // NewStorageClient returns a new StorageClient. 
-func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (*StorageClient, error) { +func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (*StorageClient, error) { pkgutil.WarnExperimentalUse("Cassandra Backend") - readSession, err := cfg.session("index-read") + readSession, err := cfg.session("index-read", registerer) if err != nil { return nil, errors.WithStack(err) } - writeSession, err := cfg.session("index-write") + writeSession, err := cfg.session("index-write", registerer) if err != nil { return nil, errors.WithStack(err) } @@ -407,15 +407,15 @@ type ObjectClient struct { } // NewObjectClient returns a new ObjectClient. -func NewObjectClient(cfg Config, schemaCfg chunk.SchemaConfig) (*ObjectClient, error) { +func NewObjectClient(cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (*ObjectClient, error) { pkgutil.WarnExperimentalUse("Cassandra Backend") - readSession, err := cfg.session("chunks-read") + readSession, err := cfg.session("chunks-read", registerer) if err != nil { return nil, errors.WithStack(err) } - writeSession, err := cfg.session("chunks-write") + writeSession, err := cfg.session("chunks-write", registerer) if err != nil { return nil, errors.WithStack(err) } diff --git a/pkg/chunk/cassandra/table_client.go b/pkg/chunk/cassandra/table_client.go index ee242e354c7..fc269e26409 100644 --- a/pkg/chunk/cassandra/table_client.go +++ b/pkg/chunk/cassandra/table_client.go @@ -6,6 +6,7 @@ import ( "github.com/gocql/gocql" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/cortexproject/cortex/pkg/chunk" ) @@ -16,8 +17,8 @@ type tableClient struct { } // NewTableClient returns a new TableClient. -func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error) { - session, err := cfg.session("table-manager") +func NewTableClient(ctx context.Context, cfg Config, registerer prometheus.Registerer) (chunk.TableClient, error) { + session, err := cfg.session("table-manager", registerer) if err != nil { return nil, errors.WithStack(err) } diff --git a/pkg/chunk/schema_config.go b/pkg/chunk/schema_config.go index 2ffc73714ad..3b268f87d26 100644 --- a/pkg/chunk/schema_config.go +++ b/pkg/chunk/schema_config.go @@ -49,7 +49,7 @@ type DayTime struct { // MarshalYAML implements yaml.Marshaller. func (d DayTime) MarshalYAML() (interface{}, error) { - return d.Time.Time().Format("2006-01-02"), nil + return d.String(), nil } // UnmarshalYAML implements yaml.Unmarshaller. 
@@ -66,6 +66,10 @@ func (d *DayTime) UnmarshalYAML(unmarshal func(interface{}) error) error { return nil } +func (d *DayTime) String() string { + return d.Time.Time().Format("2006-01-02") +} + // SchemaConfig contains the config for our chunk index schemas type SchemaConfig struct { Configs []PeriodConfig `yaml:"configs"` diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index 8d87a7d34f3..1629ead9247 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -153,7 +153,10 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf stores := chunk.NewCompositeStore(cacheGenNumLoader) for _, s := range schemaCfg.Configs { - index, err := NewIndexClient(s.IndexType, cfg, schemaCfg) + indexClientReg := prometheus.WrapRegistererWith( + prometheus.Labels{"component": "index-store-" + s.From.String()}, reg) + + index, err := NewIndexClient(s.IndexType, cfg, schemaCfg, indexClientReg) if err != nil { return nil, errors.Wrap(err, "error creating index client") } @@ -163,7 +166,11 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf if objectStoreType == "" { objectStoreType = s.IndexType } - chunks, err := NewChunkClient(objectStoreType, cfg, schemaCfg) + + chunkClientReg := prometheus.WrapRegistererWith( + prometheus.Labels{"component": "chunk-store-" + s.From.String()}, reg) + + chunks, err := NewChunkClient(objectStoreType, cfg, schemaCfg, chunkClientReg) if err != nil { return nil, errors.Wrap(err, "error creating object client") } @@ -180,7 +187,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf } // NewIndexClient makes a new index client of the desired type. -func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) { +func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (chunk.IndexClient, error) { if indexClientFactory, ok := customIndexStores[name]; ok { if indexClientFactory.indexClientFactoryFunc != nil { return indexClientFactory.indexClientFactoryFunc() @@ -199,7 +206,7 @@ func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chun if len(path) > 0 { level.Warn(util.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path) } - return aws.NewDynamoDBIndexClient(cfg.AWSStorageConfig.DynamoDBConfig, schemaCfg) + return aws.NewDynamoDBIndexClient(cfg.AWSStorageConfig.DynamoDBConfig, schemaCfg, registerer) case "gcp": return gcp.NewStorageClientV1(context.Background(), cfg.GCPStorageConfig, schemaCfg) case "gcp-columnkey", "bigtable": @@ -208,7 +215,7 @@ func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chun cfg.GCPStorageConfig.DistributeKeys = true return gcp.NewStorageClientColumnKey(context.Background(), cfg.GCPStorageConfig, schemaCfg) case "cassandra": - return cassandra.NewStorageClient(cfg.CassandraStorageConfig, schemaCfg) + return cassandra.NewStorageClient(cfg.CassandraStorageConfig, schemaCfg, registerer) case "boltdb": return local.NewBoltDBIndexClient(cfg.BoltDBConfig) case "grpc-store": @@ -219,7 +226,7 @@ func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chun } // NewChunkClient makes a new chunk.Client of the desired types. 
-func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.Client, error) { +func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (chunk.Client, error) { switch name { case "inmemory": return chunk.NewMockStorage(), nil @@ -233,7 +240,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chun if len(path) > 0 { level.Warn(util.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path) } - return aws.NewDynamoDBChunkClient(cfg.AWSStorageConfig.DynamoDBConfig, schemaCfg) + return aws.NewDynamoDBChunkClient(cfg.AWSStorageConfig.DynamoDBConfig, schemaCfg, registerer) case "azure": return newChunkClientFromStore(azure.NewBlobStorage(&cfg.AzureStorageConfig, chunk.DirDelim)) case "gcp": @@ -245,7 +252,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chun case "swift": return newChunkClientFromStore(openstack.NewSwiftObjectClient(cfg.Swift, chunk.DirDelim)) case "cassandra": - return cassandra.NewObjectClient(cfg.CassandraStorageConfig, schemaCfg) + return cassandra.NewObjectClient(cfg.CassandraStorageConfig, schemaCfg, registerer) case "filesystem": store, err := local.NewFSObjectClient(cfg.FSConfig) if err != nil { @@ -267,7 +274,7 @@ func newChunkClientFromStore(store chunk.ObjectClient, err error) (chunk.Client, } // NewTableClient makes a new table client based on the configuration. -func NewTableClient(name string, cfg Config) (chunk.TableClient, error) { +func NewTableClient(name string, cfg Config, registerer prometheus.Registerer) (chunk.TableClient, error) { if indexClientFactory, ok := customIndexStores[name]; ok { if indexClientFactory.tableClientFactoryFunc != nil { return indexClientFactory.tableClientFactoryFunc() @@ -285,11 +292,11 @@ func NewTableClient(name string, cfg Config) (chunk.TableClient, error) { if len(path) > 0 { level.Warn(util.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path) } - return aws.NewDynamoDBTableClient(cfg.AWSStorageConfig.DynamoDBConfig) + return aws.NewDynamoDBTableClient(cfg.AWSStorageConfig.DynamoDBConfig, registerer) case "gcp", "gcp-columnkey", "bigtable", "bigtable-hashed": return gcp.NewTableClient(context.Background(), cfg.GCPStorageConfig) case "cassandra": - return cassandra.NewTableClient(context.Background(), cfg.CassandraStorageConfig) + return cassandra.NewTableClient(context.Background(), cfg.CassandraStorageConfig, registerer) case "boltdb": return local.NewTableClient(cfg.BoltDBConfig.Directory) case "grpc-store": diff --git a/pkg/chunk/storage/factory_test.go b/pkg/chunk/storage/factory_test.go index 0b16dc7b2a2..e2b09c0e0a2 100644 --- a/pkg/chunk/storage/factory_test.go +++ b/pkg/chunk/storage/factory_test.go @@ -5,11 +5,14 @@ import ( "os" "reflect" "testing" + "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/cassandra" "github.com/cortexproject/cortex/pkg/chunk/local" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/validation" @@ -135,7 +138,7 @@ func TestCustomIndexClient(t *testing.T) { RegisterIndexStore(tc.indexClientName, tc.indexClientFactories.indexClientFactoryFunc, tc.indexClientFactories.tableClientFactoryFunc) } - indexClient, err := NewIndexClient(tc.indexClientName, cfg, schemaCfg) + indexClient, err := NewIndexClient(tc.indexClientName, cfg, schemaCfg, 
nil) if tc.errorExpected { require.Error(t, err) } else { @@ -143,7 +146,7 @@ func TestCustomIndexClient(t *testing.T) { require.Equal(t, tc.expectedIndexClientType, reflect.TypeOf(indexClient)) } - tableClient, err := NewTableClient(tc.indexClientName, cfg) + tableClient, err := NewTableClient(tc.indexClientName, cfg, nil) if tc.errorExpected { require.Error(t, err) } else { @@ -154,6 +157,45 @@ func TestCustomIndexClient(t *testing.T) { } } +func TestCassandraInMultipleSchemas(t *testing.T) { + addresses := os.Getenv("CASSANDRA_TEST_ADDRESSES") + if addresses == "" { + return + } + + // cassandra config + var cassandraCfg cassandra.Config + flagext.DefaultValues(&cassandraCfg) + cassandraCfg.Addresses = addresses + cassandraCfg.Keyspace = "test" + cassandraCfg.Consistency = "QUORUM" + cassandraCfg.ReplicationFactor = 1 + + // build schema with cassandra in multiple periodic configs + schemaCfg := chunk.DefaultSchemaConfig("cassandra", "v1", model.Now().Add(-7*24*time.Hour)) + newSchemaCfg := schemaCfg.Configs[0] + newSchemaCfg.Schema = "v2" + newSchemaCfg.From = chunk.DayTime{Time: model.Now()} + + schemaCfg.Configs = append(schemaCfg.Configs, newSchemaCfg) + + var ( + cfg Config + storeConfig chunk.StoreConfig + defaults validation.Limits + ) + flagext.DefaultValues(&cfg, &storeConfig, &defaults) + cfg.CassandraStorageConfig = cassandraCfg + + limits, err := validation.NewOverrides(defaults, nil) + require.NoError(t, err) + + store, err := NewStore(cfg, storeConfig, schemaCfg, limits, prometheus.NewRegistry(), nil) + require.NoError(t, err) + + store.Stop() +} + // useful for cleaning up state after tests func unregisterAllCustomIndexStores() { customIndexStores = map[string]indexStoreFactories{} diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index ad2dbc8dbc8..26be5bc979d 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -329,7 +329,9 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) { } var indexClient chunk.IndexClient - indexClient, err = storage.NewIndexClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage, t.Cfg.Schema) + reg := prometheus.WrapRegistererWith( + prometheus.Labels{"component": DeleteRequestsStore}, prometheus.DefaultRegisterer) + indexClient, err = storage.NewIndexClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage, t.Cfg.Schema, reg) if err != nil { return } @@ -425,7 +427,10 @@ func (t *Cortex) initTableManager() (services.Service, error) { os.Exit(1) } - tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.Storage) + reg := prometheus.WrapRegistererWith( + prometheus.Labels{"component": "table-manager-store"}, prometheus.DefaultRegisterer) + + tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.Storage, reg) if err != nil { return nil, err } @@ -435,7 +440,10 @@ func (t *Cortex) initTableManager() (services.Service, error) { var extraTables []chunk.ExtraTables if t.Cfg.PurgerConfig.Enable { - deleteStoreTableClient, err := storage.NewTableClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage) + reg := prometheus.WrapRegistererWith( + prometheus.Labels{"component": "table-manager-" + DeleteRequestsStore}, prometheus.DefaultRegisterer) + + deleteStoreTableClient, err := storage.NewTableClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage, reg) if err != nil { return nil, err }
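
For reference, the pattern this diff applies across the chunk, index, and delete store clients is: metrics move off package-level globals onto per-client structs, are registered through `promauto.With(reg)`, and every caller wraps its registerer with a distinct `component` label via `prometheus.WrapRegistererWith`. The sketch below is a minimal, self-contained illustration of why that avoids duplicate-registration collisions when the same backend (e.g. Cassandra) serves both the index and delete stores; the struct, metric name, and component values here are illustrative, not the ones Cortex actually exposes.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// clientMetrics holds per-instance metrics instead of package-level globals,
// so two store clients of the same type can coexist in one process.
type clientMetrics struct {
	requests *prometheus.CounterVec
}

// newClientMetrics registers against whichever registerer is passed in.
// promauto.With(nil) skips registration entirely, which is what the fixtures
// and tests in this diff rely on when they pass nil.
func newClientMetrics(reg prometheus.Registerer) *clientMetrics {
	return &clientMetrics{
		requests: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Namespace: "cortex",
			Name:      "store_requests_total", // hypothetical metric name, for illustration only
			Help:      "Total requests issued by this store client.",
		}, []string{"operation"}),
	}
}

func main() {
	base := prometheus.NewRegistry()

	// Each client gets its own registerer wrapped with a distinct `component`
	// label, mirroring what NewStore and initDeleteRequestsStore do in the diff.
	indexReg := prometheus.WrapRegistererWith(prometheus.Labels{"component": "index-store"}, base)
	deleteReg := prometheus.WrapRegistererWith(prometheus.Labels{"component": "delete-store"}, base)

	// Both registrations succeed: the metric name and label dimensions match,
	// but the constant `component` label value differs per client.
	indexMetrics := newClientMetrics(indexReg)
	deleteMetrics := newClientMetrics(deleteReg)

	indexMetrics.requests.WithLabelValues("QueryPages").Inc()
	deleteMetrics.requests.WithLabelValues("QueryPages").Inc()

	mfs, err := base.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Printf("%s: %d series\n", mf.GetName(), len(mf.GetMetric()))
	}
	// Output: cortex_store_requests_total: 2 series
}
```

With the previous package-level registrations against the default registerer, constructing a second client for the same backend could attempt to register identical collectors and panic, which is the kind of collision behind the Cassandra fix recorded in the #2774 changelog entries.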