From 38a477977e9a69b877766185dc7b59e451bb7101 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Wed, 1 May 2024 16:56:06 -0700 Subject: [PATCH 1/5] Added metric to keep track of how many in progress request from one distributor to one ingester. Created inflight request limit per ingester client. Signed-off-by: Alex Le --- pkg/distributor/distributor.go | 17 +++++++++------ pkg/ingester/client/client.go | 6 +++-- pkg/ingester/client/cortex_util_test.go | 2 +- pkg/ingester/client/ingester.pb.go | 29 +++++++++++++++++++++++-- 4 files changed, 42 insertions(+), 12 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index dfaa12d89c3..e970b0c3d03 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -118,6 +118,7 @@ type Distributor struct { ingesterQueryFailures *prometheus.CounterVec replicationFactor prometheus.Gauge latestSeenSampleTimestampPerUser *prometheus.GaugeVec + inflightPushRequestsCount prometheus.Gauge } // Config contains the configuration required to @@ -344,6 +345,10 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove Name: "cortex_distributor_latest_seen_sample_timestamp_seconds", Help: "Unix timestamp of latest received sample per user.", }, []string{"user"}), + inflightPushRequestsCount: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_distributor_inflight_push_requests", + Help: "Current number of inflight push requests in distributor.", + }), } promauto.With(reg).NewGauge(prometheus.GaugeOpts{ @@ -357,12 +362,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove ConstLabels: map[string]string{limitLabel: "max_ingestion_rate"}, }).Set(cfg.InstanceLimits.MaxIngestionRate) - promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ - Name: "cortex_distributor_inflight_push_requests", - Help: "Current number of inflight push requests in distributor.", - }, func() float64 { - return float64(d.inflightPushRequests.Load()) - }) promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ Name: "cortex_distributor_ingestion_rate_samples_per_second", Help: "Current ingestion rate in samples/sec that distributor is using to limit access.", @@ -591,7 +590,11 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co // We will report *this* request in the error too. inflight := d.inflightPushRequests.Inc() - defer d.inflightPushRequests.Dec() + d.inflightPushRequestsCount.Inc() + defer func() { + d.inflightPushRequestsCount.Dec() + d.inflightPushRequests.Dec() + }() now := time.Now() d.activeUsers.UpdateUserTimestamp(userID, now) diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index e6c7e334e1b..9c62029c77e 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -55,7 +55,7 @@ func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error return nil, err } return &closableHealthAndIngesterClient{ - IngesterClient: NewIngesterClient(conn), + IngesterClient: NewIngesterClient(conn, cfg.MaxInflightPushRequests), HealthClient: grpc_health_v1.NewHealthClient(conn), conn: conn, }, nil @@ -67,12 +67,14 @@ func (c *closableHealthAndIngesterClient) Close() error { // Config is the configuration struct for the ingester client type Config struct { - GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` + MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"` } // RegisterFlags registers configuration settings used by the ingester client config. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", f) + f.Int64Var(&cfg.MaxInflightPushRequests, "ingester.client.max-inflight-push-requests", 0, "Max inflight push requests that this ingester client can handle. This limit is per-ingester-client. Additional requests will be rejected. 0 = unlimited.") } func (cfg *Config) Validate(log log.Logger) error { diff --git a/pkg/ingester/client/cortex_util_test.go b/pkg/ingester/client/cortex_util_test.go index 30dd8bc9a16..f80f6bc241f 100644 --- a/pkg/ingester/client/cortex_util_test.go +++ b/pkg/ingester/client/cortex_util_test.go @@ -134,7 +134,7 @@ func TestStreamingSends(t *testing.T) { require.NoError(t, server.Serve(listen)) }() - client := NewIngesterClient(conn) + client := NewIngesterClient(conn, 0) err = testData.clientRecv(clientCtx, client) assert.Equal(t, true, grpcutil.IsGRPCContextCanceled(err)) diff --git a/pkg/ingester/client/ingester.pb.go b/pkg/ingester/client/ingester.pb.go index 847ace69ca8..a391a8e1fa9 100644 --- a/pkg/ingester/client/ingester.pb.go +++ b/pkg/ingester/client/ingester.pb.go @@ -6,14 +6,18 @@ package client import ( + atomic "go.uber.org/atomic" bytes "bytes" context "context" encoding_binary "encoding/binary" + errors "github.com/pkg/errors" fmt "fmt" cortexpb "github.com/cortexproject/cortex/pkg/cortexpb" github_com_cortexproject_cortex_pkg_cortexpb "github.com/cortexproject/cortex/pkg/cortexpb" _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" + promauto "github.com/prometheus/client_golang/prometheus/promauto" + prometheus "github.com/prometheus/client_golang/prometheus" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" @@ -2750,17 +2754,38 @@ type IngesterClient interface { MetricsMetadata(ctx context.Context, in *MetricsMetadataRequest, opts ...grpc.CallOption) (*MetricsMetadataResponse, error) } +var errTooManyInflightPushRequests = errors.New("too many inflight push requests in ingester client") +var inflightRequestCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "cortex", + Name: "ingester_client_request_count", + Help: "Number of Ingester client requests.", +}, []string{"ingester"}) + type ingesterClient struct { cc *grpc.ClientConn + maxInflightPushRequests int64 + inflightRequests atomic.Int64 + inflightRequestCount prometheus.Gauge } -func NewIngesterClient(cc *grpc.ClientConn) IngesterClient { - return &ingesterClient{cc} +func NewIngesterClient(cc *grpc.ClientConn, maxInflightPushRequests int64) IngesterClient { + return &ingesterClient{ + cc: cc, + maxInflightPushRequests: maxInflightPushRequests, + inflightRequestCount: inflightRequestCount.WithLabelValues(cc.Target()), + } } func (c *ingesterClient) Push(ctx context.Context, in *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { + currentInflight := c.inflightRequests.Inc() + defer c.inflightRequests.Dec() + if c.maxInflightPushRequests > 0 && currentInflight > c.maxInflightPushRequests { + return nil, errTooManyInflightPushRequests + } out := new(cortexpb.WriteResponse) + c.inflightRequestCount.Inc() err := c.cc.Invoke(ctx, "/cortex.Ingester/Push", in, out, opts...) + c.inflightRequestCount.Dec() if err != nil { return nil, err } From 13775a69f57cee20a4a07622656d0e98700e6e79 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Thu, 2 May 2024 16:59:21 -0700 Subject: [PATCH 2/5] moved throttling logic to closableHealthAndIngesterClient Signed-off-by: Alex Le --- pkg/ingester/client/client.go | 53 ++++++++++++++++++++----- pkg/ingester/client/cortex_util_test.go | 2 +- pkg/ingester/client/ingester.pb.go | 29 +------------- 3 files changed, 47 insertions(+), 37 deletions(-) diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 9c62029c77e..29b47f98508 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -8,8 +8,10 @@ import ( "github.com/cortexproject/cortex/pkg/util/grpcclient" "github.com/go-kit/log" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "go.uber.org/atomic" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" ) @@ -20,6 +22,13 @@ var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.Histogra Help: "Time spent doing Ingester requests.", Buckets: prometheus.ExponentialBuckets(0.001, 4, 6), }, []string{"operation", "status_code"}) +var inflightRequestCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "cortex", + Name: "ingester_client_request_count", + Help: "Number of Ingester client requests.", +}, []string{"ingester"}) + +var errTooManyInflightPushRequests = errors.New("too many inflight push requests in ingester client") // HealthAndIngesterClient is the union of IngesterClient and grpc_health_v1.HealthClient. type HealthAndIngesterClient interface { @@ -32,16 +41,40 @@ type HealthAndIngesterClient interface { type closableHealthAndIngesterClient struct { IngesterClient grpc_health_v1.HealthClient - conn *grpc.ClientConn + conn *grpc.ClientConn + maxInflightPushRequests int64 + inflightRequests atomic.Int64 + inflightRequestCount prometheus.Gauge } func (c *closableHealthAndIngesterClient) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { - out := new(cortexpb.WriteResponse) - err := c.conn.Invoke(ctx, "/cortex.Ingester/Push", in, out, opts...) - if err != nil { - return nil, err + return c.handlePushRequest(func() (*cortexpb.WriteResponse, error) { + out := new(cortexpb.WriteResponse) + err := c.conn.Invoke(ctx, "/cortex.Ingester/Push", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil + }) +} + +func (c *closableHealthAndIngesterClient) Push(ctx context.Context, in *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { + return c.handlePushRequest(func() (*cortexpb.WriteResponse, error) { + return c.IngesterClient.Push(ctx, in, opts...) + }) +} + +func (c *closableHealthAndIngesterClient) handlePushRequest(mainFunc func() (*cortexpb.WriteResponse, error)) (*cortexpb.WriteResponse, error) { + currentInflight := c.inflightRequests.Inc() + c.inflightRequestCount.Inc() + defer func() { + c.inflightRequestCount.Dec() + c.inflightRequests.Dec() + }() + if c.maxInflightPushRequests > 0 && currentInflight > c.maxInflightPushRequests { + return nil, errTooManyInflightPushRequests } - return out, nil + return mainFunc() } // MakeIngesterClient makes a new IngesterClient @@ -55,9 +88,11 @@ func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error return nil, err } return &closableHealthAndIngesterClient{ - IngesterClient: NewIngesterClient(conn, cfg.MaxInflightPushRequests), - HealthClient: grpc_health_v1.NewHealthClient(conn), - conn: conn, + IngesterClient: NewIngesterClient(conn), + HealthClient: grpc_health_v1.NewHealthClient(conn), + conn: conn, + maxInflightPushRequests: cfg.MaxInflightPushRequests, + inflightRequestCount: inflightRequestCount.WithLabelValues(addr), }, nil } diff --git a/pkg/ingester/client/cortex_util_test.go b/pkg/ingester/client/cortex_util_test.go index f80f6bc241f..30dd8bc9a16 100644 --- a/pkg/ingester/client/cortex_util_test.go +++ b/pkg/ingester/client/cortex_util_test.go @@ -134,7 +134,7 @@ func TestStreamingSends(t *testing.T) { require.NoError(t, server.Serve(listen)) }() - client := NewIngesterClient(conn, 0) + client := NewIngesterClient(conn) err = testData.clientRecv(clientCtx, client) assert.Equal(t, true, grpcutil.IsGRPCContextCanceled(err)) diff --git a/pkg/ingester/client/ingester.pb.go b/pkg/ingester/client/ingester.pb.go index a391a8e1fa9..847ace69ca8 100644 --- a/pkg/ingester/client/ingester.pb.go +++ b/pkg/ingester/client/ingester.pb.go @@ -6,18 +6,14 @@ package client import ( - atomic "go.uber.org/atomic" bytes "bytes" context "context" encoding_binary "encoding/binary" - errors "github.com/pkg/errors" fmt "fmt" cortexpb "github.com/cortexproject/cortex/pkg/cortexpb" github_com_cortexproject_cortex_pkg_cortexpb "github.com/cortexproject/cortex/pkg/cortexpb" _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" - promauto "github.com/prometheus/client_golang/prometheus/promauto" - prometheus "github.com/prometheus/client_golang/prometheus" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" @@ -2754,38 +2750,17 @@ type IngesterClient interface { MetricsMetadata(ctx context.Context, in *MetricsMetadataRequest, opts ...grpc.CallOption) (*MetricsMetadataResponse, error) } -var errTooManyInflightPushRequests = errors.New("too many inflight push requests in ingester client") -var inflightRequestCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "cortex", - Name: "ingester_client_request_count", - Help: "Number of Ingester client requests.", -}, []string{"ingester"}) - type ingesterClient struct { cc *grpc.ClientConn - maxInflightPushRequests int64 - inflightRequests atomic.Int64 - inflightRequestCount prometheus.Gauge } -func NewIngesterClient(cc *grpc.ClientConn, maxInflightPushRequests int64) IngesterClient { - return &ingesterClient{ - cc: cc, - maxInflightPushRequests: maxInflightPushRequests, - inflightRequestCount: inflightRequestCount.WithLabelValues(cc.Target()), - } +func NewIngesterClient(cc *grpc.ClientConn) IngesterClient { + return &ingesterClient{cc} } func (c *ingesterClient) Push(ctx context.Context, in *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { - currentInflight := c.inflightRequests.Inc() - defer c.inflightRequests.Dec() - if c.maxInflightPushRequests > 0 && currentInflight > c.maxInflightPushRequests { - return nil, errTooManyInflightPushRequests - } out := new(cortexpb.WriteResponse) - c.inflightRequestCount.Inc() err := c.cc.Invoke(ctx, "/cortex.Ingester/Push", in, out, opts...) - c.inflightRequestCount.Dec() if err != nil { return nil, err } From d69489cd8055cae8315ca0d37a317c1276534618 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Thu, 2 May 2024 18:01:02 -0700 Subject: [PATCH 3/5] update changelog and config doc and reverted metric cortex_distributor_inflight_push_requests Signed-off-by: Alex Le --- CHANGELOG.md | 1 + docs/configuration/config-file-reference.md | 5 +++++ pkg/distributor/distributor.go | 17 +++++++---------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f6fcd515437..58914d747bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [CHANGE] Ruler: Remove `experimental.ruler.api-enable-rules-backup` flag and use `ruler.ring.replication-factor` to check if rules backup is enabled * [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892 +* [ENHANCEMENT] Distributor: Added `max_inflight_push_requests` config to ingester client to protect distributor from OOMKilled. #5917 * [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906 ## 1.17.0 in progress diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 30bbee288c9..489114cab31 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3025,6 +3025,11 @@ grpc_client_config: # Skip validating server certificate. # CLI flag: -ingester.client.tls-insecure-skip-verify [tls_insecure_skip_verify: | default = false] + +# Max inflight push requests that this ingester client can handle. This limit is +# per-ingester-client. Additional requests will be rejected. 0 = unlimited. +# CLI flag: -ingester.client.max-inflight-push-requests +[max_inflight_push_requests: | default = 0] ``` ### `limits_config` diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index e970b0c3d03..dfaa12d89c3 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -118,7 +118,6 @@ type Distributor struct { ingesterQueryFailures *prometheus.CounterVec replicationFactor prometheus.Gauge latestSeenSampleTimestampPerUser *prometheus.GaugeVec - inflightPushRequestsCount prometheus.Gauge } // Config contains the configuration required to @@ -345,10 +344,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove Name: "cortex_distributor_latest_seen_sample_timestamp_seconds", Help: "Unix timestamp of latest received sample per user.", }, []string{"user"}), - inflightPushRequestsCount: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "cortex_distributor_inflight_push_requests", - Help: "Current number of inflight push requests in distributor.", - }), } promauto.With(reg).NewGauge(prometheus.GaugeOpts{ @@ -362,6 +357,12 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove ConstLabels: map[string]string{limitLabel: "max_ingestion_rate"}, }).Set(cfg.InstanceLimits.MaxIngestionRate) + promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_distributor_inflight_push_requests", + Help: "Current number of inflight push requests in distributor.", + }, func() float64 { + return float64(d.inflightPushRequests.Load()) + }) promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ Name: "cortex_distributor_ingestion_rate_samples_per_second", Help: "Current ingestion rate in samples/sec that distributor is using to limit access.", @@ -590,11 +591,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co // We will report *this* request in the error too. inflight := d.inflightPushRequests.Inc() - d.inflightPushRequestsCount.Inc() - defer func() { - d.inflightPushRequestsCount.Dec() - d.inflightPushRequests.Dec() - }() + defer d.inflightPushRequests.Dec() now := time.Now() d.activeUsers.UpdateUserTimestamp(userID, now) From 2212f33a59fa244d70698f66f15260f847320376 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Fri, 3 May 2024 17:12:24 -0700 Subject: [PATCH 4/5] add unit test Signed-off-by: Alex Le --- pkg/ingester/client/client.go | 8 ++- pkg/ingester/client/client_test.go | 83 ++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 29b47f98508..ba6047f070e 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -30,6 +30,12 @@ var inflightRequestCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ var errTooManyInflightPushRequests = errors.New("too many inflight push requests in ingester client") +// ClosableClientConn is grpc.ClientConnInterface with Close function +type ClosableClientConn interface { + grpc.ClientConnInterface + Close() error +} + // HealthAndIngesterClient is the union of IngesterClient and grpc_health_v1.HealthClient. type HealthAndIngesterClient interface { IngesterClient @@ -41,7 +47,7 @@ type HealthAndIngesterClient interface { type closableHealthAndIngesterClient struct { IngesterClient grpc_health_v1.HealthClient - conn *grpc.ClientConn + conn ClosableClientConn maxInflightPushRequests int64 inflightRequests atomic.Int64 inflightRequestCount prometheus.Gauge diff --git a/pkg/ingester/client/client_test.go b/pkg/ingester/client/client_test.go index 59fae123fa2..2fed7aeab70 100644 --- a/pkg/ingester/client/client_test.go +++ b/pkg/ingester/client/client_test.go @@ -6,7 +6,10 @@ import ( "strconv" "testing" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/util" @@ -47,3 +50,83 @@ func TestMarshall(t *testing.T) { require.Equal(t, numSeries, len(req.Timeseries)) } } + +func TestClosableHealthAndIngesterClient_MaxInflightPushRequests(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + inflightPushRequests int64 + maxInflightPushRequests int64 + expectThrottle bool + }{ + "no limit": { + inflightPushRequests: 1000, + maxInflightPushRequests: 0, + expectThrottle: false, + }, + "inflight request is under limit": { + inflightPushRequests: 99, + maxInflightPushRequests: 100, + expectThrottle: false, + }, + "inflight request hits limit": { + inflightPushRequests: 100, + maxInflightPushRequests: 100, + expectThrottle: true, + }, + } + ctx := context.Background() + for testName, testData := range tests { + tData := testData + t.Run(testName, func(t *testing.T) { + t.Parallel() + + client1 := createTestIngesterClient(tData.maxInflightPushRequests, tData.inflightPushRequests) + _, err := client1.Push(ctx, nil) + if tData.expectThrottle { + assert.ErrorIs(t, err, errTooManyInflightPushRequests) + } else { + assert.NoError(t, err) + } + + client2 := createTestIngesterClient(tData.maxInflightPushRequests, tData.inflightPushRequests) + _, err = client2.PushPreAlloc(ctx, nil) + if tData.expectThrottle { + assert.ErrorIs(t, err, errTooManyInflightPushRequests) + } else { + assert.NoError(t, err) + } + }) + } +} + +func createTestIngesterClient(maxInflightPushRequests int64, currentInflightRequests int64) *closableHealthAndIngesterClient { + client := &closableHealthAndIngesterClient{ + IngesterClient: &mockIngester{}, + conn: &mockClientConn{}, + maxInflightPushRequests: maxInflightPushRequests, + inflightRequestCount: prometheus.NewGauge(prometheus.GaugeOpts{}), + } + client.inflightRequests.Add(currentInflightRequests) + return client +} + +type mockIngester struct { + IngesterClient +} + +func (m *mockIngester) Push(_ context.Context, _ *cortexpb.WriteRequest, _ ...grpc.CallOption) (*cortexpb.WriteResponse, error) { + return &cortexpb.WriteResponse{}, nil +} + +type mockClientConn struct { + ClosableClientConn +} + +func (m *mockClientConn) Invoke(_ context.Context, _ string, _ any, _ any, _ ...grpc.CallOption) error { + return nil +} + +func (m *mockClientConn) Close() error { + return nil +} From 793e4920ef3bc59b151b2419dcf0edf911b8627c Mon Sep 17 00:00:00 2001 From: Alex Le Date: Mon, 6 May 2024 16:50:30 -0700 Subject: [PATCH 5/5] rename Signed-off-by: Alex Le --- pkg/ingester/client/client.go | 14 +++++++------- pkg/ingester/client/client_test.go | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index ba6047f070e..7bb1a6a29ce 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -22,10 +22,10 @@ var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.Histogra Help: "Time spent doing Ingester requests.", Buckets: prometheus.ExponentialBuckets(0.001, 4, 6), }, []string{"operation", "status_code"}) -var inflightRequestCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ +var ingesterClientInflightPushRequests = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "cortex", - Name: "ingester_client_request_count", - Help: "Number of Ingester client requests.", + Name: "ingester_client_inflight_push_requests", + Help: "Number of Ingester client push requests.", }, []string{"ingester"}) var errTooManyInflightPushRequests = errors.New("too many inflight push requests in ingester client") @@ -50,7 +50,7 @@ type closableHealthAndIngesterClient struct { conn ClosableClientConn maxInflightPushRequests int64 inflightRequests atomic.Int64 - inflightRequestCount prometheus.Gauge + inflightPushRequests prometheus.Gauge } func (c *closableHealthAndIngesterClient) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { @@ -72,9 +72,9 @@ func (c *closableHealthAndIngesterClient) Push(ctx context.Context, in *cortexpb func (c *closableHealthAndIngesterClient) handlePushRequest(mainFunc func() (*cortexpb.WriteResponse, error)) (*cortexpb.WriteResponse, error) { currentInflight := c.inflightRequests.Inc() - c.inflightRequestCount.Inc() + c.inflightPushRequests.Inc() defer func() { - c.inflightRequestCount.Dec() + c.inflightPushRequests.Dec() c.inflightRequests.Dec() }() if c.maxInflightPushRequests > 0 && currentInflight > c.maxInflightPushRequests { @@ -98,7 +98,7 @@ func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error HealthClient: grpc_health_v1.NewHealthClient(conn), conn: conn, maxInflightPushRequests: cfg.MaxInflightPushRequests, - inflightRequestCount: inflightRequestCount.WithLabelValues(addr), + inflightPushRequests: ingesterClientInflightPushRequests.WithLabelValues(addr), }, nil } diff --git a/pkg/ingester/client/client_test.go b/pkg/ingester/client/client_test.go index 2fed7aeab70..270db7d4588 100644 --- a/pkg/ingester/client/client_test.go +++ b/pkg/ingester/client/client_test.go @@ -105,7 +105,7 @@ func createTestIngesterClient(maxInflightPushRequests int64, currentInflightRequ IngesterClient: &mockIngester{}, conn: &mockClientConn{}, maxInflightPushRequests: maxInflightPushRequests, - inflightRequestCount: prometheus.NewGauge(prometheus.GaugeOpts{}), + inflightPushRequests: prometheus.NewGauge(prometheus.GaugeOpts{}), } client.inflightRequests.Add(currentInflightRequests) return client