diff --git a/CHANGELOG.md b/CHANGELOG.md index a2211363775..edac42bbd94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -97,6 +97,7 @@ * [ENHANCEMENT] Output all config fields to /config API, including those with empty value. #2209 * [ENHANCEMENT] Add "missing_metric_name" and "metric_name_invalid" reasons to cortex_discarded_samples_total metric. #2346 * [ENHANCEMENT] Experimental TSDB: sample ingestion errors are now reported via existing `cortex_discarded_samples_total` metric. #2370 +* [ENHANCEMENT] Failures on samples at distributors and ingesters return the first validation error as opposed to the last. #2383 * [BUGFIX] Ensure user state metrics are updated if a transfer fails. #2338 * [BUGFIX] Fixed etcd client keepalive settings. #2278 * [BUGFIX] Fixed bug in updating last element of FIFO cache. #2270 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 0f6051a5a08..094a5536e8f 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -329,7 +329,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie return nil, err } - var lastPartialErr error + var firstPartialErr error removeReplica := false numSamples := 0 @@ -398,8 +398,8 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie // Errors in validation are considered non-fatal, as one series in a request may contain // invalid data but all the remaining series could be perfectly valid. - if err != nil { - lastPartialErr = err + if err != nil && firstPartialErr == nil { + firstPartialErr = err } // validateSeries would have returned an emptyPreallocSeries if there were no valid samples. @@ -417,7 +417,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie // Ensure the request slice is reused if there's no series passing the validation. client.ReuseSlice(req.Timeseries) - return &client.WriteResponse{}, lastPartialErr + return &client.WriteResponse{}, firstPartialErr } now := time.Now() @@ -461,7 +461,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie if err != nil { return nil, err } - return &client.WriteResponse{}, lastPartialErr + return &client.WriteResponse{}, firstPartialErr } func sortLabelsIfNeeded(labels []client.LabelAdapter) { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index c22c7af7189..3ed14d1a607 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1139,6 +1139,18 @@ func TestDistributorValidation(t *testing.T) { }}, err: httpgrpc.Errorf(http.StatusBadRequest, `sample for 'testmetric{foo2="bar2", foo="bar"}' has 3 label names; limit 2`), }, + // Test multiple validation fails return the first one. + { + labels: []labels.Labels{ + {{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}, {Name: "foo2", Value: "bar2"}}, + {{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}, + }, + samples: []client.Sample{ + {TimestampMs: int64(now), Value: 2}, + {TimestampMs: int64(past), Value: 2}, + }, + err: httpgrpc.Errorf(http.StatusBadRequest, `sample for 'testmetric{foo2="bar2", foo="bar"}' has 3 label names; limit 2`), + }, } { t.Run(strconv.Itoa(i), func(t *testing.T) { var limits validation.Limits diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index b190742e3e9..37e49fe4c58 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -357,7 +357,7 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client. return nil, fmt.Errorf("no user id") } - var lastPartialErr *validationError + var firstPartialErr *validationError var record *Record if i.cfg.WALConfig.WALEnabled { record = recordPool.Get().(*Record) @@ -382,7 +382,9 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client. i.metrics.ingestedSamplesFail.Inc() if ve, ok := err.(*validationError); ok { - lastPartialErr = ve + if firstPartialErr == nil { + firstPartialErr = ve + } continue } @@ -391,9 +393,9 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client. } } - if lastPartialErr != nil { + if firstPartialErr != nil { // grpcForwardableError turns the error into a string so it no longer references `req` - return &client.WriteResponse{}, grpcForwardableError(userID, lastPartialErr.code, lastPartialErr) + return &client.WriteResponse{}, grpcForwardableError(userID, firstPartialErr.code, firstPartialErr) } if record != nil { diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 69a94d2c89e..6ba960f4ed4 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -476,6 +476,43 @@ func TestIngesterMetricSeriesLimitExceeded(t *testing.T) { assert.Equal(t, expected, res) } +func TestIngesterValidation(t *testing.T) { + _, ing := newDefaultTestStore(t) + defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + userID := "1" + ctx := user.InjectOrgID(context.Background(), userID) + m := labelPairs{{Name: labels.MetricName, Value: "testmetric"}} + + // As a setup, let's append samples. + err := ing.append(context.Background(), userID, m, 1, 0, client.API, nil) + require.NoError(t, err) + + for _, tc := range []struct { + desc string + lbls []labels.Labels + samples []client.Sample + err error + }{ + { + desc: "With multiple append failures, only return the first error.", + lbls: []labels.Labels{ + {{Name: labels.MetricName, Value: "testmetric"}}, + {{Name: labels.MetricName, Value: "testmetric"}}, + }, + samples: []client.Sample{ + {TimestampMs: 0, Value: 0}, // earlier timestamp, out of order. + {TimestampMs: 1, Value: 2}, // same timestamp different value. + }, + err: httpgrpc.Errorf(http.StatusBadRequest, `user=1: sample timestamp out of order; last timestamp: 0.001, incoming timestamp: 0 for series {__name__="testmetric"}`), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + _, err := ing.Push(ctx, client.ToWriteRequest(tc.lbls, tc.samples, client.API)) + require.Equal(t, tc.err, err) + }) + } +} + func BenchmarkIngesterSeriesCreationLocking(b *testing.B) { for i := 1; i <= 32; i++ { b.Run(strconv.Itoa(i), func(b *testing.B) {