diff --git a/CHANGELOG.md b/CHANGELOG.md index b7d680d1c37..8b8d21c8389 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892 * [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916 * [ENHANCEMENT] Ingester: Allowing to configure `-blocks-storage.tsdb.head-compaction-interval` flag up to 30 min and add a jitter on the first head compaction. #5919 +* [ENHANCEMENT] Distributor: Added `max_inflight_push_requests` config to ingester client to protect distributor from OOMKilled. #5917 * [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906 ## 1.17.0 2024-04-30 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 1ad826306eb..d4609077435 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3026,6 +3026,11 @@ grpc_client_config: # Skip validating server certificate. # CLI flag: -ingester.client.tls-insecure-skip-verify [tls_insecure_skip_verify: | default = false] + +# Max inflight push requests that this ingester client can handle. This limit is +# per-ingester-client. Additional requests will be rejected. 0 = unlimited. +# CLI flag: -ingester.client.max-inflight-push-requests +[max_inflight_push_requests: | default = 0] ``` ### `limits_config` diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index e6c7e334e1b..7bb1a6a29ce 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -8,8 +8,10 @@ import ( "github.com/cortexproject/cortex/pkg/util/grpcclient" "github.com/go-kit/log" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "go.uber.org/atomic" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" ) @@ -20,6 +22,19 @@ var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.Histogra Help: "Time spent doing Ingester requests.", Buckets: prometheus.ExponentialBuckets(0.001, 4, 6), }, []string{"operation", "status_code"}) +var ingesterClientInflightPushRequests = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "cortex", + Name: "ingester_client_inflight_push_requests", + Help: "Number of Ingester client push requests.", +}, []string{"ingester"}) + +var errTooManyInflightPushRequests = errors.New("too many inflight push requests in ingester client") + +// ClosableClientConn is grpc.ClientConnInterface with Close function +type ClosableClientConn interface { + grpc.ClientConnInterface + Close() error +} // HealthAndIngesterClient is the union of IngesterClient and grpc_health_v1.HealthClient. type HealthAndIngesterClient interface { @@ -32,16 +47,40 @@ type HealthAndIngesterClient interface { type closableHealthAndIngesterClient struct { IngesterClient grpc_health_v1.HealthClient - conn *grpc.ClientConn + conn ClosableClientConn + maxInflightPushRequests int64 + inflightRequests atomic.Int64 + inflightPushRequests prometheus.Gauge } func (c *closableHealthAndIngesterClient) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { - out := new(cortexpb.WriteResponse) - err := c.conn.Invoke(ctx, "/cortex.Ingester/Push", in, out, opts...) - if err != nil { - return nil, err + return c.handlePushRequest(func() (*cortexpb.WriteResponse, error) { + out := new(cortexpb.WriteResponse) + err := c.conn.Invoke(ctx, "/cortex.Ingester/Push", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil + }) +} + +func (c *closableHealthAndIngesterClient) Push(ctx context.Context, in *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { + return c.handlePushRequest(func() (*cortexpb.WriteResponse, error) { + return c.IngesterClient.Push(ctx, in, opts...) + }) +} + +func (c *closableHealthAndIngesterClient) handlePushRequest(mainFunc func() (*cortexpb.WriteResponse, error)) (*cortexpb.WriteResponse, error) { + currentInflight := c.inflightRequests.Inc() + c.inflightPushRequests.Inc() + defer func() { + c.inflightPushRequests.Dec() + c.inflightRequests.Dec() + }() + if c.maxInflightPushRequests > 0 && currentInflight > c.maxInflightPushRequests { + return nil, errTooManyInflightPushRequests } - return out, nil + return mainFunc() } // MakeIngesterClient makes a new IngesterClient @@ -55,9 +94,11 @@ func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error return nil, err } return &closableHealthAndIngesterClient{ - IngesterClient: NewIngesterClient(conn), - HealthClient: grpc_health_v1.NewHealthClient(conn), - conn: conn, + IngesterClient: NewIngesterClient(conn), + HealthClient: grpc_health_v1.NewHealthClient(conn), + conn: conn, + maxInflightPushRequests: cfg.MaxInflightPushRequests, + inflightPushRequests: ingesterClientInflightPushRequests.WithLabelValues(addr), }, nil } @@ -67,12 +108,14 @@ func (c *closableHealthAndIngesterClient) Close() error { // Config is the configuration struct for the ingester client type Config struct { - GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` + MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"` } // RegisterFlags registers configuration settings used by the ingester client config. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", f) + f.Int64Var(&cfg.MaxInflightPushRequests, "ingester.client.max-inflight-push-requests", 0, "Max inflight push requests that this ingester client can handle. This limit is per-ingester-client. Additional requests will be rejected. 0 = unlimited.") } func (cfg *Config) Validate(log log.Logger) error { diff --git a/pkg/ingester/client/client_test.go b/pkg/ingester/client/client_test.go index 59fae123fa2..270db7d4588 100644 --- a/pkg/ingester/client/client_test.go +++ b/pkg/ingester/client/client_test.go @@ -6,7 +6,10 @@ import ( "strconv" "testing" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/util" @@ -47,3 +50,83 @@ func TestMarshall(t *testing.T) { require.Equal(t, numSeries, len(req.Timeseries)) } } + +func TestClosableHealthAndIngesterClient_MaxInflightPushRequests(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + inflightPushRequests int64 + maxInflightPushRequests int64 + expectThrottle bool + }{ + "no limit": { + inflightPushRequests: 1000, + maxInflightPushRequests: 0, + expectThrottle: false, + }, + "inflight request is under limit": { + inflightPushRequests: 99, + maxInflightPushRequests: 100, + expectThrottle: false, + }, + "inflight request hits limit": { + inflightPushRequests: 100, + maxInflightPushRequests: 100, + expectThrottle: true, + }, + } + ctx := context.Background() + for testName, testData := range tests { + tData := testData + t.Run(testName, func(t *testing.T) { + t.Parallel() + + client1 := createTestIngesterClient(tData.maxInflightPushRequests, tData.inflightPushRequests) + _, err := client1.Push(ctx, nil) + if tData.expectThrottle { + assert.ErrorIs(t, err, errTooManyInflightPushRequests) + } else { + assert.NoError(t, err) + } + + client2 := createTestIngesterClient(tData.maxInflightPushRequests, tData.inflightPushRequests) + _, err = client2.PushPreAlloc(ctx, nil) + if tData.expectThrottle { + assert.ErrorIs(t, err, errTooManyInflightPushRequests) + } else { + assert.NoError(t, err) + } + }) + } +} + +func createTestIngesterClient(maxInflightPushRequests int64, currentInflightRequests int64) *closableHealthAndIngesterClient { + client := &closableHealthAndIngesterClient{ + IngesterClient: &mockIngester{}, + conn: &mockClientConn{}, + maxInflightPushRequests: maxInflightPushRequests, + inflightPushRequests: prometheus.NewGauge(prometheus.GaugeOpts{}), + } + client.inflightRequests.Add(currentInflightRequests) + return client +} + +type mockIngester struct { + IngesterClient +} + +func (m *mockIngester) Push(_ context.Context, _ *cortexpb.WriteRequest, _ ...grpc.CallOption) (*cortexpb.WriteResponse, error) { + return &cortexpb.WriteResponse{}, nil +} + +type mockClientConn struct { + ClosableClientConn +} + +func (m *mockClientConn) Invoke(_ context.Context, _ string, _ any, _ any, _ ...grpc.CallOption) error { + return nil +} + +func (m *mockClientConn) Close() error { + return nil +}