Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
* [ENHANCEMENT] Output all config fields to /config API, including those with empty value. #2209
* [ENHANCEMENT] Add "missing_metric_name" and "metric_name_invalid" reasons to cortex_discarded_samples_total metric. #2346
* [ENHANCEMENT] Experimental TSDB: sample ingestion errors are now reported via existing `cortex_discarded_samples_total` metric. #2370
* [ENHANCEMENT] Failures on samples at distributors and ingesters return the first validation error as opposed to the last. #XXXX
* [BUGFIX] Ensure user state metrics are updated if a transfer fails. #2338
* [BUGFIX] Fixed etcd client keepalive settings. #2278
* [BUGFIX] Fixed bug in updating last element of FIFO cache. #2270
Expand Down
10 changes: 5 additions & 5 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
return nil, err
}

var lastPartialErr error
var firstPartialErr error
removeReplica := false

numSamples := 0
Expand Down Expand Up @@ -398,8 +398,8 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie

// Errors in validation are considered non-fatal, as one series in a request may contain
// invalid data but all the remaining series could be perfectly valid.
if err != nil {
lastPartialErr = err
if err != nil && firstPartialErr == nil {
firstPartialErr = err
}

// validateSeries would have returned an emptyPreallocSeries if there were no valid samples.
Expand All @@ -417,7 +417,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
// Ensure the request slice is reused if there's no series passing the validation.
client.ReuseSlice(req.Timeseries)

return &client.WriteResponse{}, lastPartialErr
return &client.WriteResponse{}, firstPartialErr
}

now := time.Now()
Expand Down Expand Up @@ -461,7 +461,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
if err != nil {
return nil, err
}
return &client.WriteResponse{}, lastPartialErr
return &client.WriteResponse{}, firstPartialErr
}

func sortLabelsIfNeeded(labels []client.LabelAdapter) {
Expand Down
12 changes: 12 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1139,6 +1139,18 @@ func TestDistributorValidation(t *testing.T) {
}},
err: httpgrpc.Errorf(http.StatusBadRequest, `sample for 'testmetric{foo2="bar2", foo="bar"}' has 3 label names; limit 2`),
},
// Test multiple validation fails return the first one.
{
labels: []labels.Labels{
{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}, {Name: "foo2", Value: "bar2"}},
{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}},
},
samples: []client.Sample{
{TimestampMs: int64(now), Value: 2},
{TimestampMs: int64(past), Value: 2},
},
err: httpgrpc.Errorf(http.StatusBadRequest, `sample for 'testmetric{foo2="bar2", foo="bar"}' has 3 label names; limit 2`),
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
var limits validation.Limits
Expand Down
10 changes: 6 additions & 4 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.
return nil, fmt.Errorf("no user id")
}

var lastPartialErr *validationError
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TSDB blocks ingester already does this.

var firstPartialErr *validationError
var record *Record
if i.cfg.WALConfig.WALEnabled {
record = recordPool.Get().(*Record)
Expand All @@ -382,7 +382,9 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.

i.metrics.ingestedSamplesFail.Inc()
if ve, ok := err.(*validationError); ok {
lastPartialErr = ve
if firstPartialErr == nil {
firstPartialErr = ve
}
continue
}

Expand All @@ -391,9 +393,9 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.
}
}

if lastPartialErr != nil {
if firstPartialErr != nil {
// grpcForwardableError turns the error into a string so it no longer references `req`
return &client.WriteResponse{}, grpcForwardableError(userID, lastPartialErr.code, lastPartialErr)
return &client.WriteResponse{}, grpcForwardableError(userID, firstPartialErr.code, firstPartialErr)
}

if record != nil {
Expand Down
36 changes: 36 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,42 @@ func TestIngesterMetricSeriesLimitExceeded(t *testing.T) {
assert.Equal(t, expected, res)
}

func TestIngesterValidation(t *testing.T) {
_, ing := newDefaultTestStore(t)
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
userID := "1"
ctx := user.InjectOrgID(context.Background(), userID)
m := labelPairs{{Name: labels.MetricName, Value: "testmetric"}}

// As a setup, let's append samples.
ing.append(context.Background(), userID, m, 1, 0, client.API, nil)

for _, tc := range []struct {
desc string
lbls []labels.Labels
samples []client.Sample
err error
}{
{
desc: "With multiple append failures, only return the first error.",
lbls: []labels.Labels{
{{Name: labels.MetricName, Value: "testmetric"}},
{{Name: labels.MetricName, Value: "testmetric"}},
},
samples: []client.Sample{
{TimestampMs: 0, Value: 0}, // earlier timestamp, out of order.
{TimestampMs: 1, Value: 2}, // same timestamp different value.
},
err: httpgrpc.Errorf(http.StatusBadRequest, `user=1: sample timestamp out of order; last timestamp: 0.001, incoming timestamp: 0 for series {__name__="testmetric"}`),
},
} {
t.Run(tc.desc, func(t *testing.T) {
_, err := ing.Push(ctx, client.ToWriteRequest(tc.lbls, tc.samples, client.API))
require.Equal(t, tc.err, err)
})
}
}

func BenchmarkIngesterSeriesCreationLocking(b *testing.B) {
for i := 1; i <= 32; i++ {
b.Run(strconv.Itoa(i), func(b *testing.B) {
Expand Down