Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899
* [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [ENHANCEMENT] Reduced memory allocations in functions used to propagate contextual information between gRPC calls. #7529
* [ENHANCEMENT] Distributor: add experimental limit for exemplars per series per request, enabled with `-distributor.max-exemplars-per-series-per-request`, the number of discarded exemplars are tracked with `cortex_discarded_exemplars_total{reason="too_many_exemplars_per_series_per_request"}` #7989 #8010
* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
* [ENHANCEMENT] Store-gateway: Add `stage="wait_max_concurrent"` to `cortex_bucket_store_series_request_stage_duration_seconds` which records how long the query had to wait for its turn for `-blocks-storage.bucket-store.max-concurrent`. #7609
Expand Down
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,12 @@ lint: check-makefiles
"github.com/grafana/mimir/pkg/util/objtools" \
./pkg/... ./cmd/... ./integration/...

# Use the more performant metadata.ValueFromIncomingContext wherever possible (if not possible, we can always put
# a lint ignore directive to skip linting).
faillint -paths \
"google.golang.org/grpc/metadata.{FromIncomingContext}=google.golang.org/grpc/metadata.ValueFromIncomingContext" \
./pkg/... ./cmd/... ./integration/...

format: ## Run gofmt and goimports.
find . $(DONT_FIND) -name '*.pb.go' -prune -o -type f -name '*.go' -exec gofmt -w -s {} \;
find . $(DONT_FIND) -name '*.pb.go' -prune -o -type f -name '*.go' -exec goimports -w -local github.com/grafana/mimir {} \;
Expand Down
6 changes: 2 additions & 4 deletions pkg/querier/api/consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ func ReadConsistencyClientUnaryInterceptor(ctx context.Context, method string, r
}

func ReadConsistencyServerUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
md, _ := metadata.FromIncomingContext(ctx)
consistencies := md.Get(consistencyLevelGrpcMdKey)
consistencies := metadata.ValueFromIncomingContext(ctx, consistencyLevelGrpcMdKey)
if len(consistencies) > 0 && IsValidReadConsistency(consistencies[0]) {
ctx = ContextWithReadConsistency(ctx, consistencies[0])
}
Expand All @@ -86,8 +85,7 @@ func ReadConsistencyClientStreamInterceptor(ctx context.Context, desc *grpc.Stre
}

func ReadConsistencyServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
md, _ := metadata.FromIncomingContext(ss.Context())
consistencies := md.Get(consistencyLevelGrpcMdKey)
consistencies := metadata.ValueFromIncomingContext(ss.Context(), consistencyLevelGrpcMdKey)
if len(consistencies) > 0 && IsValidReadConsistency(consistencies[0]) {
ctx := ContextWithReadConsistency(ss.Context(), consistencies[0])
ss = ctxStream{
Expand Down
72 changes: 72 additions & 0 deletions pkg/querier/api/consistency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// SPDX-License-Identifier: AGPL-3.0-only

package api

import (
"context"
"fmt"
"testing"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

var (
// The following fixture has been generated looking at the actual metadata received by Grafana Mimir.
exampleIncomingMetadata = metadata.New(map[string]string{
"authority": "1.1.1.1",
"content-type": "application/grpc",
"grpc-accept-encoding": "snappy,gzip",
"uber-trace-id": "xxx",
"user-agent": "grpc-go/1.61.1",
"x-scope-orgid": "user-1",
})
)

func BenchmarkReadConsistencyServerUnaryInterceptor(b *testing.B) {
for _, withReadConsistency := range []bool{true, false} {
b.Run(fmt.Sprintf("with read consistency: %t", withReadConsistency), func(b *testing.B) {
md := exampleIncomingMetadata
if withReadConsistency {
md = metadata.Join(md, metadata.New(map[string]string{consistencyLevelGrpcMdKey: ReadConsistencyStrong}))
}

ctx := metadata.NewIncomingContext(context.Background(), md)

for n := 0; n < b.N; n++ {
_, _ = ReadConsistencyServerUnaryInterceptor(ctx, nil, nil, func(ctx context.Context, req any) (any, error) {
return nil, nil
})
}
})
}
}

func BenchmarkReadConsistencyServerStreamInterceptor(b *testing.B) {
for _, withReadConsistency := range []bool{true, false} {
b.Run(fmt.Sprintf("with read consistency: %t", withReadConsistency), func(b *testing.B) {
md := exampleIncomingMetadata
if withReadConsistency {
md = metadata.Join(md, metadata.New(map[string]string{consistencyLevelGrpcMdKey: ReadConsistencyStrong}))
}

stream := serverStreamMock{ctx: metadata.NewIncomingContext(context.Background(), md)}

for n := 0; n < b.N; n++ {
_ = ReadConsistencyServerStreamInterceptor(nil, stream, nil, func(_ any, _ grpc.ServerStream) error {
return nil
})
}
})
}
}

type serverStreamMock struct {
grpc.ServerStream

ctx context.Context
}

func (m serverStreamMock) Context() context.Context {
return m.ctx
}
7 changes: 1 addition & 6 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,12 +617,7 @@ func (u *BucketStores) Collect(metrics chan<- prometheus.Metric) {
}

func getUserIDFromGRPCContext(ctx context.Context) string {
meta, ok := metadata.FromIncomingContext(ctx)
if !ok {
return ""
}

values := meta.Get(GrpcContextMetadataTenantID)
values := metadata.ValueFromIncomingContext(ctx, GrpcContextMetadataTenantID)
if len(values) != 1 {
return ""
}
Expand Down
8 changes: 2 additions & 6 deletions pkg/util/extract_forwarded.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,8 @@ func GetSourceIPsFromOutgoingCtx(ctx context.Context) string {

// GetSourceIPsFromIncomingCtx extracts the source field from the GRPC context
func GetSourceIPsFromIncomingCtx(ctx context.Context) string {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return ""
}
ipAddresses, ok := md[ipAddressesKey]
if !ok {
ipAddresses := metadata.ValueFromIncomingContext(ctx, ipAddressesKey)
if len(ipAddresses) == 0 {
return ""
}
return ipAddresses[0]
Expand Down