From 7616bcf23dd17ff8d8f434556d70352680728c92 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 20 Nov 2020 13:15:16 +0100 Subject: [PATCH 1/5] Query Label Names from blocks store Signed-off-by: Goutham Veeramachaneni --- Makefile | 1 + integration/querier_test.go | 4 +- pkg/querier/blocks_store_queryable.go | 175 +++++++++++++++++- pkg/querier/blocks_store_queryable_test.go | 8 + pkg/querier/querier.go | 46 ++++- pkg/querier/store_gateway_client_test.go | 8 + pkg/storegateway/bucket_stores.go | 36 ++++ pkg/storegateway/gateway.go | 10 + pkg/storegateway/storegatewaypb/gateway.pb.go | 63 +++++-- pkg/storegateway/storegatewaypb/gateway.proto | 6 + 10 files changed, 340 insertions(+), 17 deletions(-) diff --git a/Makefile b/Makefile index 5fda26f3279..2d6db7c32ed 100644 --- a/Makefile +++ b/Makefile @@ -87,6 +87,7 @@ pkg/ruler/rules/rules.pb.go: pkg/ruler/rules/rules.proto pkg/ruler/ruler.pb.go: pkg/ruler/rules/rules.proto pkg/ring/kv/memberlist/kv.pb.go: pkg/ring/kv/memberlist/kv.proto pkg/scheduler/schedulerpb/scheduler.pb.go: pkg/scheduler/schedulerpb/scheduler.proto +pkg/storegateway/storegatewaypb/gateway.pb.go: pkg/storegateway/storegatewaypb/gateway.proto pkg/chunk/grpc/grpc.pb.go: pkg/chunk/grpc/grpc.proto tools/blocksconvert/scheduler.pb.go: tools/blocksconvert/scheduler.proto diff --git a/integration/querier_test.go b/integration/querier_test.go index 1ea7fb7afe5..e7c9cf846f7 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -539,7 +539,7 @@ func testMetadataQueriesWithBlocksStorage( resp: []string{lastSeriesInIngesterBlocksName, firstSeriesInIngesterHeadName}, }, }, - labelNames: []string{labels.MetricName, lastSeriesInIngesterBlocksName, firstSeriesInIngesterHeadName}, + labelNames: []string{labels.MetricName, lastSeriesInStorageName, lastSeriesInIngesterBlocksName, firstSeriesInIngesterHeadName}, }, "query metadata entirely outside the ingester range should return the head data as well": { from: lastSeriesInStorageTs.Add(-2 * blockRangePeriod), @@ -566,7 +566,7 @@ func testMetadataQueriesWithBlocksStorage( resp: []string{firstSeriesInIngesterHeadName}, }, }, - labelNames: []string{labels.MetricName, firstSeriesInIngesterHeadName}, + labelNames: []string{labels.MetricName, lastSeriesInStorageName, firstSeriesInIngesterHeadName}, }, } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 429e4bd22bb..24ca0c90c15 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -22,6 +22,7 @@ import ( "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/strutil" "github.com/weaveworks/common/user" "go.uber.org/atomic" "golang.org/x/sync/errgroup" @@ -298,8 +299,38 @@ func (q *blocksStoreQuerier) LabelValues(name string) ([]string, storage.Warning } func (q *blocksStoreQuerier) LabelNames() ([]string, storage.Warnings, error) { - // Cortex doesn't use this. It will ask ingesters for metadata. - return nil, nil, errors.New("not implemented") + spanLog, spanCtx := spanlogger.New(q.ctx, "blocksStoreQuerier.LabelNames") + defer spanLog.Span.Finish() + + minT, maxT := q.minT, q.maxT + + var ( + resNameSets = [][]string{} + resWarnings = storage.Warnings(nil) + + resultMtx sync.Mutex + ) + + queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) { + nameSets, warnings, queriedBlocks, err := q.fetchLabelNamesFromStore(spanCtx, clients, minT, maxT) + if err != nil { + return nil, err + } + + resultMtx.Lock() + resNameSets = append(resNameSets, nameSets...) + resWarnings = append(resWarnings, warnings...) + resultMtx.Unlock() + + return queriedBlocks, nil + } + + err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, queryFunc) + if err != nil { + return nil, nil, err + } + + return strutil.MergeSlices(resNameSets...), resWarnings, nil } func (q *blocksStoreQuerier) Close() error { @@ -546,7 +577,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if h := resp.GetHints(); h != nil { hints := hintspb.SeriesResponseHints{} if err := types.UnmarshalAny(h, &hints); err != nil { - return errors.Wrapf(err, "failed to unmarshal hints from %s", c) + return errors.Wrapf(err, "failed to unmarshal series hints from %s", c) } ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks) @@ -584,6 +615,81 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( return seriesSets, queriedBlocks, warnings, int(numChunks.Load()), nil } +func (q *blocksStoreQuerier) fetchLabelNamesFromStore( + ctx context.Context, + clients map[BlocksStoreClient][]ulid.ULID, + minT int64, + maxT int64, +) ([][]string, storage.Warnings, []ulid.ULID, error) { + var ( + reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID) + g, gCtx = errgroup.WithContext(reqCtx) + mtx = sync.Mutex{} + nameSets = [][]string{} + warnings = storage.Warnings(nil) + queriedBlocks = []ulid.ULID(nil) + spanLog = spanlogger.FromContext(ctx) + ) + + // Concurrently fetch series from all clients. + for c, blockIDs := range clients { + // Change variables scope since it will be used in a goroutine. + c := c + blockIDs := blockIDs + + g.Go(func() error { + req, err := createLabelNamesRequest(minT, maxT, blockIDs) + if err != nil { + return errors.Wrapf(err, "failed to create label names request") + } + + namesResp, err := c.LabelNames(gCtx, req) + if err != nil { + return errors.Wrapf(err, "failed to fetch series from %s", c) + } + + myQueriedBlocks := []ulid.ULID(nil) + if namesResp.Hints != nil { + hints := hintspb.LabelNamesResponseHints{} + if err := types.UnmarshalAny(namesResp.Hints, &hints); err != nil { + return errors.Wrapf(err, "failed to unmarshal label names hints from %s", c) + } + + ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks) + if err != nil { + return errors.Wrapf(err, "failed to parse queried block IDs from received hints") + } + + myQueriedBlocks = ids + } + + level.Debug(spanLog).Log("msg", "received label names from store-gateway", + "instance", c, + "num labels", len(namesResp.Names), + "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), + "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) + + // Store the result. + mtx.Lock() + nameSets = append(nameSets, namesResp.Names) + for _, w := range namesResp.Warnings { + warnings = append(warnings, errors.New(w)) + } + queriedBlocks = append(queriedBlocks, myQueriedBlocks...) + mtx.Unlock() + + return nil + }) + } + + // Wait until all client requests complete. + if err := g.Wait(); err != nil { + return nil, nil, nil, err + } + + return nameSets, warnings, queriedBlocks, nil +} + func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skipChunks bool, blockIDs []ulid.ULID) (*storepb.SeriesRequest, error) { // Selectively query only specific blocks. hints := &hintspb.SeriesRequestHints{ @@ -611,6 +717,69 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skip }, nil } +func createLabelNamesRequest(minT, maxT int64, blockIDs []ulid.ULID) (*storepb.LabelNamesRequest, error) { + req := &storepb.LabelNamesRequest{ + Start: minT, + End: maxT, + } + + if len(blockIDs) == 0 { + return req, nil + } + + // Selectively query only specific blocks. + hints := &hintspb.LabelNamesRequestHints{ + BlockMatchers: []storepb.LabelMatcher{ + { + Type: storepb.LabelMatcher_RE, + Name: block.BlockIDLabel, + Value: strings.Join(convertULIDsToString(blockIDs), "|"), + }, + }, + } + + anyHints, err := types.MarshalAny(hints) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal request hints") + } + + req.Hints = anyHints + + return req, nil +} + +func createLabelValuesRequest(minT, maxT int64, label string, blockIDs []ulid.ULID) (*storepb.LabelValuesRequest, error) { + req := &storepb.LabelValuesRequest{ + Start: minT, + End: maxT, + Label: label, + } + + if len(blockIDs) == 0 { + return req, nil + } + + // Selectively query only specific blocks. + hints := &hintspb.LabelValuesRequestHints{ + BlockMatchers: []storepb.LabelMatcher{ + { + Type: storepb.LabelMatcher_RE, + Name: block.BlockIDLabel, + Value: strings.Join(convertULIDsToString(blockIDs), "|"), + }, + }, + } + + anyHints, err := types.MarshalAny(hints) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal request hints") + } + + req.Hints = anyHints + + return req, nil +} + func convertULIDsToString(ids []ulid.ULID) []string { res := make([]string, len(ids)) for idx, id := range ids { diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index a4be09c8840..87bf3e24d6d 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -792,6 +792,14 @@ func (m *storeGatewayClientMock) Series(ctx context.Context, in *storepb.SeriesR return seriesClient, nil } +func (m *storeGatewayClientMock) LabelNames(context.Context, *storepb.LabelNamesRequest, ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { + return nil, nil +} + +// func (m *storeGatewayClientMock) LabelValues(context.Context, *storepb.LabelValuesRequest, ...grpc.CallOption) (*storepb.LabelValuesResponse, error) { +// return nil, nil +// } + func (m *storeGatewayClientMock) RemoteAddress() string { return m.remoteAddr } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 7fa7042787e..669b0991c3e 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -6,6 +6,7 @@ import ( "flag" "fmt" "strings" + "sync" "time" "github.com/go-kit/kit/log/level" @@ -14,7 +15,9 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/strutil" "github.com/weaveworks/common/user" + "golang.org/x/sync/errgroup" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/purger" @@ -377,7 +380,48 @@ func (q querier) LabelValues(name string) ([]string, storage.Warnings, error) { } func (q querier) LabelNames() ([]string, storage.Warnings, error) { - return q.metadataQuerier.LabelNames() + if !q.queryStoreForLabels { + return q.metadataQuerier.LabelNames() + } + + if len(q.queriers) == 1 { + return q.queriers[0].LabelNames() + } + + // Using an errgroup here instead of channels, etc because this + // is a better model imo and we should move to this everywhere. + var ( + g, _ = errgroup.WithContext(q.ctx) + sets = [][]string{} + warnings = storage.Warnings(nil) + + resMtx sync.Mutex + ) + + for _, querier := range q.queriers { + // Need to reassign as the original variable will change and can't be relied on in a goroutine. + querier := querier + g.Go(func() error { + myNames, myWarnings, err := querier.LabelNames() + if err != nil { + return err + } + + resMtx.Lock() + sets = append(sets, myNames) + warnings = append(warnings, myWarnings...) + resMtx.Unlock() + + return nil + }) + } + + err := g.Wait() + if err != nil { + return nil, nil, err + } + + return strutil.MergeSlices(sets...), warnings, nil } func (querier) Close() error { diff --git a/pkg/querier/store_gateway_client_test.go b/pkg/querier/store_gateway_client_test.go index 58d26f34a4d..f6f9bfd908b 100644 --- a/pkg/querier/store_gateway_client_test.go +++ b/pkg/querier/store_gateway_client_test.go @@ -74,3 +74,11 @@ type mockStoreGatewayServer struct{} func (m *mockStoreGatewayServer) Series(_ *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error { return nil } + +func (m *mockStoreGatewayServer) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + return nil, nil +} + +// func (m *mockStoreGatewayServer) LabelValues(context.Context, *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { +// return nil, nil +// } diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 2b5dbb1fa69..d291eb52017 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -247,6 +247,42 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri }) } +// LabelNames implements the Storegateway proto service. +func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames") + defer spanLog.Span.Finish() + + userID := getUserIDFromGRPCContext(spanCtx) + if userID == "" { + return nil, fmt.Errorf("no userID") + } + + store := u.getStore(userID) + if store == nil { + return nil, nil + } + + return store.LabelNames(ctx, req) +} + +// LabelValues implements the Storegateway proto service. +// func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { +// spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames") +// defer spanLog.Span.Finish() + +// userID := getUserIDFromGRPCContext(spanCtx) +// if userID == "" { +// return nil, fmt.Errorf("no userID") +// } + +// store := u.getStore(userID) +// if store == nil { +// return nil, nil +// } + +// return store.LabelValues(ctx, req) +// } + // scanUsers in the bucket and return the list of found users. If an error occurs while // iterating the bucket, it may return both an error and a subset of the users in the bucket. func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) { diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 8fa8cf01b79..b14100a468d 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -319,6 +319,16 @@ func (g *StoreGateway) Series(req *storepb.SeriesRequest, srv storegatewaypb.Sto return g.stores.Series(req, srv) } +// LabelNames implements the Storegateway proto service. +func (g *StoreGateway) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + return g.stores.LabelNames(ctx, req) +} + +// LabelValues implements the Storegateway proto service. +// func (g *StoreGateway) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { +// return g.stores.LabelValues(ctx, req) +// } + func (g *StoreGateway) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.IngesterDesc) (ring.IngesterState, ring.Tokens) { // When we initialize the store-gateway instance in the ring we want to start from // a clean situation, so whatever is the state we set it JOINING, while we keep existing diff --git a/pkg/storegateway/storegatewaypb/gateway.pb.go b/pkg/storegateway/storegatewaypb/gateway.pb.go index 95c4c897f0a..c237a96023e 100644 --- a/pkg/storegateway/storegatewaypb/gateway.pb.go +++ b/pkg/storegateway/storegatewaypb/gateway.pb.go @@ -28,20 +28,22 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package func init() { proto.RegisterFile("gateway.proto", fileDescriptor_f1a937782ebbded5) } var fileDescriptor_f1a937782ebbded5 = []byte{ - // 204 bytes of a gzipped FileDescriptorProto + // 232 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4d, 0x4f, 0x2c, 0x49, 0x2d, 0x4f, 0xac, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0x72, 0x0b, 0x92, 0xa4, 0xcc, 0xd3, 0x33, 0x4b, 0x32, 0x4a, 0x93, 0xf4, 0x92, 0xf3, 0x73, 0xf5, 0x4b, 0x32, 0x12, 0xf3, 0xf2, 0x8b, 0x75, 0x33, 0xf3, 0xa1, 0x2c, 0xfd, 0x82, 0xec, 0x74, 0xfd, 0xe2, 0x92, 0xfc, 0xa2, - 0x54, 0x08, 0x59, 0x90, 0xa4, 0x5f, 0x54, 0x90, 0x0c, 0x31, 0xc3, 0xc8, 0x93, 0x8b, 0x27, 0x18, - 0x24, 0xe8, 0x0e, 0x31, 0x4a, 0xc8, 0x92, 0x8b, 0x2d, 0x38, 0xb5, 0x28, 0x33, 0xb5, 0x58, 0x48, - 0x54, 0x0f, 0xa2, 0x5d, 0x0f, 0xc2, 0x0f, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x91, 0x12, 0x43, - 0x17, 0x2e, 0x2e, 0xc8, 0xcf, 0x2b, 0x4e, 0x35, 0x60, 0x74, 0x72, 0xb9, 0xf0, 0x50, 0x8e, 0xe1, - 0xc6, 0x43, 0x39, 0x86, 0x0f, 0x0f, 0xe5, 0x18, 0x1b, 0x1e, 0xc9, 0x31, 0xae, 0x78, 0x24, 0xc7, - 0x78, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0xbe, 0x78, 0x24, - 0xc7, 0xf0, 0xe1, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x17, 0x1e, 0xcb, 0x31, 0xdc, 0x78, - 0x2c, 0xc7, 0x10, 0xc5, 0x07, 0x76, 0x13, 0xdc, 0x27, 0x49, 0x6c, 0x60, 0x77, 0x19, 0x03, 0x02, - 0x00, 0x00, 0xff, 0xff, 0xc5, 0x38, 0xd0, 0xf6, 0xec, 0x00, 0x00, 0x00, + 0x54, 0x08, 0x59, 0x90, 0xa4, 0x5f, 0x54, 0x90, 0x0c, 0x31, 0xc3, 0xa8, 0x8f, 0x91, 0x8b, 0x27, + 0x18, 0x24, 0xea, 0x0e, 0x31, 0x4b, 0xc8, 0x92, 0x8b, 0x2d, 0x38, 0xb5, 0x28, 0x33, 0xb5, 0x58, + 0x48, 0x54, 0x0f, 0xa2, 0x5f, 0x0f, 0xc2, 0x0f, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x91, 0x12, + 0x43, 0x17, 0x2e, 0x2e, 0xc8, 0xcf, 0x2b, 0x4e, 0x35, 0x60, 0x14, 0x72, 0xe6, 0xe2, 0xf2, 0x49, + 0x4c, 0x4a, 0xcd, 0xf1, 0x4b, 0xcc, 0x4d, 0x2d, 0x16, 0x92, 0x84, 0xa9, 0x43, 0x88, 0xc1, 0x8c, + 0x90, 0xc2, 0x26, 0x05, 0x31, 0xc6, 0xc9, 0xe5, 0xc2, 0x43, 0x39, 0x86, 0x1b, 0x0f, 0xe5, 0x18, + 0x3e, 0x3c, 0x94, 0x63, 0x6c, 0x78, 0x24, 0xc7, 0xb8, 0xe2, 0x91, 0x1c, 0xe3, 0x89, 0x47, 0x72, + 0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0xf8, 0xe2, 0x91, 0x1c, 0xc3, 0x87, 0x47, + 0x72, 0x8c, 0x13, 0x1e, 0xcb, 0x31, 0x5c, 0x78, 0x2c, 0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43, 0x14, + 0x1f, 0xd8, 0x67, 0xf0, 0xf0, 0x48, 0x62, 0x03, 0xfb, 0xce, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, + 0x98, 0x48, 0x33, 0x6b, 0x32, 0x01, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -64,6 +66,8 @@ type StoreGatewayClient interface { // // Series are sorted. Series(ctx context.Context, in *storepb.SeriesRequest, opts ...grpc.CallOption) (StoreGateway_SeriesClient, error) + // LabelNames returns all label names that is available. + LabelNames(ctx context.Context, in *storepb.LabelNamesRequest, opts ...grpc.CallOption) (*storepb.LabelNamesResponse, error) } type storeGatewayClient struct { @@ -106,6 +110,15 @@ func (x *storeGatewaySeriesClient) Recv() (*storepb.SeriesResponse, error) { return m, nil } +func (c *storeGatewayClient) LabelNames(ctx context.Context, in *storepb.LabelNamesRequest, opts ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { + out := new(storepb.LabelNamesResponse) + err := c.cc.Invoke(ctx, "/gatewaypb.StoreGateway/LabelNames", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // StoreGatewayServer is the server API for StoreGateway service. type StoreGatewayServer interface { // Series streams each Series for given label matchers and time range. @@ -116,6 +129,8 @@ type StoreGatewayServer interface { // // Series are sorted. Series(*storepb.SeriesRequest, StoreGateway_SeriesServer) error + // LabelNames returns all label names that is available. + LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) } // UnimplementedStoreGatewayServer can be embedded to have forward compatible implementations. @@ -125,6 +140,9 @@ type UnimplementedStoreGatewayServer struct { func (*UnimplementedStoreGatewayServer) Series(req *storepb.SeriesRequest, srv StoreGateway_SeriesServer) error { return status.Errorf(codes.Unimplemented, "method Series not implemented") } +func (*UnimplementedStoreGatewayServer) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method LabelNames not implemented") +} func RegisterStoreGatewayServer(s *grpc.Server, srv StoreGatewayServer) { s.RegisterService(&_StoreGateway_serviceDesc, srv) @@ -151,10 +169,33 @@ func (x *storeGatewaySeriesServer) Send(m *storepb.SeriesResponse) error { return x.ServerStream.SendMsg(m) } +func _StoreGateway_LabelNames_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(storepb.LabelNamesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StoreGatewayServer).LabelNames(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/gatewaypb.StoreGateway/LabelNames", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StoreGatewayServer).LabelNames(ctx, req.(*storepb.LabelNamesRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _StoreGateway_serviceDesc = grpc.ServiceDesc{ ServiceName: "gatewaypb.StoreGateway", HandlerType: (*StoreGatewayServer)(nil), - Methods: []grpc.MethodDesc{}, + Methods: []grpc.MethodDesc{ + { + MethodName: "LabelNames", + Handler: _StoreGateway_LabelNames_Handler, + }, + }, Streams: []grpc.StreamDesc{ { StreamName: "Series", diff --git a/pkg/storegateway/storegatewaypb/gateway.proto b/pkg/storegateway/storegatewaypb/gateway.proto index fdde78fe87e..10244ab1f02 100644 --- a/pkg/storegateway/storegatewaypb/gateway.proto +++ b/pkg/storegateway/storegatewaypb/gateway.proto @@ -14,4 +14,10 @@ service StoreGateway { // // Series are sorted. rpc Series(thanos.SeriesRequest) returns (stream thanos.SeriesResponse); + + // LabelNames returns all label names that is available. + rpc LabelNames(thanos.LabelNamesRequest) returns (thanos.LabelNamesResponse); + + // LabelValues returns all label values for given label name. + // rpc LabelValues(thanos.LabelValuesRequest) returns (thanos.LabelValuesResponse); } From b39fb1349922e84c3fb4ee08433296c8b3373ce4 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 20 Nov 2020 14:02:19 +0100 Subject: [PATCH 2/5] Query Label Values from block store Signed-off-by: Goutham Veeramachaneni --- CHANGELOG.md | 2 +- docs/api/_index.md | 4 +- integration/querier_test.go | 6 +- pkg/querier/blocks_store_queryable.go | 120 +++++++++++++++++- pkg/querier/blocks_store_queryable_test.go | 6 +- pkg/querier/querier.go | 47 ++++++- pkg/querier/store_gateway_client_test.go | 6 +- pkg/storegateway/bucket_stores.go | 32 ++--- pkg/storegateway/gateway.go | 6 +- pkg/storegateway/storegatewaypb/gateway.pb.go | 64 ++++++++-- pkg/storegateway/storegatewaypb/gateway.proto | 2 +- 11 files changed, 245 insertions(+), 50 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 58323836fdf..054b58ef504 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ * [ENHANCEMENT] Added `cortex_alertmanager_config_hash` metric to expose hash of Alertmanager Config loaded per user. #3388 * [ENHANCEMENT] Query-Frontend / Query-Scheduler: New component called "Query-Scheduler" has been introduced. Query-Scheduler is simply a queue of requests, moved outside of Query-Frontend. This allows Query-Frontend to be scaled separately from number of queues. To make Query-Frontend and Querier use Query-Scheduler, they need to be started with `-frontend.scheduler-address` and `-querier.scheduler-address` options respectively. #3374 #3471 * [ENHANCEMENT] Query-frontend / Querier / Ruler: added `-querier.max-query-lookback` to limit how long back data (series and metadata) can be queried. This setting can be overridden on a per-tenant basis and is enforced in the query-frontend, querier and ruler. #3452 #3458 -* [ENHANCEMENT] Querier: added `-querier.query-store-for-labels-enabled` to query store for series API. Only works with blocks storage engine. #3461 +* [ENHANCEMENT] Querier: added `-querier.query-store-for-labels-enabled` to query store for label names, label values and series APIs. Only works with blocks storage engine. #3461 #3520 * [ENHANCEMENT] Ingester: exposed `-blocks-storage.tsdb.wal-segment-size-bytes` config option to customise the TSDB WAL segment max size. #3476 * [ENHANCEMENT] Compactor: concurrently run blocks cleaner for multiple tenants. Concurrency can be configured via `-compactor.cleanup-concurrency`. #3483 * [ENHANCEMENT] Compactor: shuffle tenants before running compaction. #3483 diff --git a/docs/api/_index.md b/docs/api/_index.md index 9ce6ba83ee4..78719f77027 100644 --- a/docs/api/_index.md +++ b/docs/api/_index.md @@ -294,7 +294,7 @@ GET,POST /api/v1/labels GET,POST /api/v1/labels ``` -Get label names of ingested series. Differently than Prometheus and due to scalability and performances reasons, Cortex currently ignores the `start` and `end` request parameters and always fetches the label names from in-memory data stored in the ingesters. +Get label names of ingested series. Differently than Prometheus and due to scalability and performances reasons, Cortex currently ignores the `start` and `end` request parameters and always fetches the label names from in-memory data stored in the ingesters. There is experimental support to query the long-term store with the *blocks* storage engine when `-querier.query-store-for-labels-enabled` is set. _For more information, please check out the Prometheus [get label names](https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names) documentation._ @@ -309,7 +309,7 @@ GET /api/v1/label/{name}/values GET /api/v1/label/{name}/values ``` -Get label values for a given label name. Differently than Prometheus and due to scalability and performances reasons, Cortex currently ignores the `start` and `end` request parameters and always fetches the label values from in-memory data stored in the ingesters. +Get label values for a given label name. Differently than Prometheus and due to scalability and performances reasons, Cortex currently ignores the `start` and `end` request parameters and always fetches the label values from in-memory data stored in the ingesters. There is experimental support to query the long-term store with the *blocks* storage engine when `-querier.query-store-for-labels-enabled` is set. _For more information, please check out the Prometheus [get label values](https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values) documentation._ diff --git a/integration/querier_test.go b/integration/querier_test.go index e7c9cf846f7..f177264a0d1 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -536,7 +536,7 @@ func testMetadataQueriesWithBlocksStorage( labelValuesTests: []labelValuesTest{ { label: labels.MetricName, - resp: []string{lastSeriesInIngesterBlocksName, firstSeriesInIngesterHeadName}, + resp: []string{lastSeriesInStorageName, lastSeriesInIngesterBlocksName, firstSeriesInIngesterHeadName}, }, }, labelNames: []string{labels.MetricName, lastSeriesInStorageName, lastSeriesInIngesterBlocksName, firstSeriesInIngesterHeadName}, @@ -563,7 +563,7 @@ func testMetadataQueriesWithBlocksStorage( labelValuesTests: []labelValuesTest{ { label: labels.MetricName, - resp: []string{firstSeriesInIngesterHeadName}, + resp: []string{lastSeriesInStorageName, firstSeriesInIngesterHeadName}, }, }, labelNames: []string{labels.MetricName, lastSeriesInStorageName, firstSeriesInIngesterHeadName}, @@ -590,7 +590,7 @@ func testMetadataQueriesWithBlocksStorage( for _, val := range lvt.resp { exp = append(exp, model.LabelValue(val)) } - require.ElementsMatch(t, exp, labelsRes) + require.Equal(t, exp, labelsRes) } labelNames, err := c.LabelNames(tc.from, tc.to) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 24ca0c90c15..3e78301d24a 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "sort" "strings" "sync" "time" @@ -293,11 +294,6 @@ func (q *blocksStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers .. return q.selectSorted(sp, matchers...) } -func (q *blocksStoreQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { - // Cortex doesn't use this. It will ask ingesters for metadata. - return nil, nil, errors.New("not implemented") -} - func (q *blocksStoreQuerier) LabelNames() ([]string, storage.Warnings, error) { spanLog, spanCtx := spanlogger.New(q.ctx, "blocksStoreQuerier.LabelNames") defer spanLog.Span.Finish() @@ -333,6 +329,41 @@ func (q *blocksStoreQuerier) LabelNames() ([]string, storage.Warnings, error) { return strutil.MergeSlices(resNameSets...), resWarnings, nil } +func (q *blocksStoreQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { + spanLog, spanCtx := spanlogger.New(q.ctx, "blocksStoreQuerier.LabelValues") + defer spanLog.Span.Finish() + + minT, maxT := q.minT, q.maxT + + var ( + resValueSets = [][]string{} + resWarnings = storage.Warnings(nil) + + resultMtx sync.Mutex + ) + + queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) { + valueSets, warnings, queriedBlocks, err := q.fetchLabelValuesFromStore(spanCtx, name, clients, minT, maxT) + if err != nil { + return nil, err + } + + resultMtx.Lock() + resValueSets = append(resValueSets, valueSets...) + resWarnings = append(resWarnings, warnings...) + resultMtx.Unlock() + + return queriedBlocks, nil + } + + err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, queryFunc) + if err != nil { + return nil, nil, err + } + + return strutil.MergeSlices(resValueSets...), resWarnings, nil +} + func (q *blocksStoreQuerier) Close() error { return nil } @@ -690,6 +721,85 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore( return nameSets, warnings, queriedBlocks, nil } +func (q *blocksStoreQuerier) fetchLabelValuesFromStore( + ctx context.Context, + name string, + clients map[BlocksStoreClient][]ulid.ULID, + minT int64, + maxT int64, +) ([][]string, storage.Warnings, []ulid.ULID, error) { + var ( + reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID) + g, gCtx = errgroup.WithContext(reqCtx) + mtx = sync.Mutex{} + nameSets = [][]string{} + warnings = storage.Warnings(nil) + queriedBlocks = []ulid.ULID(nil) + spanLog = spanlogger.FromContext(ctx) + ) + + // Concurrently fetch series from all clients. + for c, blockIDs := range clients { + // Change variables scope since it will be used in a goroutine. + c := c + blockIDs := blockIDs + + g.Go(func() error { + req, err := createLabelValuesRequest(minT, maxT, name, blockIDs) + if err != nil { + return errors.Wrapf(err, "failed to create label names request") + } + + valuesResp, err := c.LabelValues(gCtx, req) + if err != nil { + return errors.Wrapf(err, "failed to fetch series from %s", c) + } + + myQueriedBlocks := []ulid.ULID(nil) + if valuesResp.Hints != nil { + hints := hintspb.LabelValuesResponseHints{} + if err := types.UnmarshalAny(valuesResp.Hints, &hints); err != nil { + return errors.Wrapf(err, "failed to unmarshal label names hints from %s", c) + } + + ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks) + if err != nil { + return errors.Wrapf(err, "failed to parse queried block IDs from received hints") + } + + myQueriedBlocks = ids + } + + level.Debug(spanLog).Log("msg", "received label names from store-gateway", + "instance", c, + "num labels", len(valuesResp.Values), + "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), + "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) + + // Values returned need not be sorted, but we need them to be sorted so we can merge. + sort.Strings(valuesResp.Values) + + // Store the result. + mtx.Lock() + nameSets = append(nameSets, valuesResp.Values) + for _, w := range valuesResp.Warnings { + warnings = append(warnings, errors.New(w)) + } + queriedBlocks = append(queriedBlocks, myQueriedBlocks...) + mtx.Unlock() + + return nil + }) + } + + // Wait until all client requests complete. + if err := g.Wait(); err != nil { + return nil, nil, nil, err + } + + return nameSets, warnings, queriedBlocks, nil +} + func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skipChunks bool, blockIDs []ulid.ULID) (*storepb.SeriesRequest, error) { // Selectively query only specific blocks. hints := &hintspb.SeriesRequestHints{ diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 87bf3e24d6d..bc92345c671 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -796,9 +796,9 @@ func (m *storeGatewayClientMock) LabelNames(context.Context, *storepb.LabelNames return nil, nil } -// func (m *storeGatewayClientMock) LabelValues(context.Context, *storepb.LabelValuesRequest, ...grpc.CallOption) (*storepb.LabelValuesResponse, error) { -// return nil, nil -// } +func (m *storeGatewayClientMock) LabelValues(context.Context, *storepb.LabelValuesRequest, ...grpc.CallOption) (*storepb.LabelValuesResponse, error) { + return nil, nil +} func (m *storeGatewayClientMock) RemoteAddress() string { return m.remoteAddr diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 669b0991c3e..4f74863b41e 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -5,6 +5,7 @@ import ( "errors" "flag" "fmt" + "sort" "strings" "sync" "time" @@ -376,7 +377,51 @@ func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Mat // LabelsValue implements storage.Querier. func (q querier) LabelValues(name string) ([]string, storage.Warnings, error) { - return q.metadataQuerier.LabelValues(name) + if !q.queryStoreForLabels { + return q.metadataQuerier.LabelValues(name) + } + + if len(q.queriers) == 1 { + return q.queriers[0].LabelValues(name) + } + + // Using an errgroup here instead of channels, etc because this + // is a better model imo and we should move to this everywhere. + var ( + g, _ = errgroup.WithContext(q.ctx) + sets = [][]string{} + warnings = storage.Warnings(nil) + + resMtx sync.Mutex + ) + + for _, querier := range q.queriers { + // Need to reassign as the original variable will change and can't be relied on in a goroutine. + querier := querier + g.Go(func() error { + myValues, myWarnings, err := querier.LabelValues(name) + if err != nil { + return err + } + + // We need values to be sorted we can merge them. + sort.Strings(myValues) + + resMtx.Lock() + sets = append(sets, myValues) + warnings = append(warnings, myWarnings...) + resMtx.Unlock() + + return nil + }) + } + + err := g.Wait() + if err != nil { + return nil, nil, err + } + + return strutil.MergeSlices(sets...), warnings, nil } func (q querier) LabelNames() ([]string, storage.Warnings, error) { diff --git a/pkg/querier/store_gateway_client_test.go b/pkg/querier/store_gateway_client_test.go index f6f9bfd908b..370e121effe 100644 --- a/pkg/querier/store_gateway_client_test.go +++ b/pkg/querier/store_gateway_client_test.go @@ -79,6 +79,6 @@ func (m *mockStoreGatewayServer) LabelNames(context.Context, *storepb.LabelNames return nil, nil } -// func (m *mockStoreGatewayServer) LabelValues(context.Context, *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { -// return nil, nil -// } +func (m *mockStoreGatewayServer) LabelValues(context.Context, *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + return nil, nil +} diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index d291eb52017..c816d3071e2 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -266,22 +266,22 @@ func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRe } // LabelValues implements the Storegateway proto service. -// func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { -// spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames") -// defer spanLog.Span.Finish() - -// userID := getUserIDFromGRPCContext(spanCtx) -// if userID == "" { -// return nil, fmt.Errorf("no userID") -// } - -// store := u.getStore(userID) -// if store == nil { -// return nil, nil -// } - -// return store.LabelValues(ctx, req) -// } +func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames") + defer spanLog.Span.Finish() + + userID := getUserIDFromGRPCContext(spanCtx) + if userID == "" { + return nil, fmt.Errorf("no userID") + } + + store := u.getStore(userID) + if store == nil { + return nil, nil + } + + return store.LabelValues(ctx, req) +} // scanUsers in the bucket and return the list of found users. If an error occurs while // iterating the bucket, it may return both an error and a subset of the users in the bucket. diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index b14100a468d..fe6bff4f6c9 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -325,9 +325,9 @@ func (g *StoreGateway) LabelNames(ctx context.Context, req *storepb.LabelNamesRe } // LabelValues implements the Storegateway proto service. -// func (g *StoreGateway) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { -// return g.stores.LabelValues(ctx, req) -// } +func (g *StoreGateway) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + return g.stores.LabelValues(ctx, req) +} func (g *StoreGateway) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.IngesterDesc) (ring.IngesterState, ring.Tokens) { // When we initialize the store-gateway instance in the ring we want to start from diff --git a/pkg/storegateway/storegatewaypb/gateway.pb.go b/pkg/storegateway/storegatewaypb/gateway.pb.go index c237a96023e..fa5913faf44 100644 --- a/pkg/storegateway/storegatewaypb/gateway.pb.go +++ b/pkg/storegateway/storegatewaypb/gateway.pb.go @@ -28,22 +28,24 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package func init() { proto.RegisterFile("gateway.proto", fileDescriptor_f1a937782ebbded5) } var fileDescriptor_f1a937782ebbded5 = []byte{ - // 232 bytes of a gzipped FileDescriptorProto + // 257 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4d, 0x4f, 0x2c, 0x49, 0x2d, 0x4f, 0xac, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0x72, 0x0b, 0x92, 0xa4, 0xcc, 0xd3, 0x33, 0x4b, 0x32, 0x4a, 0x93, 0xf4, 0x92, 0xf3, 0x73, 0xf5, 0x4b, 0x32, 0x12, 0xf3, 0xf2, 0x8b, 0x75, 0x33, 0xf3, 0xa1, 0x2c, 0xfd, 0x82, 0xec, 0x74, 0xfd, 0xe2, 0x92, 0xfc, 0xa2, - 0x54, 0x08, 0x59, 0x90, 0xa4, 0x5f, 0x54, 0x90, 0x0c, 0x31, 0xc3, 0xa8, 0x8f, 0x91, 0x8b, 0x27, - 0x18, 0x24, 0xea, 0x0e, 0x31, 0x4b, 0xc8, 0x92, 0x8b, 0x2d, 0x38, 0xb5, 0x28, 0x33, 0xb5, 0x58, - 0x48, 0x54, 0x0f, 0xa2, 0x5f, 0x0f, 0xc2, 0x0f, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x91, 0x12, - 0x43, 0x17, 0x2e, 0x2e, 0xc8, 0xcf, 0x2b, 0x4e, 0x35, 0x60, 0x14, 0x72, 0xe6, 0xe2, 0xf2, 0x49, - 0x4c, 0x4a, 0xcd, 0xf1, 0x4b, 0xcc, 0x4d, 0x2d, 0x16, 0x92, 0x84, 0xa9, 0x43, 0x88, 0xc1, 0x8c, - 0x90, 0xc2, 0x26, 0x05, 0x31, 0xc6, 0xc9, 0xe5, 0xc2, 0x43, 0x39, 0x86, 0x1b, 0x0f, 0xe5, 0x18, - 0x3e, 0x3c, 0x94, 0x63, 0x6c, 0x78, 0x24, 0xc7, 0xb8, 0xe2, 0x91, 0x1c, 0xe3, 0x89, 0x47, 0x72, - 0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0xf8, 0xe2, 0x91, 0x1c, 0xc3, 0x87, 0x47, - 0x72, 0x8c, 0x13, 0x1e, 0xcb, 0x31, 0x5c, 0x78, 0x2c, 0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43, 0x14, - 0x1f, 0xd8, 0x67, 0xf0, 0xf0, 0x48, 0x62, 0x03, 0xfb, 0xce, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, - 0x98, 0x48, 0x33, 0x6b, 0x32, 0x01, 0x00, 0x00, + 0x54, 0x08, 0x59, 0x90, 0xa4, 0x5f, 0x54, 0x90, 0x0c, 0x31, 0xc3, 0xe8, 0x1a, 0x23, 0x17, 0x4f, + 0x30, 0x48, 0xd4, 0x1d, 0x62, 0x96, 0x90, 0x25, 0x17, 0x5b, 0x70, 0x6a, 0x51, 0x66, 0x6a, 0xb1, + 0x90, 0xa8, 0x1e, 0x44, 0xbf, 0x1e, 0x84, 0x1f, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0x22, 0x25, + 0x86, 0x2e, 0x5c, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x6a, 0xc0, 0x28, 0xe4, 0xcc, 0xc5, 0xe5, 0x93, + 0x98, 0x94, 0x9a, 0xe3, 0x97, 0x98, 0x9b, 0x5a, 0x2c, 0x24, 0x09, 0x53, 0x87, 0x10, 0x83, 0x19, + 0x21, 0x85, 0x4d, 0x0a, 0x62, 0x8c, 0x90, 0x1b, 0x17, 0x37, 0x58, 0x34, 0x2c, 0x31, 0xa7, 0x34, + 0xb5, 0x58, 0x08, 0x55, 0x29, 0x44, 0x10, 0x66, 0x8c, 0x34, 0x56, 0x39, 0x88, 0x39, 0x4e, 0x2e, + 0x17, 0x1e, 0xca, 0x31, 0xdc, 0x78, 0x28, 0xc7, 0xf0, 0xe1, 0xa1, 0x1c, 0x63, 0xc3, 0x23, 0x39, + 0xc6, 0x15, 0x8f, 0xe4, 0x18, 0x4f, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, + 0x39, 0xc6, 0x17, 0x8f, 0xe4, 0x18, 0x3e, 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, + 0x63, 0x39, 0x86, 0x1b, 0x8f, 0xe5, 0x18, 0xa2, 0xf8, 0xc0, 0x21, 0x04, 0x0f, 0xd7, 0x24, 0x36, + 0x70, 0x28, 0x19, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x1b, 0xec, 0xe6, 0x0a, 0x7a, 0x01, 0x00, + 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -68,6 +70,8 @@ type StoreGatewayClient interface { Series(ctx context.Context, in *storepb.SeriesRequest, opts ...grpc.CallOption) (StoreGateway_SeriesClient, error) // LabelNames returns all label names that is available. LabelNames(ctx context.Context, in *storepb.LabelNamesRequest, opts ...grpc.CallOption) (*storepb.LabelNamesResponse, error) + // LabelValues returns all label values for given label name. + LabelValues(ctx context.Context, in *storepb.LabelValuesRequest, opts ...grpc.CallOption) (*storepb.LabelValuesResponse, error) } type storeGatewayClient struct { @@ -119,6 +123,15 @@ func (c *storeGatewayClient) LabelNames(ctx context.Context, in *storepb.LabelNa return out, nil } +func (c *storeGatewayClient) LabelValues(ctx context.Context, in *storepb.LabelValuesRequest, opts ...grpc.CallOption) (*storepb.LabelValuesResponse, error) { + out := new(storepb.LabelValuesResponse) + err := c.cc.Invoke(ctx, "/gatewaypb.StoreGateway/LabelValues", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // StoreGatewayServer is the server API for StoreGateway service. type StoreGatewayServer interface { // Series streams each Series for given label matchers and time range. @@ -131,6 +144,8 @@ type StoreGatewayServer interface { Series(*storepb.SeriesRequest, StoreGateway_SeriesServer) error // LabelNames returns all label names that is available. LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) + // LabelValues returns all label values for given label name. + LabelValues(context.Context, *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) } // UnimplementedStoreGatewayServer can be embedded to have forward compatible implementations. @@ -143,6 +158,9 @@ func (*UnimplementedStoreGatewayServer) Series(req *storepb.SeriesRequest, srv S func (*UnimplementedStoreGatewayServer) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method LabelNames not implemented") } +func (*UnimplementedStoreGatewayServer) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method LabelValues not implemented") +} func RegisterStoreGatewayServer(s *grpc.Server, srv StoreGatewayServer) { s.RegisterService(&_StoreGateway_serviceDesc, srv) @@ -187,6 +205,24 @@ func _StoreGateway_LabelNames_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } +func _StoreGateway_LabelValues_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(storepb.LabelValuesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StoreGatewayServer).LabelValues(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/gatewaypb.StoreGateway/LabelValues", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StoreGatewayServer).LabelValues(ctx, req.(*storepb.LabelValuesRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _StoreGateway_serviceDesc = grpc.ServiceDesc{ ServiceName: "gatewaypb.StoreGateway", HandlerType: (*StoreGatewayServer)(nil), @@ -195,6 +231,10 @@ var _StoreGateway_serviceDesc = grpc.ServiceDesc{ MethodName: "LabelNames", Handler: _StoreGateway_LabelNames_Handler, }, + { + MethodName: "LabelValues", + Handler: _StoreGateway_LabelValues_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/pkg/storegateway/storegatewaypb/gateway.proto b/pkg/storegateway/storegatewaypb/gateway.proto index 10244ab1f02..14e65859c27 100644 --- a/pkg/storegateway/storegatewaypb/gateway.proto +++ b/pkg/storegateway/storegatewaypb/gateway.proto @@ -19,5 +19,5 @@ service StoreGateway { rpc LabelNames(thanos.LabelNamesRequest) returns (thanos.LabelNamesResponse); // LabelValues returns all label values for given label name. - // rpc LabelValues(thanos.LabelValuesRequest) returns (thanos.LabelValuesResponse); + rpc LabelValues(thanos.LabelValuesRequest) returns (thanos.LabelValuesResponse); } From 31589381b13cc466d89e1ab5fc9b185370f37ff7 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 24 Nov 2020 12:13:52 +0100 Subject: [PATCH 3/5] Address feedback and unit tests for label names Signed-off-by: Goutham Veeramachaneni --- pkg/querier/blocks_store_queryable.go | 20 +- pkg/querier/blocks_store_queryable_test.go | 423 +++++++++++++++++++-- pkg/querier/querier.go | 4 - pkg/storegateway/bucket_stores.go | 6 +- 4 files changed, 407 insertions(+), 46 deletions(-) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 3e78301d24a..70d17b75cf8 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -732,7 +732,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID) g, gCtx = errgroup.WithContext(reqCtx) mtx = sync.Mutex{} - nameSets = [][]string{} + valueSets = [][]string{} warnings = storage.Warnings(nil) queriedBlocks = []ulid.ULID(nil) spanLog = spanlogger.FromContext(ctx) @@ -747,7 +747,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( g.Go(func() error { req, err := createLabelValuesRequest(minT, maxT, name, blockIDs) if err != nil { - return errors.Wrapf(err, "failed to create label names request") + return errors.Wrapf(err, "failed to create label values request") } valuesResp, err := c.LabelValues(gCtx, req) @@ -759,7 +759,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( if valuesResp.Hints != nil { hints := hintspb.LabelValuesResponseHints{} if err := types.UnmarshalAny(valuesResp.Hints, &hints); err != nil { - return errors.Wrapf(err, "failed to unmarshal label names hints from %s", c) + return errors.Wrapf(err, "failed to unmarshal label values hints from %s", c) } ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks) @@ -770,9 +770,9 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( myQueriedBlocks = ids } - level.Debug(spanLog).Log("msg", "received label names from store-gateway", + level.Debug(spanLog).Log("msg", "received label values from store-gateway", "instance", c, - "num labels", len(valuesResp.Values), + "num values", len(valuesResp.Values), "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) @@ -781,7 +781,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( // Store the result. mtx.Lock() - nameSets = append(nameSets, valuesResp.Values) + valueSets = append(valueSets, valuesResp.Values) for _, w := range valuesResp.Warnings { warnings = append(warnings, errors.New(w)) } @@ -797,7 +797,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( return nil, nil, nil, err } - return nameSets, warnings, queriedBlocks, nil + return valueSets, warnings, queriedBlocks, nil } func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skipChunks bool, blockIDs []ulid.ULID) (*storepb.SeriesRequest, error) { @@ -814,7 +814,7 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skip anyHints, err := types.MarshalAny(hints) if err != nil { - return nil, errors.Wrapf(err, "failed to marshal request hints") + return nil, errors.Wrapf(err, "failed to marshal series request hints") } return &storepb.SeriesRequest{ @@ -850,7 +850,7 @@ func createLabelNamesRequest(minT, maxT int64, blockIDs []ulid.ULID) (*storepb.L anyHints, err := types.MarshalAny(hints) if err != nil { - return nil, errors.Wrapf(err, "failed to marshal request hints") + return nil, errors.Wrapf(err, "failed to marshal label names request hints") } req.Hints = anyHints @@ -882,7 +882,7 @@ func createLabelValuesRequest(minT, maxT int64, label string, blockIDs []ulid.UL anyHints, err := types.MarshalAny(hints) if err != nil { - return nil, errors.Wrapf(err, "failed to marshal request hints") + return nil, errors.Wrapf(err, "failed to marshal label values request hints") } req.Hints = anyHints diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index bc92345c671..8e402da4b49 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "sort" "strings" "testing" "time" @@ -98,7 +99,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel}, minT, 1), mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), mockHintsResponse(block1, block2), @@ -123,7 +124,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT, 3), @@ -154,11 +155,11 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel}, minT, 1), mockHintsResponse(block1), }}: {block1}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), mockHintsResponse(block2), }}: {block2}, @@ -182,11 +183,11 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), mockHintsResponse(block1), }}: {block1}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel}, minT, 1), mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), mockHintsResponse(block2), @@ -211,17 +212,17 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT, 1), mockHintsResponse(block1), }}: {block1}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), mockHintsResponse(block2), }}: {block2}, - &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT, 1), mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT+1, 3), mockHintsResponse(block3), @@ -280,7 +281,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), mockHintsResponse(block1), @@ -302,11 +303,11 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), mockHintsResponse(block1), }}: {block1}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), mockHintsResponse(block2), }}: {block2}, @@ -327,25 +328,25 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), mockHintsResponse(block1), }}: {block1, block3}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT, 2), mockHintsResponse(block2), }}: {block2, block4}, }, // Second attempt returns 1 missing block. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), mockHintsResponse(block3), }}: {block3, block4}, }, // Third attempt returns the last missing block. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "4.4.4.4", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "4.4.4.4", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT+1, 3), mockHintsResponse(block4), }}: {block4}, @@ -402,7 +403,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), mockHintsResponse(block1, block2), @@ -427,7 +428,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), mockHintsResponse(block1, block2), @@ -447,25 +448,25 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), mockHintsResponse(block1), }}: {block1, block3}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT, 2), mockHintsResponse(block2), }}: {block2, block4}, }, // Second attempt returns 1 missing block. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), mockHintsResponse(block3), }}: {block3, block4}, }, // Third attempt returns the last missing block. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "4.4.4.4", mockedResponses: []*storepb.SeriesResponse{ + &storeGatewayClientMock{remoteAddr: "4.4.4.4", mockedSeriesResponses: []*storepb.SeriesResponse{ mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT+1, 3), mockHintsResponse(block4), }}: {block4}, @@ -544,6 +545,337 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { } } +func TestBlocksStoreQuerier_Labels(t *testing.T) { + const ( + metricName = "test_metric" + minT = int64(10) + maxT = int64(20) + ) + + var ( + block1 = ulid.MustNew(1, nil) + block2 = ulid.MustNew(2, nil) + block3 = ulid.MustNew(3, nil) + block4 = ulid.MustNew(4, nil) + series1 = labels.FromMap(map[string]string{ + labels.MetricName: metricName + "_1", + "series1": "1", + }) + series2 = labels.FromMap(map[string]string{ + labels.MetricName: metricName + "_2", + "series2": "1", + }) + ) + + type valueResult struct { + t int64 + v float64 + } + + type seriesResult struct { + lbls labels.Labels + values []valueResult + } + + tests := map[string]struct { + finderResult []*BlockMeta + finderErr error + storeSetResponses []interface{} + expectedLabelNames []string + expectedLabelValues []string // For __name__ + expectedErr string + expectedMetrics string + }{ + "no block in the storage matching the query time range": { + finderResult: nil, + expectedErr: "", + }, + "error while finding blocks matching the query time range": { + finderErr: errors.New("unable to find blocks"), + expectedErr: "unable to find blocks", + }, + "error while getting clients to query the store-gateway": { + finderResult: []*BlockMeta{ + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + }, + storeSetResponses: []interface{}{ + errors.New("no client found"), + }, + expectedErr: "no client found", + }, + "a single store-gateway instance holds the required blocks": { + finderResult: []*BlockMeta{ + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1, series2), + Warnings: []string{}, + Hints: mockNamesHints(block1, block2), + }}: {block1, block2}, + }, + }, + expectedLabelNames: namesFromSeries(series1, series2), + }, + "multiple store-gateway instances holds the required blocks without overlapping series": { + finderResult: []*BlockMeta{ + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }}: {block1}, + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series2), + Warnings: []string{}, + Hints: mockNamesHints(block2), + }}: {block2}, + }, + }, + expectedLabelNames: namesFromSeries(series1, series2), + }, + "multiple store-gateway instances holds the required blocks with overlapping series (single returned series)": { + finderResult: []*BlockMeta{ + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }}: {block1}, + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block2), + }}: {block2}, + }, + }, + expectedLabelNames: namesFromSeries(series1), + }, + "multiple store-gateway instances holds the required blocks with overlapping series (multiple returned series)": { + finderResult: []*BlockMeta{ + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + }, + // Block1 has series1 and series2 + // Block2 has only series1 + // Block3 has only series2 + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1, series2), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }}: {block1}, + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block2), + }}: {block2}, + &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series2), + Warnings: []string{}, + Hints: mockNamesHints(block3), + }}: {block3}, + }, + }, + expectedLabelNames: namesFromSeries(series1, series2), + expectedMetrics: ` + # HELP cortex_querier_storegateway_instances_hit_per_query Number of store-gateway instances hit for a single query. + # TYPE cortex_querier_storegateway_instances_hit_per_query histogram + cortex_querier_storegateway_instances_hit_per_query_bucket{le="0"} 0 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="1"} 0 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="2"} 0 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="3"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="4"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="5"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="6"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="7"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="8"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="9"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="10"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="+Inf"} 1 + cortex_querier_storegateway_instances_hit_per_query_sum 3 + cortex_querier_storegateway_instances_hit_per_query_count 1 + + # HELP cortex_querier_storegateway_refetches_per_query Number of re-fetches attempted while querying store-gateway instances due to missing blocks. + # TYPE cortex_querier_storegateway_refetches_per_query histogram + cortex_querier_storegateway_refetches_per_query_bucket{le="0"} 1 + cortex_querier_storegateway_refetches_per_query_bucket{le="1"} 1 + cortex_querier_storegateway_refetches_per_query_bucket{le="2"} 1 + cortex_querier_storegateway_refetches_per_query_bucket{le="+Inf"} 1 + cortex_querier_storegateway_refetches_per_query_sum 0 + cortex_querier_storegateway_refetches_per_query_count 1 + `, + }, + "a single store-gateway instance has some missing blocks (consistency check failed)": { + finderResult: []*BlockMeta{ + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + }, + storeSetResponses: []interface{}{ + // First attempt returns a client whose response does not include all expected blocks. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }}: {block1}, + }, + // Second attempt returns an error because there are no other store-gateways left. + errors.New("no store-gateway remaining after exclude"), + }, + expectedErr: fmt.Sprintf("consistency check failed because some blocks were not queried: %s", block2.String()), + }, + "multiple store-gateway instances have some missing blocks (consistency check failed)": { + finderResult: []*BlockMeta{ + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block3}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block4}}}, + }, + storeSetResponses: []interface{}{ + // First attempt returns a client whose response does not include all expected blocks. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }}: {block1}, + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series2), + Warnings: []string{}, + Hints: mockNamesHints(block2), + }}: {block2}, + }, + // Second attempt returns an error because there are no other store-gateways left. + errors.New("no store-gateway remaining after exclude"), + }, + expectedErr: fmt.Sprintf("consistency check failed because some blocks were not queried: %s %s", block3.String(), block4.String()), + }, + "multiple store-gateway instances have some missing blocks but queried from a replica during subsequent attempts": { + // Block1 has series1 + // Block2 has series2 + // Block3 has series1 and series2 + // Block4 has no series (poor lonely block) + finderResult: []*BlockMeta{ + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block3}}}, + {Meta: metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block4}}}, + }, + storeSetResponses: []interface{}{ + // First attempt returns a client whose response does not include all expected blocks. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }}: {block1, block3}, + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series2), + Warnings: []string{}, + Hints: mockNamesHints(block2), + }}: {block2, block4}, + }, + // Second attempt returns 1 missing block. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1, series2), + Warnings: []string{}, + Hints: mockNamesHints(block3), + }}: {block3, block4}, + }, + // Third attempt returns the last missing block. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "4.4.4.4", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: []string{}, + Warnings: []string{}, + Hints: mockNamesHints(block4), + }}: {block4}, + }, + }, + expectedLabelNames: namesFromSeries(series1, series2), + expectedMetrics: ` + # HELP cortex_querier_storegateway_instances_hit_per_query Number of store-gateway instances hit for a single query. + # TYPE cortex_querier_storegateway_instances_hit_per_query histogram + cortex_querier_storegateway_instances_hit_per_query_bucket{le="0"} 0 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="1"} 0 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="2"} 0 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="3"} 0 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="4"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="5"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="6"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="7"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="8"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="9"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="10"} 1 + cortex_querier_storegateway_instances_hit_per_query_bucket{le="+Inf"} 1 + cortex_querier_storegateway_instances_hit_per_query_sum 4 + cortex_querier_storegateway_instances_hit_per_query_count 1 + + # HELP cortex_querier_storegateway_refetches_per_query Number of re-fetches attempted while querying store-gateway instances due to missing blocks. + # TYPE cortex_querier_storegateway_refetches_per_query histogram + cortex_querier_storegateway_refetches_per_query_bucket{le="0"} 0 + cortex_querier_storegateway_refetches_per_query_bucket{le="1"} 0 + cortex_querier_storegateway_refetches_per_query_bucket{le="2"} 1 + cortex_querier_storegateway_refetches_per_query_bucket{le="+Inf"} 1 + cortex_querier_storegateway_refetches_per_query_sum 2 + cortex_querier_storegateway_refetches_per_query_count 1 + `, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + ctx := context.Background() + reg := prometheus.NewPedanticRegistry() + stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses} + finder := &blocksFinderMock{} + finder.On("GetBlocks", "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*metadata.DeletionMark(nil), testData.finderErr) + + q := &blocksStoreQuerier{ + ctx: ctx, + minT: minT, + maxT: maxT, + userID: "user-1", + finder: finder, + stores: stores, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(reg), + limits: &blocksStoreLimitsMock{}, + } + + names, warnings, err := q.LabelNames() + if testData.expectedErr != "" { + require.Equal(t, testData.expectedErr, err.Error()) + return + } + + require.NoError(t, err) + // TODO: Don't use Len. + assert.Len(t, warnings, 0) + require.Equal(t, testData.expectedLabelNames, names) + + // Assert on metrics (optional, only for test cases defining it). + if testData.expectedMetrics != "" { + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(testData.expectedMetrics))) + } + }) + } +} + func TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter(t *testing.T) { now := time.Now() @@ -656,7 +988,7 @@ func TestBlocksStoreQuerier_PromQLExecution(t *testing.T) { }, map[ulid.ULID]*metadata.DeletionMark(nil), error(nil)) // Mock the store to simulate each block is queried from a different store-gateway. - gateway1 := &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedResponses: []*storepb.SeriesResponse{ + gateway1 := &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ { Result: &storepb.SeriesResponse_Series{ Series: &storepb.Series{ @@ -679,7 +1011,7 @@ func TestBlocksStoreQuerier_PromQLExecution(t *testing.T) { mockHintsResponse(block1), }} - gateway2 := &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedResponses: []*storepb.SeriesResponse{ + gateway2 := &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ { Result: &storepb.SeriesResponse_Series{ Series: &storepb.Series{ @@ -780,24 +1112,26 @@ func (m *blocksFinderMock) GetBlocks(userID string, minT, maxT int64) ([]*BlockM } type storeGatewayClientMock struct { - remoteAddr string - mockedResponses []*storepb.SeriesResponse + remoteAddr string + mockedSeriesResponses []*storepb.SeriesResponse + mockedLabelNamesResponse *storepb.LabelNamesResponse + mockedLabelValuesResponse *storepb.LabelValuesResponse } func (m *storeGatewayClientMock) Series(ctx context.Context, in *storepb.SeriesRequest, opts ...grpc.CallOption) (storegatewaypb.StoreGateway_SeriesClient, error) { seriesClient := &storeGatewaySeriesClientMock{ - mockedResponses: m.mockedResponses, + mockedResponses: m.mockedSeriesResponses, } return seriesClient, nil } func (m *storeGatewayClientMock) LabelNames(context.Context, *storepb.LabelNamesRequest, ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { - return nil, nil + return m.mockedLabelNamesResponse, nil } func (m *storeGatewayClientMock) LabelValues(context.Context, *storepb.LabelValuesRequest, ...grpc.CallOption) (*storepb.LabelValuesResponse, error) { - return nil, nil + return m.mockedLabelValuesResponse, nil } func (m *storeGatewayClientMock) RemoteAddress() string { @@ -875,3 +1209,34 @@ func mockHintsResponse(ids ...ulid.ULID) *storepb.SeriesResponse { }, } } + +func mockNamesHints(ids ...ulid.ULID) *types.Any { + hints := &hintspb.LabelNamesResponseHints{} + for _, id := range ids { + hints.AddQueriedBlock(id) + } + + any, err := types.MarshalAny(hints) + if err != nil { + panic(err) + } + + return any +} + +func namesFromSeries(series ...labels.Labels) []string { + namesMap := map[string]struct{}{} + for _, s := range series { + for _, l := range s { + namesMap[l.Name] = struct{}{} + } + } + + names := []string{} + for name := range namesMap { + names = append(names, name) + } + + sort.Strings(names) + return names +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 4f74863b41e..2d7839712ad 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -385,8 +385,6 @@ func (q querier) LabelValues(name string) ([]string, storage.Warnings, error) { return q.queriers[0].LabelValues(name) } - // Using an errgroup here instead of channels, etc because this - // is a better model imo and we should move to this everywhere. var ( g, _ = errgroup.WithContext(q.ctx) sets = [][]string{} @@ -433,8 +431,6 @@ func (q querier) LabelNames() ([]string, storage.Warnings, error) { return q.queriers[0].LabelNames() } - // Using an errgroup here instead of channels, etc because this - // is a better model imo and we should move to this everywhere. var ( g, _ = errgroup.WithContext(q.ctx) sets = [][]string{} diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index c816d3071e2..e98d442d1a0 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -259,7 +259,7 @@ func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRe store := u.getStore(userID) if store == nil { - return nil, nil + return &storepb.LabelNamesResponse{}, nil } return store.LabelNames(ctx, req) @@ -267,7 +267,7 @@ func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRe // LabelValues implements the Storegateway proto service. func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { - spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames") + spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelValues") defer spanLog.Span.Finish() userID := getUserIDFromGRPCContext(spanCtx) @@ -277,7 +277,7 @@ func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValues store := u.getStore(userID) if store == nil { - return nil, nil + return &storepb.LabelValuesResponse{}, nil } return store.LabelValues(ctx, req) From 25663ce5424c82bd09b890eeb9acc7afea9d314f Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 24 Nov 2020 13:48:26 +0100 Subject: [PATCH 4/5] Add unit test for LabelValues Signed-off-by: Goutham Veeramachaneni --- pkg/querier/blocks_store_queryable_test.go | 412 +++++++++++++++------ 1 file changed, 291 insertions(+), 121 deletions(-) diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 8e402da4b49..52587ddf9ca 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -567,16 +567,6 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { }) ) - type valueResult struct { - t int64 - v float64 - } - - type seriesResult struct { - lbls labels.Labels - values []valueResult - } - tests := map[string]struct { finderResult []*BlockMeta finderErr error @@ -611,14 +601,23 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ - Names: namesFromSeries(series1, series2), - Warnings: []string{}, - Hints: mockNamesHints(block1, block2), - }}: {block1, block2}, + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1, series2), + Warnings: []string{}, + Hints: mockNamesHints(block1, block2), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1, series2), + Warnings: []string{}, + Hints: mockValuesHints(block1, block2), + }, + }: {block1, block2}, }, }, - expectedLabelNames: namesFromSeries(series1, series2), + expectedLabelNames: namesFromSeries(series1, series2), + expectedLabelValues: valuesFromSeries(labels.MetricName, series1, series2), }, "multiple store-gateway instances holds the required blocks without overlapping series": { finderResult: []*BlockMeta{ @@ -627,19 +626,36 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ - Names: namesFromSeries(series1), - Warnings: []string{}, - Hints: mockNamesHints(block1), - }}: {block1}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ - Names: namesFromSeries(series2), - Warnings: []string{}, - Hints: mockNamesHints(block2), - }}: {block2}, + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1), + Warnings: []string{}, + Hints: mockValuesHints(block1), + }, + }: {block1}, + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series2), + Warnings: []string{}, + Hints: mockNamesHints(block2), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series2), + Warnings: []string{}, + Hints: mockValuesHints(block2), + }, + }: {block2}, }, }, - expectedLabelNames: namesFromSeries(series1, series2), + expectedLabelNames: namesFromSeries(series1, series2), + expectedLabelValues: valuesFromSeries(labels.MetricName, series1, series2), }, "multiple store-gateway instances holds the required blocks with overlapping series (single returned series)": { finderResult: []*BlockMeta{ @@ -648,19 +664,36 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { }, storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ - Names: namesFromSeries(series1), - Warnings: []string{}, - Hints: mockNamesHints(block1), - }}: {block1}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ - Names: namesFromSeries(series1), - Warnings: []string{}, - Hints: mockNamesHints(block2), - }}: {block2}, + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1), + Warnings: []string{}, + Hints: mockValuesHints(block1), + }, + }: {block1}, + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block2), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1), + Warnings: []string{}, + Hints: mockValuesHints(block2), + }, + }: {block2}, }, }, - expectedLabelNames: namesFromSeries(series1), + expectedLabelNames: namesFromSeries(series1), + expectedLabelValues: valuesFromSeries(labels.MetricName, series1), }, "multiple store-gateway instances holds the required blocks with overlapping series (multiple returned series)": { finderResult: []*BlockMeta{ @@ -672,24 +705,49 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { // Block3 has only series2 storeSetResponses: []interface{}{ map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ - Names: namesFromSeries(series1, series2), - Warnings: []string{}, - Hints: mockNamesHints(block1), - }}: {block1}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ - Names: namesFromSeries(series1), - Warnings: []string{}, - Hints: mockNamesHints(block2), - }}: {block2}, - &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ - Names: namesFromSeries(series2), - Warnings: []string{}, - Hints: mockNamesHints(block3), - }}: {block3}, + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1, series2), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1, series2), + Warnings: []string{}, + Hints: mockValuesHints(block1), + }, + }: {block1}, + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block2), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1), + Warnings: []string{}, + Hints: mockValuesHints(block2), + }, + }: {block2}, + &storeGatewayClientMock{ + remoteAddr: "3.3.3.3", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series2), + Warnings: []string{}, + Hints: mockNamesHints(block3), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series2), + Warnings: []string{}, + Hints: mockValuesHints(block3), + }, + }: {block3}, }, }, - expectedLabelNames: namesFromSeries(series1, series2), + expectedLabelNames: namesFromSeries(series1, series2), + expectedLabelValues: valuesFromSeries(labels.MetricName, series1, series2), expectedMetrics: ` # HELP cortex_querier_storegateway_instances_hit_per_query Number of store-gateway instances hit for a single query. # TYPE cortex_querier_storegateway_instances_hit_per_query histogram @@ -726,11 +784,19 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ - Names: namesFromSeries(series1), - Warnings: []string{}, - Hints: mockNamesHints(block1), - }}: {block1}, + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1), + Warnings: []string{}, + Hints: mockValuesHints(block1), + }, + }: {block1}, }, // Second attempt returns an error because there are no other store-gateways left. errors.New("no store-gateway remaining after exclude"), @@ -747,16 +813,32 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ - Names: namesFromSeries(series1), - Warnings: []string{}, - Hints: mockNamesHints(block1), - }}: {block1}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ - Names: namesFromSeries(series2), - Warnings: []string{}, - Hints: mockNamesHints(block2), - }}: {block2}, + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1), + Warnings: []string{}, + Hints: mockValuesHints(block1), + }, + }: {block1}, + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series2), + Warnings: []string{}, + Hints: mockNamesHints(block2), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series2), + Warnings: []string{}, + Hints: mockValuesHints(block2), + }, + }: {block2}, }, // Second attempt returns an error because there are no other store-gateways left. errors.New("no store-gateway remaining after exclude"), @@ -777,35 +859,68 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { storeSetResponses: []interface{}{ // First attempt returns a client whose response does not include all expected blocks. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ - Names: namesFromSeries(series1), - Warnings: []string{}, - Hints: mockNamesHints(block1), - }}: {block1, block3}, - &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ - Names: namesFromSeries(series2), - Warnings: []string{}, - Hints: mockNamesHints(block2), - }}: {block2, block4}, + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1), + Warnings: []string{}, + Hints: mockNamesHints(block1), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1), + Warnings: []string{}, + Hints: mockValuesHints(block1), + }, + }: {block1, block3}, + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series2), + Warnings: []string{}, + Hints: mockNamesHints(block2), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series2), + Warnings: []string{}, + Hints: mockValuesHints(block2), + }, + }: {block2, block4}, }, // Second attempt returns 1 missing block. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ - Names: namesFromSeries(series1, series2), - Warnings: []string{}, - Hints: mockNamesHints(block3), - }}: {block3, block4}, + &storeGatewayClientMock{ + remoteAddr: "3.3.3.3", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(series1, series2), + Warnings: []string{}, + Hints: mockNamesHints(block3), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, series1, series2), + Warnings: []string{}, + Hints: mockValuesHints(block3), + }, + }: {block3, block4}, }, // Third attempt returns the last missing block. map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "4.4.4.4", mockedLabelNamesResponse: &storepb.LabelNamesResponse{ - Names: []string{}, - Warnings: []string{}, - Hints: mockNamesHints(block4), - }}: {block4}, + &storeGatewayClientMock{ + remoteAddr: "4.4.4.4", + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: []string{}, + Warnings: []string{}, + Hints: mockNamesHints(block4), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: []string{}, + Warnings: []string{}, + Hints: mockValuesHints(block4), + }, + }: {block4}, }, }, - expectedLabelNames: namesFromSeries(series1, series2), + expectedLabelNames: namesFromSeries(series1, series2), + expectedLabelValues: valuesFromSeries(labels.MetricName, series1, series2), expectedMetrics: ` # HELP cortex_querier_storegateway_instances_hit_per_query Number of store-gateway instances hit for a single query. # TYPE cortex_querier_storegateway_instances_hit_per_query histogram @@ -838,39 +953,61 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - ctx := context.Background() - reg := prometheus.NewPedanticRegistry() - stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses} - finder := &blocksFinderMock{} - finder.On("GetBlocks", "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*metadata.DeletionMark(nil), testData.finderErr) - - q := &blocksStoreQuerier{ - ctx: ctx, - minT: minT, - maxT: maxT, - userID: "user-1", - finder: finder, - stores: stores, - consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), - logger: log.NewNopLogger(), - metrics: newBlocksStoreQueryableMetrics(reg), - limits: &blocksStoreLimitsMock{}, - } - - names, warnings, err := q.LabelNames() - if testData.expectedErr != "" { - require.Equal(t, testData.expectedErr, err.Error()) - return - } + // Splitting it because we need a new registry for names and values. + // And also the initial expectedErr checking needs to be done for both. + for _, testFunc := range []string{"LabelNames", "LabelValues"} { + ctx := context.Background() + reg := prometheus.NewPedanticRegistry() + stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses} + finder := &blocksFinderMock{} + finder.On("GetBlocks", "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*metadata.DeletionMark(nil), testData.finderErr) + + q := &blocksStoreQuerier{ + ctx: ctx, + minT: minT, + maxT: maxT, + userID: "user-1", + finder: finder, + stores: stores, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(reg), + limits: &blocksStoreLimitsMock{}, + } - require.NoError(t, err) - // TODO: Don't use Len. - assert.Len(t, warnings, 0) - require.Equal(t, testData.expectedLabelNames, names) + if testFunc == "LabelNames" { + names, warnings, err := q.LabelNames() + if testData.expectedErr != "" { + require.Equal(t, testData.expectedErr, err.Error()) + continue + } + + require.NoError(t, err) + require.Equal(t, 0, len(warnings)) + require.Equal(t, testData.expectedLabelNames, names) + + // Assert on metrics (optional, only for test cases defining it). + if testData.expectedMetrics != "" { + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(testData.expectedMetrics))) + } + } - // Assert on metrics (optional, only for test cases defining it). - if testData.expectedMetrics != "" { - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(testData.expectedMetrics))) + if testFunc == "LabelValues" { + values, warnings, err := q.LabelValues(labels.MetricName) + if testData.expectedErr != "" { + require.Equal(t, testData.expectedErr, err.Error()) + continue + } + + require.NoError(t, err) + require.Equal(t, 0, len(warnings)) + require.Equal(t, testData.expectedLabelValues, values) + + // Assert on metrics (optional, only for test cases defining it). + if testData.expectedMetrics != "" { + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(testData.expectedMetrics))) + } + } } }) } @@ -1224,6 +1361,20 @@ func mockNamesHints(ids ...ulid.ULID) *types.Any { return any } +func mockValuesHints(ids ...ulid.ULID) *types.Any { + hints := &hintspb.LabelValuesResponseHints{} + for _, id := range ids { + hints.AddQueriedBlock(id) + } + + any, err := types.MarshalAny(hints) + if err != nil { + panic(err) + } + + return any +} + func namesFromSeries(series ...labels.Labels) []string { namesMap := map[string]struct{}{} for _, s := range series { @@ -1240,3 +1391,22 @@ func namesFromSeries(series ...labels.Labels) []string { sort.Strings(names) return names } + +func valuesFromSeries(name string, series ...labels.Labels) []string { + valuesMap := map[string]struct{}{} + for _, s := range series { + for _, l := range s { + if l.Name == name { + valuesMap[l.Value] = struct{}{} + } + } + } + + values := []string{} + for name := range valuesMap { + values = append(values, name) + } + + sort.Strings(values) + return values +} From 9915d35f101a3798df096efac2b389b979d35415 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Wed, 25 Nov 2020 15:03:44 +0100 Subject: [PATCH 5/5] Address feedback Signed-off-by: Goutham Veeramachaneni --- pkg/distributor/distributor.go | 9 +++++--- pkg/querier/blocks_store_queryable.go | 33 ++++++++++----------------- pkg/querier/distributor_queryable.go | 5 ++-- pkg/querier/querier.go | 6 ++--- 4 files changed, 23 insertions(+), 30 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index e42fdf5f428..c575851b26a 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -672,6 +672,10 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode for v := range valueSet { values = append(values, v) } + + // We need the values returned to be sorted. + sort.Strings(values) + return values, nil } @@ -704,9 +708,8 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]st for v := range valueSet { values = append(values, v) } - sort.Slice(values, func(i, j int) bool { - return values[i] < values[j] - }) + + sort.Strings(values) return values, nil } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 70d17b75cf8..93341dbd482 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -301,10 +301,9 @@ func (q *blocksStoreQuerier) LabelNames() ([]string, storage.Warnings, error) { minT, maxT := q.minT, q.maxT var ( + resMtx sync.Mutex resNameSets = [][]string{} resWarnings = storage.Warnings(nil) - - resultMtx sync.Mutex ) queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) { @@ -313,10 +312,10 @@ func (q *blocksStoreQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, err } - resultMtx.Lock() + resMtx.Lock() resNameSets = append(resNameSets, nameSets...) resWarnings = append(resWarnings, warnings...) - resultMtx.Unlock() + resMtx.Unlock() return queriedBlocks, nil } @@ -566,7 +565,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( stream, err := c.Series(gCtx, req) if err != nil { - return errors.Wrapf(err, "failed to fetch series from %s", c) + return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress()) } mySeries := []*storepb.Series(nil) @@ -585,7 +584,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( break } if err != nil { - return errors.Wrapf(err, "failed to receive series from %s", c) + return errors.Wrapf(err, "failed to receive series from %s", c.RemoteAddress()) } // Response may either contain series, warning or hints. @@ -608,7 +607,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if h := resp.GetHints(); h != nil { hints := hintspb.SeriesResponseHints{} if err := types.UnmarshalAny(h, &hints); err != nil { - return errors.Wrapf(err, "failed to unmarshal series hints from %s", c) + return errors.Wrapf(err, "failed to unmarshal series hints from %s", c.RemoteAddress()) } ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks) @@ -621,7 +620,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( } level.Debug(spanLog).Log("msg", "received series from store-gateway", - "instance", c, + "instance", c.RemoteAddress(), "num series", len(mySeries), "bytes series", countSeriesBytes(mySeries), "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), @@ -676,14 +675,14 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore( namesResp, err := c.LabelNames(gCtx, req) if err != nil { - return errors.Wrapf(err, "failed to fetch series from %s", c) + return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress()) } myQueriedBlocks := []ulid.ULID(nil) if namesResp.Hints != nil { hints := hintspb.LabelNamesResponseHints{} if err := types.UnmarshalAny(namesResp.Hints, &hints); err != nil { - return errors.Wrapf(err, "failed to unmarshal label names hints from %s", c) + return errors.Wrapf(err, "failed to unmarshal label names hints from %s", c.RemoteAddress()) } ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks) @@ -752,14 +751,14 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( valuesResp, err := c.LabelValues(gCtx, req) if err != nil { - return errors.Wrapf(err, "failed to fetch series from %s", c) + return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress()) } myQueriedBlocks := []ulid.ULID(nil) if valuesResp.Hints != nil { hints := hintspb.LabelValuesResponseHints{} if err := types.UnmarshalAny(valuesResp.Hints, &hints); err != nil { - return errors.Wrapf(err, "failed to unmarshal label values hints from %s", c) + return errors.Wrapf(err, "failed to unmarshal label values hints from %s", c.RemoteAddress()) } ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks) @@ -771,7 +770,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( } level.Debug(spanLog).Log("msg", "received label values from store-gateway", - "instance", c, + "instance", c.RemoteAddress(), "num values", len(valuesResp.Values), "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) @@ -833,10 +832,6 @@ func createLabelNamesRequest(minT, maxT int64, blockIDs []ulid.ULID) (*storepb.L End: maxT, } - if len(blockIDs) == 0 { - return req, nil - } - // Selectively query only specific blocks. hints := &hintspb.LabelNamesRequestHints{ BlockMatchers: []storepb.LabelMatcher{ @@ -865,10 +860,6 @@ func createLabelValuesRequest(minT, maxT int64, label string, blockIDs []ulid.UL Label: label, } - if len(blockIDs) == 0 { - return req, nil - } - // Selectively query only specific blocks. hints := &hintspb.LabelValuesRequestHints{ BlockMatchers: []storepb.LabelMatcher{ diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 4e08b02d21a..0f413bb415b 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -183,8 +183,9 @@ func (q *distributorQuerier) streamingSelect(minT, maxT int64, matchers []*label } func (q *distributorQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { - lv, err := q.distributor.LabelValuesForLabelName(q.ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name)) - return lv, nil, err + lvs, err := q.distributor.LabelValuesForLabelName(q.ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name)) + + return lvs, nil, err } func (q *distributorQuerier) LabelNames() ([]string, storage.Warnings, error) { diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 2d7839712ad..51f3923758b 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -5,7 +5,6 @@ import ( "errors" "flag" "fmt" - "sort" "strings" "sync" "time" @@ -397,14 +396,12 @@ func (q querier) LabelValues(name string) ([]string, storage.Warnings, error) { // Need to reassign as the original variable will change and can't be relied on in a goroutine. querier := querier g.Go(func() error { + // NB: Values are sorted in Cortex already. myValues, myWarnings, err := querier.LabelValues(name) if err != nil { return err } - // We need values to be sorted we can merge them. - sort.Strings(myValues) - resMtx.Lock() sets = append(sets, myValues) warnings = append(warnings, myWarnings...) @@ -443,6 +440,7 @@ func (q querier) LabelNames() ([]string, storage.Warnings, error) { // Need to reassign as the original variable will change and can't be relied on in a goroutine. querier := querier g.Go(func() error { + // NB: Names are sorted in Cortex already. myNames, myWarnings, err := querier.LabelNames() if err != nil { return err