diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 8de7630e755..9645a65672f 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -22,6 +22,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/extract" "github.com/cortexproject/cortex/pkg/util/grpcutil" "github.com/cortexproject/cortex/pkg/util/limiter" + util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -330,9 +331,9 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri reqStats.AddFetchedSamples(uint64(resp.SamplesCount())) if partialdata.IsPartialDataError(err) { - level.Warn(d.log).Log("msg", "returning partial data", "err", err.Error()) + level.Warn(util_log.WithContext(ctx, d.log)).Log("msg", "returning partial data", "err", err.Error()) d.ingesterPartialDataQueries.Inc() - return resp, err + return resp, partialdata.ErrPartialData } return resp, nil diff --git a/pkg/ring/replication_set.go b/pkg/ring/replication_set.go index 745c742f990..31e4dc016fa 100644 --- a/pkg/ring/replication_set.go +++ b/pkg/ring/replication_set.go @@ -7,6 +7,7 @@ import ( "time" "github.com/cortexproject/cortex/pkg/querier/partialdata" + "github.com/cortexproject/cortex/pkg/util/validation" ) // ReplicationSet describes the instances to talk to for a given key, and how @@ -80,6 +81,10 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResults return nil, res.err } + if validation.IsLimitError(res.err) { + return nil, res.err + } + // force one of the delayed requests to start if delay > 0 && r.MaxUnavailableZones == 0 { forceStart <- struct{}{} diff --git a/pkg/ring/replication_set_test.go b/pkg/ring/replication_set_test.go index d72e4cd5257..401ec7d4094 100644 --- a/pkg/ring/replication_set_test.go +++ b/pkg/ring/replication_set_test.go @@ -11,6 +11,7 @@ import ( "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/querier/partialdata" + "github.com/cortexproject/cortex/pkg/util/validation" ) func TestReplicationSet_GetAddresses(t *testing.T) { @@ -196,12 +197,17 @@ func TestReplicationSet_Do(t *testing.T) { expectedError: errZoneFailure, }, { - name: "with partial data enabled and max unavailable zones = 1, should succeed on instances failing in 2 out of 3 zones (3 instances)", - instances: []InstanceDesc{{Addr: "10.0.0.1", Zone: "zone1"}, {Addr: "10.0.0.2", Zone: "zone2"}, {Addr: "10.0.0.3", Zone: "zone3"}}, - f: failingFunctionOnZones("zone1", "zone2"), + name: "with partial data enabled and max unavailable zones = 1, should succeed on instances failing in 2 out of 3 zones (6 instances)", + instances: []InstanceDesc{{Addr: "10.0.0.1", Zone: "zone1"}, {Addr: "10.0.0.2", Zone: "zone2"}, {Addr: "10.0.0.3", Zone: "zone3"}, {Addr: "10.0.0.4", Zone: "zone1"}, {Addr: "10.0.0.5", Zone: "zone2"}, {Addr: "10.0.0.6", Zone: "zone3"}}, + f: func(ctx context.Context, ing *InstanceDesc) (interface{}, error) { + if ing.Addr == "10.0.0.1" || ing.Addr == "10.0.0.2" { + return nil, errZoneFailure + } + return 1, nil + }, maxUnavailableZones: 1, queryPartialData: true, - want: []interface{}{1}, + want: []interface{}{1, 1, 1, 1}, expectedError: partialdata.ErrPartialData, errStrContains: []string{"10.0.0.1", "10.0.0.2", "zone failed"}, }, @@ -213,6 +219,19 @@ func TestReplicationSet_Do(t *testing.T) { expectedError: errZoneFailure, queryPartialData: true, }, + { + name: "with partial data enabled, should fail on instances returning 422", + instances: []InstanceDesc{{Addr: "1", Zone: "zone1"}, {Addr: "2", Zone: "zone2"}, {Addr: "3", Zone: "zone3"}, {Addr: "4", Zone: "zone1"}, {Addr: "5", Zone: "zone2"}, {Addr: "6", Zone: "zone3"}}, + f: func(ctx context.Context, ing *InstanceDesc) (interface{}, error) { + if ing.Addr == "1" || ing.Addr == "2" { + return nil, validation.LimitError("limit breached") + } + return 1, nil + }, + maxUnavailableZones: 1, + expectedError: validation.LimitError("limit breached"), + queryPartialData: true, + }, { name: "max unavailable zones = 1, should succeed on instances failing in 1 out of 3 zones (6 instances)", instances: []InstanceDesc{{Zone: "zone1"}, {Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone2"}, {Zone: "zone3"}, {Zone: "zone3"}}, @@ -266,7 +285,7 @@ func TestReplicationSet_Do(t *testing.T) { } got, err := r.Do(ctx, tt.delay, tt.zoneResultsQuorum, tt.queryPartialData, tt.f) if tt.expectedError != nil { - assert.ErrorIs(t, err, tt.expectedError) + assert.ErrorContains(t, err, tt.expectedError.Error()) for _, str := range tt.errStrContains { assert.ErrorContains(t, err, str) } diff --git a/pkg/ring/replication_set_tracker.go b/pkg/ring/replication_set_tracker.go index a0f594b442c..bc7401240d7 100644 --- a/pkg/ring/replication_set_tracker.go +++ b/pkg/ring/replication_set_tracker.go @@ -1,6 +1,8 @@ package ring -import "fmt" +import ( + "fmt" +) type replicationSetResultTracker interface { // Signals an instance has done the execution, either successful (no error) @@ -121,7 +123,7 @@ func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int func (t *zoneAwareResultTracker) done(instance *InstanceDesc, result interface{}, err error) { if err != nil { t.failuresByZone[instance.Zone]++ - t.errors = append(t.errors, fmt.Errorf("(%s) %w", instance.GetAddr(), err)) + t.errors = append(t.errors, fmt.Errorf("(%s, %s) %w", instance.GetAddr(), instance.GetZone(), err)) } else { if _, ok := t.resultsPerZone[instance.Zone]; !ok { // If it is the first result in the zone, then total number of instances @@ -160,7 +162,9 @@ func (t *zoneAwareResultTracker) failed() bool { func (t *zoneAwareResultTracker) failedCompletely() bool { failedZones := len(t.failuresByZone) - return failedZones == t.zoneCount + allZonesFailed := failedZones == t.zoneCount + atLeastHalfOfFleetFailed := len(t.errors) >= t.numInstances/2 + return allZonesFailed || (t.failed() && atLeastHalfOfFleetFailed) } func (t *zoneAwareResultTracker) getResults() []interface{} { diff --git a/pkg/ring/replication_set_tracker_test.go b/pkg/ring/replication_set_tracker_test.go index ed782739c77..e5ee5c9de16 100644 --- a/pkg/ring/replication_set_tracker_test.go +++ b/pkg/ring/replication_set_tracker_test.go @@ -467,6 +467,18 @@ func TestZoneAwareResultTracker(t *testing.T) { assert.True(t, tracker.failedCompletely()) }, }, + "failedCompletely() should return true if failed() is true and half of the fleet are unavailable": { + instances: []InstanceDesc{instance1, instance2, instance3, instance4, instance5, instance6}, + maxUnavailableZones: 1, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + tracker.done(&instance1, nil, errors.New("test")) // Zone-a + tracker.done(&instance2, nil, errors.New("test")) // Zone-a + tracker.done(&instance3, nil, errors.New("test")) // Zone-b + + assert.True(t, tracker.failed()) + assert.True(t, tracker.failedCompletely()) + }, + }, "finished() should return true only if all instances are done": { instances: []InstanceDesc{instance1, instance2}, maxUnavailableZones: 1, @@ -483,11 +495,11 @@ func TestZoneAwareResultTracker(t *testing.T) { maxUnavailableZones: 1, run: func(t *testing.T, tracker *zoneAwareResultTracker) { tracker.done(&instance1, nil, errors.New("test1")) - err1 := fmt.Errorf("(%s) %w", instance1.GetAddr(), errors.New("test1")) + err1 := fmt.Errorf("(%s, %s) %w", instance1.GetAddr(), instance2.GetZone(), errors.New("test1")) assert.ElementsMatch(t, []error{err1}, tracker.getErrors()) tracker.done(&instance2, nil, errors.New("test2")) - err2 := fmt.Errorf("(%s) %w", instance2.GetAddr(), errors.New("test2")) + err2 := fmt.Errorf("(%s, %s) %w", instance2.GetAddr(), instance2.GetZone(), errors.New("test2")) assert.ElementsMatch(t, []error{err1, err2}, tracker.getErrors()) }, }, diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 8d46235ebe6..6419dc6ba89 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -50,6 +50,11 @@ func (e LimitError) Error() string { return string(e) } +func IsLimitError(e error) bool { + var limitError LimitError + return errors.As(e, &limitError) +} + type DisabledRuleGroup struct { Namespace string `yaml:"namespace" doc:"nocli|description=namespace in which the rule group belongs"` Name string `yaml:"name" doc:"nocli|description=name of the rule group"` diff --git a/pkg/util/validation/limits_test.go b/pkg/util/validation/limits_test.go index 414cb3e8d45..df7760e3829 100644 --- a/pkg/util/validation/limits_test.go +++ b/pkg/util/validation/limits_test.go @@ -2,6 +2,7 @@ package validation import ( "encoding/json" + "fmt" "reflect" "regexp" "strings" @@ -885,3 +886,8 @@ func TestLimitsPerLabelSetsForSeries(t *testing.T) { }) } } + +func TestIsLimitError(t *testing.T) { + assert.False(t, IsLimitError(fmt.Errorf("test error"))) + assert.True(t, IsLimitError(LimitError("test error"))) +}