diff --git a/CHANGELOG.md b/CHANGELOG.md index cb1fb3c653c..372a47e9f79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ * [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 * [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739 * [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698 +* [ENHANCEMENT] Reduced memory allocations in functions used to propagate contextual information between gRPC calls. #7529 * [ENHANCEMENT] Distributor: add experimental limit for exemplars per series per request, enabled with `-distributor.max-exemplars-per-series-per-request`, the number of discarded exemplars are tracked with `cortex_discarded_exemplars_total{reason="too_many_exemplars_per_series_per_request"}` #7989 #8010 * [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456 * [ENHANCEMENT] Store-gateway: Add `stage="wait_max_concurrent"` to `cortex_bucket_store_series_request_stage_duration_seconds` which records how long the query had to wait for its turn for `-blocks-storage.bucket-store.max-concurrent`. #7609 diff --git a/Makefile b/Makefile index 4c885a19af3..d8ab4e9c918 100644 --- a/Makefile +++ b/Makefile @@ -450,6 +450,12 @@ lint: check-makefiles "github.com/grafana/mimir/pkg/util/objtools" \ ./pkg/... ./cmd/... ./integration/... + # Use the more performant metadata.ValueFromIncomingContext wherever possible (if not possible, we can always put + # a lint ignore directive to skip linting). + faillint -paths \ + "google.golang.org/grpc/metadata.{FromIncomingContext}=google.golang.org/grpc/metadata.ValueFromIncomingContext" \ + ./pkg/... ./cmd/... ./integration/... + format: ## Run gofmt and goimports. find . $(DONT_FIND) -name '*.pb.go' -prune -o -type f -name '*.go' -exec gofmt -w -s {} \; find . $(DONT_FIND) -name '*.pb.go' -prune -o -type f -name '*.go' -exec goimports -w -local github.com/grafana/mimir {} \; diff --git a/pkg/querier/api/consistency.go b/pkg/querier/api/consistency.go index ece184d0fa0..694556a7549 100644 --- a/pkg/querier/api/consistency.go +++ b/pkg/querier/api/consistency.go @@ -70,8 +70,7 @@ func ReadConsistencyClientUnaryInterceptor(ctx context.Context, method string, r } func ReadConsistencyServerUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - md, _ := metadata.FromIncomingContext(ctx) - consistencies := md.Get(consistencyLevelGrpcMdKey) + consistencies := metadata.ValueFromIncomingContext(ctx, consistencyLevelGrpcMdKey) if len(consistencies) > 0 && IsValidReadConsistency(consistencies[0]) { ctx = ContextWithReadConsistency(ctx, consistencies[0]) } @@ -86,8 +85,7 @@ func ReadConsistencyClientStreamInterceptor(ctx context.Context, desc *grpc.Stre } func ReadConsistencyServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - md, _ := metadata.FromIncomingContext(ss.Context()) - consistencies := md.Get(consistencyLevelGrpcMdKey) + consistencies := metadata.ValueFromIncomingContext(ss.Context(), consistencyLevelGrpcMdKey) if len(consistencies) > 0 && IsValidReadConsistency(consistencies[0]) { ctx := ContextWithReadConsistency(ss.Context(), consistencies[0]) ss = ctxStream{ diff --git a/pkg/querier/api/consistency_test.go b/pkg/querier/api/consistency_test.go new file mode 100644 index 00000000000..a4f6d39b021 --- /dev/null +++ b/pkg/querier/api/consistency_test.go @@ -0,0 +1,72 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package api + +import ( + "context" + "fmt" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +var ( + // The following fixture has been generated looking at the actual metadata received by Grafana Mimir. + exampleIncomingMetadata = metadata.New(map[string]string{ + "authority": "1.1.1.1", + "content-type": "application/grpc", + "grpc-accept-encoding": "snappy,gzip", + "uber-trace-id": "xxx", + "user-agent": "grpc-go/1.61.1", + "x-scope-orgid": "user-1", + }) +) + +func BenchmarkReadConsistencyServerUnaryInterceptor(b *testing.B) { + for _, withReadConsistency := range []bool{true, false} { + b.Run(fmt.Sprintf("with read consistency: %t", withReadConsistency), func(b *testing.B) { + md := exampleIncomingMetadata + if withReadConsistency { + md = metadata.Join(md, metadata.New(map[string]string{consistencyLevelGrpcMdKey: ReadConsistencyStrong})) + } + + ctx := metadata.NewIncomingContext(context.Background(), md) + + for n := 0; n < b.N; n++ { + _, _ = ReadConsistencyServerUnaryInterceptor(ctx, nil, nil, func(ctx context.Context, req any) (any, error) { + return nil, nil + }) + } + }) + } +} + +func BenchmarkReadConsistencyServerStreamInterceptor(b *testing.B) { + for _, withReadConsistency := range []bool{true, false} { + b.Run(fmt.Sprintf("with read consistency: %t", withReadConsistency), func(b *testing.B) { + md := exampleIncomingMetadata + if withReadConsistency { + md = metadata.Join(md, metadata.New(map[string]string{consistencyLevelGrpcMdKey: ReadConsistencyStrong})) + } + + stream := serverStreamMock{ctx: metadata.NewIncomingContext(context.Background(), md)} + + for n := 0; n < b.N; n++ { + _ = ReadConsistencyServerStreamInterceptor(nil, stream, nil, func(_ any, _ grpc.ServerStream) error { + return nil + }) + } + }) + } +} + +type serverStreamMock struct { + grpc.ServerStream + + ctx context.Context +} + +func (m serverStreamMock) Context() context.Context { + return m.ctx +} diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 72d5c48bda4..fe79bf3530b 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -617,12 +617,7 @@ func (u *BucketStores) Collect(metrics chan<- prometheus.Metric) { } func getUserIDFromGRPCContext(ctx context.Context) string { - meta, ok := metadata.FromIncomingContext(ctx) - if !ok { - return "" - } - - values := meta.Get(GrpcContextMetadataTenantID) + values := metadata.ValueFromIncomingContext(ctx, GrpcContextMetadataTenantID) if len(values) != 1 { return "" } diff --git a/pkg/util/extract_forwarded.go b/pkg/util/extract_forwarded.go index b34ac847969..9a4bab540a3 100644 --- a/pkg/util/extract_forwarded.go +++ b/pkg/util/extract_forwarded.go @@ -29,12 +29,8 @@ func GetSourceIPsFromOutgoingCtx(ctx context.Context) string { // GetSourceIPsFromIncomingCtx extracts the source field from the GRPC context func GetSourceIPsFromIncomingCtx(ctx context.Context) string { - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return "" - } - ipAddresses, ok := md[ipAddressesKey] - if !ok { + ipAddresses := metadata.ValueFromIncomingContext(ctx, ipAddressesKey) + if len(ipAddresses) == 0 { return "" } return ipAddresses[0]