diff --git a/CHANGELOG.md b/CHANGELOG.md index 58323836fdf..054b58ef504 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ * [ENHANCEMENT] Added `cortex_alertmanager_config_hash` metric to expose hash of Alertmanager Config loaded per user. #3388 * [ENHANCEMENT] Query-Frontend / Query-Scheduler: New component called "Query-Scheduler" has been introduced. Query-Scheduler is simply a queue of requests, moved outside of Query-Frontend. This allows Query-Frontend to be scaled separately from number of queues. To make Query-Frontend and Querier use Query-Scheduler, they need to be started with `-frontend.scheduler-address` and `-querier.scheduler-address` options respectively. #3374 #3471 * [ENHANCEMENT] Query-frontend / Querier / Ruler: added `-querier.max-query-lookback` to limit how long back data (series and metadata) can be queried. This setting can be overridden on a per-tenant basis and is enforced in the query-frontend, querier and ruler. #3452 #3458 -* [ENHANCEMENT] Querier: added `-querier.query-store-for-labels-enabled` to query store for series API. Only works with blocks storage engine. #3461 +* [ENHANCEMENT] Querier: added `-querier.query-store-for-labels-enabled` to query store for label names, label values and series APIs. Only works with blocks storage engine. #3461 #3520 * [ENHANCEMENT] Ingester: exposed `-blocks-storage.tsdb.wal-segment-size-bytes` config option to customise the TSDB WAL segment max size. #3476 * [ENHANCEMENT] Compactor: concurrently run blocks cleaner for multiple tenants. Concurrency can be configured via `-compactor.cleanup-concurrency`. #3483 * [ENHANCEMENT] Compactor: shuffle tenants before running compaction. #3483 diff --git a/Makefile b/Makefile index 5fda26f3279..2d6db7c32ed 100644 --- a/Makefile +++ b/Makefile @@ -87,6 +87,7 @@ pkg/ruler/rules/rules.pb.go: pkg/ruler/rules/rules.proto pkg/ruler/ruler.pb.go: pkg/ruler/rules/rules.proto pkg/ring/kv/memberlist/kv.pb.go: pkg/ring/kv/memberlist/kv.proto pkg/scheduler/schedulerpb/scheduler.pb.go: pkg/scheduler/schedulerpb/scheduler.proto +pkg/storegateway/storegatewaypb/gateway.pb.go: pkg/storegateway/storegatewaypb/gateway.proto pkg/chunk/grpc/grpc.pb.go: pkg/chunk/grpc/grpc.proto tools/blocksconvert/scheduler.pb.go: tools/blocksconvert/scheduler.proto diff --git a/docs/api/_index.md b/docs/api/_index.md index 9ce6ba83ee4..78719f77027 100644 --- a/docs/api/_index.md +++ b/docs/api/_index.md @@ -294,7 +294,7 @@ GET,POST /api/v1/labels GET,POST /api/v1/labels ``` -Get label names of ingested series. Differently than Prometheus and due to scalability and performances reasons, Cortex currently ignores the `start` and `end` request parameters and always fetches the label names from in-memory data stored in the ingesters. +Get label names of ingested series. Differently than Prometheus and due to scalability and performances reasons, Cortex currently ignores the `start` and `end` request parameters and always fetches the label names from in-memory data stored in the ingesters. There is experimental support to query the long-term store with the *blocks* storage engine when `-querier.query-store-for-labels-enabled` is set. _For more information, please check out the Prometheus [get label names](https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names) documentation._ @@ -309,7 +309,7 @@ GET /api/v1/label/{name}/values GET /api/v1/label/{name}/values ``` -Get label values for a given label name. Differently than Prometheus and due to scalability and performances reasons, Cortex currently ignores the `start` and `end` request parameters and always fetches the label values from in-memory data stored in the ingesters. +Get label values for a given label name. Differently than Prometheus and due to scalability and performances reasons, Cortex currently ignores the `start` and `end` request parameters and always fetches the label values from in-memory data stored in the ingesters. There is experimental support to query the long-term store with the *blocks* storage engine when `-querier.query-store-for-labels-enabled` is set. _For more information, please check out the Prometheus [get label values](https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values) documentation._ diff --git a/integration/querier_test.go b/integration/querier_test.go index 1ea7fb7afe5..f177264a0d1 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -536,10 +536,10 @@ func testMetadataQueriesWithBlocksStorage( labelValuesTests: []labelValuesTest{ { label: labels.MetricName, - resp: []string{lastSeriesInIngesterBlocksName, firstSeriesInIngesterHeadName}, + resp: []string{lastSeriesInStorageName, lastSeriesInIngesterBlocksName, firstSeriesInIngesterHeadName}, }, }, - labelNames: []string{labels.MetricName, lastSeriesInIngesterBlocksName, firstSeriesInIngesterHeadName}, + labelNames: []string{labels.MetricName, lastSeriesInStorageName, lastSeriesInIngesterBlocksName, firstSeriesInIngesterHeadName}, }, "query metadata entirely outside the ingester range should return the head data as well": { from: lastSeriesInStorageTs.Add(-2 * blockRangePeriod), @@ -563,10 +563,10 @@ func testMetadataQueriesWithBlocksStorage( labelValuesTests: []labelValuesTest{ { label: labels.MetricName, - resp: []string{firstSeriesInIngesterHeadName}, + resp: []string{lastSeriesInStorageName, firstSeriesInIngesterHeadName}, }, }, - labelNames: []string{labels.MetricName, firstSeriesInIngesterHeadName}, + labelNames: []string{labels.MetricName, lastSeriesInStorageName, firstSeriesInIngesterHeadName}, }, } @@ -590,7 +590,7 @@ func testMetadataQueriesWithBlocksStorage( for _, val := range lvt.resp { exp = append(exp, model.LabelValue(val)) } - require.ElementsMatch(t, exp, labelsRes) + require.Equal(t, exp, labelsRes) } labelNames, err := c.LabelNames(tc.from, tc.to) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index e42fdf5f428..c575851b26a 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -672,6 +672,10 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode for v := range valueSet { values = append(values, v) } + + // We need the values returned to be sorted. + sort.Strings(values) + return values, nil } @@ -704,9 +708,8 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]st for v := range valueSet { values = append(values, v) } - sort.Slice(values, func(i, j int) bool { - return values[i] < values[j] - }) + + sort.Strings(values) return values, nil } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 429e4bd22bb..93341dbd482 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "sort" "strings" "sync" "time" @@ -22,6 +23,7 @@ import ( "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/strutil" "github.com/weaveworks/common/user" "go.uber.org/atomic" "golang.org/x/sync/errgroup" @@ -292,14 +294,73 @@ func (q *blocksStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers .. return q.selectSorted(sp, matchers...) } -func (q *blocksStoreQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { - // Cortex doesn't use this. It will ask ingesters for metadata. - return nil, nil, errors.New("not implemented") +func (q *blocksStoreQuerier) LabelNames() ([]string, storage.Warnings, error) { + spanLog, spanCtx := spanlogger.New(q.ctx, "blocksStoreQuerier.LabelNames") + defer spanLog.Span.Finish() + + minT, maxT := q.minT, q.maxT + + var ( + resMtx sync.Mutex + resNameSets = [][]string{} + resWarnings = storage.Warnings(nil) + ) + + queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) { + nameSets, warnings, queriedBlocks, err := q.fetchLabelNamesFromStore(spanCtx, clients, minT, maxT) + if err != nil { + return nil, err + } + + resMtx.Lock() + resNameSets = append(resNameSets, nameSets...) + resWarnings = append(resWarnings, warnings...) + resMtx.Unlock() + + return queriedBlocks, nil + } + + err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, queryFunc) + if err != nil { + return nil, nil, err + } + + return strutil.MergeSlices(resNameSets...), resWarnings, nil } -func (q *blocksStoreQuerier) LabelNames() ([]string, storage.Warnings, error) { - // Cortex doesn't use this. It will ask ingesters for metadata. - return nil, nil, errors.New("not implemented") +func (q *blocksStoreQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { + spanLog, spanCtx := spanlogger.New(q.ctx, "blocksStoreQuerier.LabelValues") + defer spanLog.Span.Finish() + + minT, maxT := q.minT, q.maxT + + var ( + resValueSets = [][]string{} + resWarnings = storage.Warnings(nil) + + resultMtx sync.Mutex + ) + + queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) { + valueSets, warnings, queriedBlocks, err := q.fetchLabelValuesFromStore(spanCtx, name, clients, minT, maxT) + if err != nil { + return nil, err + } + + resultMtx.Lock() + resValueSets = append(resValueSets, valueSets...) + resWarnings = append(resWarnings, warnings...) + resultMtx.Unlock() + + return queriedBlocks, nil + } + + err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, queryFunc) + if err != nil { + return nil, nil, err + } + + return strutil.MergeSlices(resValueSets...), resWarnings, nil } func (q *blocksStoreQuerier) Close() error { @@ -504,7 +565,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( stream, err := c.Series(gCtx, req) if err != nil { - return errors.Wrapf(err, "failed to fetch series from %s", c) + return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress()) } mySeries := []*storepb.Series(nil) @@ -523,7 +584,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( break } if err != nil { - return errors.Wrapf(err, "failed to receive series from %s", c) + return errors.Wrapf(err, "failed to receive series from %s", c.RemoteAddress()) } // Response may either contain series, warning or hints. @@ -546,7 +607,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if h := resp.GetHints(); h != nil { hints := hintspb.SeriesResponseHints{} if err := types.UnmarshalAny(h, &hints); err != nil { - return errors.Wrapf(err, "failed to unmarshal hints from %s", c) + return errors.Wrapf(err, "failed to unmarshal series hints from %s", c.RemoteAddress()) } ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks) @@ -559,7 +620,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( } level.Debug(spanLog).Log("msg", "received series from store-gateway", - "instance", c, + "instance", c.RemoteAddress(), "num series", len(mySeries), "bytes series", countSeriesBytes(mySeries), "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), @@ -584,6 +645,160 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( return seriesSets, queriedBlocks, warnings, int(numChunks.Load()), nil } +func (q *blocksStoreQuerier) fetchLabelNamesFromStore( + ctx context.Context, + clients map[BlocksStoreClient][]ulid.ULID, + minT int64, + maxT int64, +) ([][]string, storage.Warnings, []ulid.ULID, error) { + var ( + reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID) + g, gCtx = errgroup.WithContext(reqCtx) + mtx = sync.Mutex{} + nameSets = [][]string{} + warnings = storage.Warnings(nil) + queriedBlocks = []ulid.ULID(nil) + spanLog = spanlogger.FromContext(ctx) + ) + + // Concurrently fetch series from all clients. + for c, blockIDs := range clients { + // Change variables scope since it will be used in a goroutine. + c := c + blockIDs := blockIDs + + g.Go(func() error { + req, err := createLabelNamesRequest(minT, maxT, blockIDs) + if err != nil { + return errors.Wrapf(err, "failed to create label names request") + } + + namesResp, err := c.LabelNames(gCtx, req) + if err != nil { + return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress()) + } + + myQueriedBlocks := []ulid.ULID(nil) + if namesResp.Hints != nil { + hints := hintspb.LabelNamesResponseHints{} + if err := types.UnmarshalAny(namesResp.Hints, &hints); err != nil { + return errors.Wrapf(err, "failed to unmarshal label names hints from %s", c.RemoteAddress()) + } + + ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks) + if err != nil { + return errors.Wrapf(err, "failed to parse queried block IDs from received hints") + } + + myQueriedBlocks = ids + } + + level.Debug(spanLog).Log("msg", "received label names from store-gateway", + "instance", c, + "num labels", len(namesResp.Names), + "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), + "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) + + // Store the result. + mtx.Lock() + nameSets = append(nameSets, namesResp.Names) + for _, w := range namesResp.Warnings { + warnings = append(warnings, errors.New(w)) + } + queriedBlocks = append(queriedBlocks, myQueriedBlocks...) + mtx.Unlock() + + return nil + }) + } + + // Wait until all client requests complete. + if err := g.Wait(); err != nil { + return nil, nil, nil, err + } + + return nameSets, warnings, queriedBlocks, nil +} + +func (q *blocksStoreQuerier) fetchLabelValuesFromStore( + ctx context.Context, + name string, + clients map[BlocksStoreClient][]ulid.ULID, + minT int64, + maxT int64, +) ([][]string, storage.Warnings, []ulid.ULID, error) { + var ( + reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID) + g, gCtx = errgroup.WithContext(reqCtx) + mtx = sync.Mutex{} + valueSets = [][]string{} + warnings = storage.Warnings(nil) + queriedBlocks = []ulid.ULID(nil) + spanLog = spanlogger.FromContext(ctx) + ) + + // Concurrently fetch series from all clients. + for c, blockIDs := range clients { + // Change variables scope since it will be used in a goroutine. + c := c + blockIDs := blockIDs + + g.Go(func() error { + req, err := createLabelValuesRequest(minT, maxT, name, blockIDs) + if err != nil { + return errors.Wrapf(err, "failed to create label values request") + } + + valuesResp, err := c.LabelValues(gCtx, req) + if err != nil { + return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress()) + } + + myQueriedBlocks := []ulid.ULID(nil) + if valuesResp.Hints != nil { + hints := hintspb.LabelValuesResponseHints{} + if err := types.UnmarshalAny(valuesResp.Hints, &hints); err != nil { + return errors.Wrapf(err, "failed to unmarshal label values hints from %s", c.RemoteAddress()) + } + + ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks) + if err != nil { + return errors.Wrapf(err, "failed to parse queried block IDs from received hints") + } + + myQueriedBlocks = ids + } + + level.Debug(spanLog).Log("msg", "received label values from store-gateway", + "instance", c.RemoteAddress(), + "num values", len(valuesResp.Values), + "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), + "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) + + // Values returned need not be sorted, but we need them to be sorted so we can merge. + sort.Strings(valuesResp.Values) + + // Store the result. + mtx.Lock() + valueSets = append(valueSets, valuesResp.Values) + for _, w := range valuesResp.Warnings { + warnings = append(warnings, errors.New(w)) + } + queriedBlocks = append(queriedBlocks, myQueriedBlocks...) + mtx.Unlock() + + return nil + }) + } + + // Wait until all client requests complete. + if err := g.Wait(); err != nil { + return nil, nil, nil, err + } + + return valueSets, warnings, queriedBlocks, nil +} + func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skipChunks bool, blockIDs []ulid.ULID) (*storepb.SeriesRequest, error) { // Selectively query only specific blocks. hints := &hintspb.SeriesRequestHints{ @@ -598,7 +813,7 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skip anyHints, err := types.MarshalAny(hints) if err != nil { - return nil, errors.Wrapf(err, "failed to marshal request hints") + return nil, errors.Wrapf(err, "failed to marshal series request hints") } return &storepb.SeriesRequest{ @@ -611,6 +826,61 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skip }, nil } +func createLabelNamesRequest(minT, maxT int64, blockIDs []ulid.ULID) (*storepb.LabelNamesRequest, error) { + req := &storepb.LabelNamesRequest{ + Start: minT, + End: maxT, + } + + // Selectively query only specific blocks. + hints := &hintspb.LabelNamesRequestHints{ + BlockMatchers: []storepb.LabelMatcher{ + { + Type: storepb.LabelMatcher_RE, + Name: block.BlockIDLabel, + Value: strings.Join(convertULIDsToString(blockIDs), "|"), + }, + }, + } + + anyHints, err := types.MarshalAny(hints) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal label names request hints") + } + + req.Hints = anyHints + + return req, nil +} + +func createLabelValuesRequest(minT, maxT int64, label string, blockIDs []ulid.ULID) (*storepb.LabelValuesRequest, error) { + req := &storepb.LabelValuesRequest{ + Start: minT, + End: maxT, + Label: label, + } + + // Selectively query only specific blocks. + hints := &hintspb.LabelValuesRequestHints{ + BlockMatchers: []storepb.LabelMatcher{ + { + Type: storepb.LabelMatcher_RE, + Name: block.BlockIDLabel, + Value: strings.Join(convertULIDsToString(blockIDs), "|"), + }, + }, + } + + anyHints, err := types.MarshalAny(hints) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal label values request hints") + } + + req.Hints = anyHints + + return req, nil +} + func convertULIDsToString(ids []ulid.ULID) []string { res := make([]string, len(ids)) for idx, id := range ids { diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index a4be09c8840..52587ddf9ca 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "sort" "strings" "testing" "time" @@ -98,7 +99,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel}, minT, 1), mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), mockHintsResponse(block1, block2), @@ -123,7 +124,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT, 3), @@ -154,11 +155,11 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel}, minT, 1), mockHintsResponse(block1), }}: {block1}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), mockHintsResponse(block2), }}: {block2}, @@ -182,11 +183,11 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), mockHintsResponse(block1), }}: {block1}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel}, minT, 1), mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), mockHintsResponse(block2), @@ -211,17 +212,17 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT, 1), mockHintsResponse(block1), }}: {block1}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), mockHintsResponse(block2), }}: {block2}, - &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT, 1), mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT+1, 3), mockHintsResponse(block3), @@ -280,7 +281,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), mockHintsResponse(block1), @@ -302,11 +303,11 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), mockHintsResponse(block1), }}: {block1}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), mockHintsResponse(block2), }}: {block2}, @@ -327,25 +328,25 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), mockHintsResponse(block1), }}: {block1, block3}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT, 2), mockHintsResponse(block2), }}: {block2, block4}, }, // Second attempt returns 1 missing block. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), mockHintsResponse(block3), }}: {block3, block4}, }, // Third attempt returns the last missing block. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "4.4.4.4", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "4.4.4.4", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT+1, 3), mockHintsResponse(block4), }}: {block4}, @@ -402,7 +403,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), mockHintsResponse(block1, block2), @@ -427,7 +428,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), mockHintsResponse(block1, block2), @@ -447,25 +448,25 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), mockHintsResponse(block1), }}: {block1, block3}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT, 2), mockHintsResponse(block2), }}: {block2, block4}, }, // Second attempt returns 1 missing block. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), mockHintsResponse(block3), }}: {block3, block4}, }, // Third attempt returns the last missing block. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "4.4.4.4", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "4.4.4.4", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT+1, 3), mockHintsResponse(block4), }}: {block4}, @@ -544,6 +545,474 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { } } +func TestBlocksStoreQuerier_Labels(t *testing.T) { + const ( + metricName = "test_metric" + minT = int64(10) + maxT = int64(20) + ) + + var ( + block1 = ulid.MustNew(1, nil) + block2 = ulid.MustNew(2, nil) + block3 = ulid.MustNew(3, nil) + block4 = ulid.MustNew(4, nil) + series1 = labels.FromMap(map[string]string{ + labels.MetricName: metricName + "_1", + "series1": "1", + }) + series2 = labels.FromMap(map[string]string{ + labels.MetricName: metricName + "_2", + "series2": "1", + }) + ) + + tests := map[string]struct { + finderResult []*BlockMeta + finderErr error + storeSetResponses []interface{} + expectedLabelNames []string + expectedLabelValues []string // For __name__ + expectedErr string + expectedMetrics string + }{ + "no block in the storage matching the query time range": { + finderResult: nil, + expectedErr: "", + }, + "error while finding blocks matching the query time range": { + finderErr: errors.New("unable to find blocks"), + expectedErr: "unable to find blocks", + }, + "error while getting clients to query the store-gateway": { + finderResult: []*BlockMeta{ + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + }, + storeSetResponses: []interface{}{ + errors.New("no client found"), + }, + expectedErr: "no client found", + }, + "a single store-gateway instance holds the required blocks": { + finderResult: []*BlockMeta{ + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1, series2), + Warnings: []string{}, + Hints: mockNamesHints(block1, block2), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1, series2), + Warnings: []string{}, + Hints: mockValuesHints(block1, block2), + }, + }: {block1, block2}, + }, + }, + expectedLabelNames: namesFromSeries(series1, series2), + expectedLabelValues: valuesFromSeries(labels.MetricName, series1, series2), + }, + "multiple store-gateway instances holds the required blocks without overlapping series": { + finderResult: []*BlockMeta{ + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1), + Warnings: []string{}, + Hints: mockValuesHints(block1), + }, + }: {block1}, + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series2), + Warnings: []string{}, + Hints: mockNamesHints(block2), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series2), + Warnings: []string{}, + Hints: mockValuesHints(block2), + }, + }: {block2}, + }, + }, + expectedLabelNames: namesFromSeries(series1, series2), + expectedLabelValues: valuesFromSeries(labels.MetricName, series1, series2), + }, + "multiple store-gateway instances holds the required blocks with overlapping series (single returned series)": { + finderResult: []*BlockMeta{ + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1), + Warnings: []string{}, + Hints: mockValuesHints(block1), + }, + }: {block1}, + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block2), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1), + Warnings: []string{}, + Hints: mockValuesHints(block2), + }, + }: {block2}, + }, + }, + expectedLabelNames: namesFromSeries(series1), + expectedLabelValues: valuesFromSeries(labels.MetricName, series1), + }, + "multiple store-gateway instances holds the required blocks with overlapping series (multiple returned series)": { + finderResult: []*BlockMeta{ + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + }, + // Block1 has series1 and series2 + // Block2 has only series1 + // Block3 has only series2 + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1, series2), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1, series2), + Warnings: []string{}, + Hints: mockValuesHints(block1), + }, + }: {block1}, + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block2), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1), + Warnings: []string{}, + Hints: mockValuesHints(block2), + }, + }: {block2}, + &storeGatewayClientMock{ + remoteAddr: "3.3.3.3", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series2), + Warnings: []string{}, + Hints: mockNamesHints(block3), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series2), + Warnings: []string{}, + Hints: mockValuesHints(block3), + }, + }: {block3}, + }, + }, + expectedLabelNames: namesFromSeries(series1, series2), + expectedLabelValues: valuesFromSeries(labels.MetricName, series1, series2), + expectedMetrics: ` + # HELP cortex_querier_storegateway_instances_hit_per_query Number of store-gateway instances hit for a single query. + # TYPE cortex_querier_storegateway_instances_hit_per_query histogram + cortex_querier_storegateway_instances_hit_per_query_bucket{le="0"} 0 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="1"} 0 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="2"} 0 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="3"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="4"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="5"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="6"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="7"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="8"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="9"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="10"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="+Inf"} 1 + cortex_querier_storegateway_instances_hit_per_query_sum 3 + cortex_querier_storegateway_instances_hit_per_query_count 1 + + # HELP cortex_querier_storegateway_refetches_per_query Number of re-fetches attempted while querying store-gateway instances due to missing blocks. + # TYPE cortex_querier_storegateway_refetches_per_query histogram + cortex_querier_storegateway_refetches_per_query_bucket{le="0"} 1 + cortex_querier_storegateway_refetches_per_query_bucket{le="1"} 1 + cortex_querier_storegateway_refetches_per_query_bucket{le="2"} 1 + cortex_querier_storegateway_refetches_per_query_bucket{le="+Inf"} 1 + cortex_querier_storegateway_refetches_per_query_sum 0 + cortex_querier_storegateway_refetches_per_query_count 1 + `, + }, + "a single store-gateway instance has some missing blocks (consistency check failed)": { + finderResult: []*BlockMeta{ + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + }, + storeSetResponses: []interface{}{ + // First attempt returns a client whose response does not include all expected blocks. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1), + Warnings: []string{}, + Hints: mockValuesHints(block1), + }, + }: {block1}, + }, + // Second attempt returns an error because there are no other store-gateways left. + errors.New("no store-gateway remaining after exclude"), + }, + expectedErr: fmt.Sprintf("consistency check failed because some blocks were not queried: %s", block2.String()), + }, + "multiple store-gateway instances have some missing blocks (consistency check failed)": { + finderResult: []*BlockMeta{ + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block3}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block4}}}, + }, + storeSetResponses: []interface{}{ + // First attempt returns a client whose response does not include all expected blocks. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1), + Warnings: []string{}, + Hints: mockValuesHints(block1), + }, + }: {block1}, + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series2), + Warnings: []string{}, + Hints: mockNamesHints(block2), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series2), + Warnings: []string{}, + Hints: mockValuesHints(block2), + }, + }: {block2}, + }, + // Second attempt returns an error because there are no other store-gateways left. + errors.New("no store-gateway remaining after exclude"), + }, + expectedErr: fmt.Sprintf("consistency check failed because some blocks were not queried: %s %s", block3.String(), block4.String()), + }, + "multiple store-gateway instances have some missing blocks but queried from a replica during subsequent attempts": { + // Block1 has series1 + // Block2 has series2 + // Block3 has series1 and series2 + // Block4 has no series (poor lonely block) + finderResult: []*BlockMeta{ + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block3}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block4}}}, + }, + storeSetResponses: []interface{}{ + // First attempt returns a client whose response does not include all expected blocks. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1), + Warnings: []string{}, + Hints: mockValuesHints(block1), + }, + }: {block1, block3}, + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series2), + Warnings: []string{}, + Hints: mockNamesHints(block2), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series2), + Warnings: []string{}, + Hints: mockValuesHints(block2), + }, + }: {block2, block4}, + }, + // Second attempt returns 1 missing block. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "3.3.3.3", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1, series2), + Warnings: []string{}, + Hints: mockNamesHints(block3), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1, series2), + Warnings: []string{}, + Hints: mockValuesHints(block3), + }, + }: {block3, block4}, + }, + // Third attempt returns the last missing block. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "4.4.4.4", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: []string{}, + Warnings: []string{}, + Hints: mockNamesHints(block4), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: []string{}, + Warnings: []string{}, + Hints: mockValuesHints(block4), + }, + }: {block4}, + }, + }, + expectedLabelNames: namesFromSeries(series1, series2), + expectedLabelValues: valuesFromSeries(labels.MetricName, series1, series2), + expectedMetrics: ` + # HELP cortex_querier_storegateway_instances_hit_per_query Number of store-gateway instances hit for a single query. + # TYPE cortex_querier_storegateway_instances_hit_per_query histogram + cortex_querier_storegateway_instances_hit_per_query_bucket{le="0"} 0 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="1"} 0 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="2"} 0 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="3"} 0 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="4"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="5"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="6"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="7"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="8"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="9"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="10"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="+Inf"} 1 + cortex_querier_storegateway_instances_hit_per_query_sum 4 + cortex_querier_storegateway_instances_hit_per_query_count 1 + + # HELP cortex_querier_storegateway_refetches_per_query Number of re-fetches attempted while querying store-gateway instances due to missing blocks. + # TYPE cortex_querier_storegateway_refetches_per_query histogram + cortex_querier_storegateway_refetches_per_query_bucket{le="0"} 0 + cortex_querier_storegateway_refetches_per_query_bucket{le="1"} 0 + cortex_querier_storegateway_refetches_per_query_bucket{le="2"} 1 + cortex_querier_storegateway_refetches_per_query_bucket{le="+Inf"} 1 + cortex_querier_storegateway_refetches_per_query_sum 2 + cortex_querier_storegateway_refetches_per_query_count 1 + `, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + // Splitting it because we need a new registry for names and values. + // And also the initial expectedErr checking needs to be done for both. + for _, testFunc := range []string{"LabelNames", "LabelValues"} { + ctx := context.Background() + reg := prometheus.NewPedanticRegistry() + stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses} + finder := &blocksFinderMock{} + finder.On("GetBlocks", "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*metadata.DeletionMark(nil), testData.finderErr) + + q := &blocksStoreQuerier{ + ctx: ctx, + minT: minT, + maxT: maxT, + userID: "user-1", + finder: finder, + stores: stores, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(reg), + limits: &blocksStoreLimitsMock{}, + } + + if testFunc == "LabelNames" { + names, warnings, err := q.LabelNames() + if testData.expectedErr != "" { + require.Equal(t, testData.expectedErr, err.Error()) + continue + } + + require.NoError(t, err) + require.Equal(t, 0, len(warnings)) + require.Equal(t, testData.expectedLabelNames, names) + + // Assert on metrics (optional, only for test cases defining it). + if testData.expectedMetrics != "" { + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(testData.expectedMetrics))) + } + } + + if testFunc == "LabelValues" { + values, warnings, err := q.LabelValues(labels.MetricName) + if testData.expectedErr != "" { + require.Equal(t, testData.expectedErr, err.Error()) + continue + } + + require.NoError(t, err) + require.Equal(t, 0, len(warnings)) + require.Equal(t, testData.expectedLabelValues, values) + + // Assert on metrics (optional, only for test cases defining it). + if testData.expectedMetrics != "" { + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(testData.expectedMetrics))) + } + } + } + }) + } +} + func TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter(t *testing.T) { now := time.Now() @@ -656,7 +1125,7 @@ func TestBlocksStoreQuerier_PromQLExecution(t *testing.T) { }, map[ulid.ULID]*metadata.DeletionMark(nil), error(nil)) // Mock the store to simulate each block is queried from a different store-gateway. - gateway1 := &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + gateway1 := &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ { Result: &storepb.SeriesResponse_Series{ Series: &storepb.Series{ @@ -679,7 +1148,7 @@ func TestBlocksStoreQuerier_PromQLExecution(t *testing.T) { mockHintsResponse(block1), }} - gateway2 := &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedResponses: []*storepb.SeriesResponse{ + gateway2 := &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ { Result: &storepb.SeriesResponse_Series{ Series: &storepb.Series{ @@ -780,18 +1249,28 @@ func (m *blocksFinderMock) GetBlocks(userID string, minT, maxT int64) ([]*BlockM } type storeGatewayClientMock struct { - remoteAddr string - mockedResponses []*storepb.SeriesResponse + remoteAddr string + mockedSeriesResponses []*storepb.SeriesResponse + mockedLabelNamesResponse *storepb.LabelNamesResponse + mockedLabelValuesResponse *storepb.LabelValuesResponse } func (m *storeGatewayClientMock) Series(ctx context.Context, in *storepb.SeriesRequest, opts ...grpc.CallOption) (storegatewaypb.StoreGateway_SeriesClient, error) { seriesClient := &storeGatewaySeriesClientMock{ - mockedResponses: m.mockedResponses, + mockedResponses: m.mockedSeriesResponses, } return seriesClient, nil } +func (m *storeGatewayClientMock) LabelNames(context.Context, *storepb.LabelNamesRequest, ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { + return m.mockedLabelNamesResponse, nil +} + +func (m *storeGatewayClientMock) LabelValues(context.Context, *storepb.LabelValuesRequest, ...grpc.CallOption) (*storepb.LabelValuesResponse, error) { + return m.mockedLabelValuesResponse, nil +} + func (m *storeGatewayClientMock) RemoteAddress() string { return m.remoteAddr } @@ -867,3 +1346,67 @@ func mockHintsResponse(ids ...ulid.ULID) *storepb.SeriesResponse { }, } } + +func mockNamesHints(ids ...ulid.ULID) *types.Any { + hints := &hintspb.LabelNamesResponseHints{} + for _, id := range ids { + hints.AddQueriedBlock(id) + } + + any, err := types.MarshalAny(hints) + if err != nil { + panic(err) + } + + return any +} + +func mockValuesHints(ids ...ulid.ULID) *types.Any { + hints := &hintspb.LabelValuesResponseHints{} + for _, id := range ids { + hints.AddQueriedBlock(id) + } + + any, err := types.MarshalAny(hints) + if err != nil { + panic(err) + } + + return any +} + +func namesFromSeries(series ...labels.Labels) []string { + namesMap := map[string]struct{}{} + for _, s := range series { + for _, l := range s { + namesMap[l.Name] = struct{}{} + } + } + + names := []string{} + for name := range namesMap { + names = append(names, name) + } + + sort.Strings(names) + return names +} + +func valuesFromSeries(name string, series ...labels.Labels) []string { + valuesMap := map[string]struct{}{} + for _, s := range series { + for _, l := range s { + if l.Name == name { + valuesMap[l.Value] = struct{}{} + } + } + } + + values := []string{} + for name := range valuesMap { + values = append(values, name) + } + + sort.Strings(values) + return values +} diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 4e08b02d21a..0f413bb415b 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -183,8 +183,9 @@ func (q *distributorQuerier) streamingSelect(minT, maxT int64, matchers []*label } func (q *distributorQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { - lv, err := q.distributor.LabelValuesForLabelName(q.ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name)) - return lv, nil, err + lvs, err := q.distributor.LabelValuesForLabelName(q.ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name)) + + return lvs, nil, err } func (q *distributorQuerier) LabelNames() ([]string, storage.Warnings, error) { diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 7fa7042787e..51f3923758b 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -6,6 +6,7 @@ import ( "flag" "fmt" "strings" + "sync" "time" "github.com/go-kit/kit/log/level" @@ -14,7 +15,9 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/strutil" "github.com/weaveworks/common/user" + "golang.org/x/sync/errgroup" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/purger" @@ -373,11 +376,91 @@ func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Mat // LabelsValue implements storage.Querier. func (q querier) LabelValues(name string) ([]string, storage.Warnings, error) { - return q.metadataQuerier.LabelValues(name) + if !q.queryStoreForLabels { + return q.metadataQuerier.LabelValues(name) + } + + if len(q.queriers) == 1 { + return q.queriers[0].LabelValues(name) + } + + var ( + g, _ = errgroup.WithContext(q.ctx) + sets = [][]string{} + warnings = storage.Warnings(nil) + + resMtx sync.Mutex + ) + + for _, querier := range q.queriers { + // Need to reassign as the original variable will change and can't be relied on in a goroutine. + querier := querier + g.Go(func() error { + // NB: Values are sorted in Cortex already. + myValues, myWarnings, err := querier.LabelValues(name) + if err != nil { + return err + } + + resMtx.Lock() + sets = append(sets, myValues) + warnings = append(warnings, myWarnings...) + resMtx.Unlock() + + return nil + }) + } + + err := g.Wait() + if err != nil { + return nil, nil, err + } + + return strutil.MergeSlices(sets...), warnings, nil } func (q querier) LabelNames() ([]string, storage.Warnings, error) { - return q.metadataQuerier.LabelNames() + if !q.queryStoreForLabels { + return q.metadataQuerier.LabelNames() + } + + if len(q.queriers) == 1 { + return q.queriers[0].LabelNames() + } + + var ( + g, _ = errgroup.WithContext(q.ctx) + sets = [][]string{} + warnings = storage.Warnings(nil) + + resMtx sync.Mutex + ) + + for _, querier := range q.queriers { + // Need to reassign as the original variable will change and can't be relied on in a goroutine. + querier := querier + g.Go(func() error { + // NB: Names are sorted in Cortex already. + myNames, myWarnings, err := querier.LabelNames() + if err != nil { + return err + } + + resMtx.Lock() + sets = append(sets, myNames) + warnings = append(warnings, myWarnings...) + resMtx.Unlock() + + return nil + }) + } + + err := g.Wait() + if err != nil { + return nil, nil, err + } + + return strutil.MergeSlices(sets...), warnings, nil } func (querier) Close() error { diff --git a/pkg/querier/store_gateway_client_test.go b/pkg/querier/store_gateway_client_test.go index 58d26f34a4d..370e121effe 100644 --- a/pkg/querier/store_gateway_client_test.go +++ b/pkg/querier/store_gateway_client_test.go @@ -74,3 +74,11 @@ type mockStoreGatewayServer struct{} func (m *mockStoreGatewayServer) Series(_ *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error { return nil } + +func (m *mockStoreGatewayServer) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + return nil, nil +} + +func (m *mockStoreGatewayServer) LabelValues(context.Context, *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + return nil, nil +} diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 2b5dbb1fa69..e98d442d1a0 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -247,6 +247,42 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri }) } +// LabelNames implements the Storegateway proto service. +func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames") + defer spanLog.Span.Finish() + + userID := getUserIDFromGRPCContext(spanCtx) + if userID == "" { + return nil, fmt.Errorf("no userID") + } + + store := u.getStore(userID) + if store == nil { + return &storepb.LabelNamesResponse{}, nil + } + + return store.LabelNames(ctx, req) +} + +// LabelValues implements the Storegateway proto service. +func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelValues") + defer spanLog.Span.Finish() + + userID := getUserIDFromGRPCContext(spanCtx) + if userID == "" { + return nil, fmt.Errorf("no userID") + } + + store := u.getStore(userID) + if store == nil { + return &storepb.LabelValuesResponse{}, nil + } + + return store.LabelValues(ctx, req) +} + // scanUsers in the bucket and return the list of found users. If an error occurs while // iterating the bucket, it may return both an error and a subset of the users in the bucket. func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) { diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 8fa8cf01b79..fe6bff4f6c9 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -319,6 +319,16 @@ func (g *StoreGateway) Series(req *storepb.SeriesRequest, srv storegatewaypb.Sto return g.stores.Series(req, srv) } +// LabelNames implements the Storegateway proto service. +func (g *StoreGateway) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + return g.stores.LabelNames(ctx, req) +} + +// LabelValues implements the Storegateway proto service. +func (g *StoreGateway) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + return g.stores.LabelValues(ctx, req) +} + func (g *StoreGateway) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.IngesterDesc) (ring.IngesterState, ring.Tokens) { // When we initialize the store-gateway instance in the ring we want to start from // a clean situation, so whatever is the state we set it JOINING, while we keep existing diff --git a/pkg/storegateway/storegatewaypb/gateway.pb.go b/pkg/storegateway/storegatewaypb/gateway.pb.go index 95c4c897f0a..fa5913faf44 100644 --- a/pkg/storegateway/storegatewaypb/gateway.pb.go +++ b/pkg/storegateway/storegatewaypb/gateway.pb.go @@ -28,20 +28,24 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package func init() { proto.RegisterFile("gateway.proto", fileDescriptor_f1a937782ebbded5) } var fileDescriptor_f1a937782ebbded5 = []byte{ - // 204 bytes of a gzipped FileDescriptorProto + // 257 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4d, 0x4f, 0x2c, 0x49, 0x2d, 0x4f, 0xac, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0x72, 0x0b, 0x92, 0xa4, 0xcc, 0xd3, 0x33, 0x4b, 0x32, 0x4a, 0x93, 0xf4, 0x92, 0xf3, 0x73, 0xf5, 0x4b, 0x32, 0x12, 0xf3, 0xf2, 0x8b, 0x75, 0x33, 0xf3, 0xa1, 0x2c, 0xfd, 0x82, 0xec, 0x74, 0xfd, 0xe2, 0x92, 0xfc, 0xa2, - 0x54, 0x08, 0x59, 0x90, 0xa4, 0x5f, 0x54, 0x90, 0x0c, 0x31, 0xc3, 0xc8, 0x93, 0x8b, 0x27, 0x18, - 0x24, 0xe8, 0x0e, 0x31, 0x4a, 0xc8, 0x92, 0x8b, 0x2d, 0x38, 0xb5, 0x28, 0x33, 0xb5, 0x58, 0x48, - 0x54, 0x0f, 0xa2, 0x5d, 0x0f, 0xc2, 0x0f, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x91, 0x12, 0x43, - 0x17, 0x2e, 0x2e, 0xc8, 0xcf, 0x2b, 0x4e, 0x35, 0x60, 0x74, 0x72, 0xb9, 0xf0, 0x50, 0x8e, 0xe1, - 0xc6, 0x43, 0x39, 0x86, 0x0f, 0x0f, 0xe5, 0x18, 0x1b, 0x1e, 0xc9, 0x31, 0xae, 0x78, 0x24, 0xc7, - 0x78, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0xbe, 0x78, 0x24, - 0xc7, 0xf0, 0xe1, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x17, 0x1e, 0xcb, 0x31, 0xdc, 0x78, - 0x2c, 0xc7, 0x10, 0xc5, 0x07, 0x76, 0x13, 0xdc, 0x27, 0x49, 0x6c, 0x60, 0x77, 0x19, 0x03, 0x02, - 0x00, 0x00, 0xff, 0xff, 0xc5, 0x38, 0xd0, 0xf6, 0xec, 0x00, 0x00, 0x00, + 0x54, 0x08, 0x59, 0x90, 0xa4, 0x5f, 0x54, 0x90, 0x0c, 0x31, 0xc3, 0xe8, 0x1a, 0x23, 0x17, 0x4f, + 0x30, 0x48, 0xd4, 0x1d, 0x62, 0x96, 0x90, 0x25, 0x17, 0x5b, 0x70, 0x6a, 0x51, 0x66, 0x6a, 0xb1, + 0x90, 0xa8, 0x1e, 0x44, 0xbf, 0x1e, 0x84, 0x1f, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0x22, 0x25, + 0x86, 0x2e, 0x5c, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x6a, 0xc0, 0x28, 0xe4, 0xcc, 0xc5, 0xe5, 0x93, + 0x98, 0x94, 0x9a, 0xe3, 0x97, 0x98, 0x9b, 0x5a, 0x2c, 0x24, 0x09, 0x53, 0x87, 0x10, 0x83, 0x19, + 0x21, 0x85, 0x4d, 0x0a, 0x62, 0x8c, 0x90, 0x1b, 0x17, 0x37, 0x58, 0x34, 0x2c, 0x31, 0xa7, 0x34, + 0xb5, 0x58, 0x08, 0x55, 0x29, 0x44, 0x10, 0x66, 0x8c, 0x34, 0x56, 0x39, 0x88, 0x39, 0x4e, 0x2e, + 0x17, 0x1e, 0xca, 0x31, 0xdc, 0x78, 0x28, 0xc7, 0xf0, 0xe1, 0xa1, 0x1c, 0x63, 0xc3, 0x23, 0x39, + 0xc6, 0x15, 0x8f, 0xe4, 0x18, 0x4f, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, + 0x39, 0xc6, 0x17, 0x8f, 0xe4, 0x18, 0x3e, 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, + 0x63, 0x39, 0x86, 0x1b, 0x8f, 0xe5, 0x18, 0xa2, 0xf8, 0xc0, 0x21, 0x04, 0x0f, 0xd7, 0x24, 0x36, + 0x70, 0x28, 0x19, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x1b, 0xec, 0xe6, 0x0a, 0x7a, 0x01, 0x00, + 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -64,6 +68,10 @@ type StoreGatewayClient interface { // // Series are sorted. Series(ctx context.Context, in *storepb.SeriesRequest, opts ...grpc.CallOption) (StoreGateway_SeriesClient, error) + // LabelNames returns all label names that is available. + LabelNames(ctx context.Context, in *storepb.LabelNamesRequest, opts ...grpc.CallOption) (*storepb.LabelNamesResponse, error) + // LabelValues returns all label values for given label name. + LabelValues(ctx context.Context, in *storepb.LabelValuesRequest, opts ...grpc.CallOption) (*storepb.LabelValuesResponse, error) } type storeGatewayClient struct { @@ -106,6 +114,24 @@ func (x *storeGatewaySeriesClient) Recv() (*storepb.SeriesResponse, error) { return m, nil } +func (c *storeGatewayClient) LabelNames(ctx context.Context, in *storepb.LabelNamesRequest, opts ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { + out := new(storepb.LabelNamesResponse) + err := c.cc.Invoke(ctx, "/gatewaypb.StoreGateway/LabelNames", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *storeGatewayClient) LabelValues(ctx context.Context, in *storepb.LabelValuesRequest, opts ...grpc.CallOption) (*storepb.LabelValuesResponse, error) { + out := new(storepb.LabelValuesResponse) + err := c.cc.Invoke(ctx, "/gatewaypb.StoreGateway/LabelValues", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // StoreGatewayServer is the server API for StoreGateway service. type StoreGatewayServer interface { // Series streams each Series for given label matchers and time range. @@ -116,6 +142,10 @@ type StoreGatewayServer interface { // // Series are sorted. Series(*storepb.SeriesRequest, StoreGateway_SeriesServer) error + // LabelNames returns all label names that is available. + LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) + // LabelValues returns all label values for given label name. + LabelValues(context.Context, *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) } // UnimplementedStoreGatewayServer can be embedded to have forward compatible implementations. @@ -125,6 +155,12 @@ type UnimplementedStoreGatewayServer struct { func (*UnimplementedStoreGatewayServer) Series(req *storepb.SeriesRequest, srv StoreGateway_SeriesServer) error { return status.Errorf(codes.Unimplemented, "method Series not implemented") } +func (*UnimplementedStoreGatewayServer) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method LabelNames not implemented") +} +func (*UnimplementedStoreGatewayServer) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method LabelValues not implemented") +} func RegisterStoreGatewayServer(s *grpc.Server, srv StoreGatewayServer) { s.RegisterService(&_StoreGateway_serviceDesc, srv) @@ -151,10 +187,55 @@ func (x *storeGatewaySeriesServer) Send(m *storepb.SeriesResponse) error { return x.ServerStream.SendMsg(m) } +func _StoreGateway_LabelNames_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(storepb.LabelNamesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StoreGatewayServer).LabelNames(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/gatewaypb.StoreGateway/LabelNames", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StoreGatewayServer).LabelNames(ctx, req.(*storepb.LabelNamesRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _StoreGateway_LabelValues_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(storepb.LabelValuesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StoreGatewayServer).LabelValues(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/gatewaypb.StoreGateway/LabelValues", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StoreGatewayServer).LabelValues(ctx, req.(*storepb.LabelValuesRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _StoreGateway_serviceDesc = grpc.ServiceDesc{ ServiceName: "gatewaypb.StoreGateway", HandlerType: (*StoreGatewayServer)(nil), - Methods: []grpc.MethodDesc{}, + Methods: []grpc.MethodDesc{ + { + MethodName: "LabelNames", + Handler: _StoreGateway_LabelNames_Handler, + }, + { + MethodName: "LabelValues", + Handler: _StoreGateway_LabelValues_Handler, + }, + }, Streams: []grpc.StreamDesc{ { StreamName: "Series", diff --git a/pkg/storegateway/storegatewaypb/gateway.proto b/pkg/storegateway/storegatewaypb/gateway.proto index fdde78fe87e..14e65859c27 100644 --- a/pkg/storegateway/storegatewaypb/gateway.proto +++ b/pkg/storegateway/storegatewaypb/gateway.proto @@ -14,4 +14,10 @@ service StoreGateway { // // Series are sorted. rpc Series(thanos.SeriesRequest) returns (stream thanos.SeriesResponse); + + // LabelNames returns all label names that is available. + rpc LabelNames(thanos.LabelNamesRequest) returns (thanos.LabelNamesResponse); + + // LabelValues returns all label values for given label name. + rpc LabelValues(thanos.LabelValuesRequest) returns (thanos.LabelValuesResponse); }