Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cortexproject/cortex/pkg/util/extract"
"github.com/cortexproject/cortex/pkg/util/grpcutil"
"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/validation"
)

Expand Down Expand Up @@ -330,9 +331,9 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
reqStats.AddFetchedSamples(uint64(resp.SamplesCount()))

if partialdata.IsPartialDataError(err) {
level.Warn(d.log).Log("msg", "returning partial data", "err", err.Error())
level.Warn(util_log.WithContext(ctx, d.log)).Log("msg", "returning partial data", "err", err.Error())
d.ingesterPartialDataQueries.Inc()
return resp, err
return resp, partialdata.ErrPartialData
}

return resp, nil
Expand Down
5 changes: 5 additions & 0 deletions pkg/ring/replication_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/querier/partialdata"
"github.com/cortexproject/cortex/pkg/util/validation"
)

// ReplicationSet describes the instances to talk to for a given key, and how
Expand Down Expand Up @@ -80,6 +81,10 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResults
return nil, res.err
}

if validation.IsLimitError(res.err) {
return nil, res.err
}

// force one of the delayed requests to start
if delay > 0 && r.MaxUnavailableZones == 0 {
forceStart <- struct{}{}
Expand Down
29 changes: 24 additions & 5 deletions pkg/ring/replication_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/querier/partialdata"
"github.com/cortexproject/cortex/pkg/util/validation"
)

func TestReplicationSet_GetAddresses(t *testing.T) {
Expand Down Expand Up @@ -196,12 +197,17 @@ func TestReplicationSet_Do(t *testing.T) {
expectedError: errZoneFailure,
},
{
name: "with partial data enabled and max unavailable zones = 1, should succeed on instances failing in 2 out of 3 zones (3 instances)",
instances: []InstanceDesc{{Addr: "10.0.0.1", Zone: "zone1"}, {Addr: "10.0.0.2", Zone: "zone2"}, {Addr: "10.0.0.3", Zone: "zone3"}},
f: failingFunctionOnZones("zone1", "zone2"),
name: "with partial data enabled and max unavailable zones = 1, should succeed on instances failing in 2 out of 3 zones (6 instances)",
instances: []InstanceDesc{{Addr: "10.0.0.1", Zone: "zone1"}, {Addr: "10.0.0.2", Zone: "zone2"}, {Addr: "10.0.0.3", Zone: "zone3"}, {Addr: "10.0.0.4", Zone: "zone1"}, {Addr: "10.0.0.5", Zone: "zone2"}, {Addr: "10.0.0.6", Zone: "zone3"}},
f: func(ctx context.Context, ing *InstanceDesc) (interface{}, error) {
if ing.Addr == "10.0.0.1" || ing.Addr == "10.0.0.2" {
return nil, errZoneFailure
}
return 1, nil
},
maxUnavailableZones: 1,
queryPartialData: true,
want: []interface{}{1},
want: []interface{}{1, 1, 1, 1},
expectedError: partialdata.ErrPartialData,
errStrContains: []string{"10.0.0.1", "10.0.0.2", "zone failed"},
},
Expand All @@ -213,6 +219,19 @@ func TestReplicationSet_Do(t *testing.T) {
expectedError: errZoneFailure,
queryPartialData: true,
},
{
name: "with partial data enabled, should fail on instances returning 422",
instances: []InstanceDesc{{Addr: "1", Zone: "zone1"}, {Addr: "2", Zone: "zone2"}, {Addr: "3", Zone: "zone3"}, {Addr: "4", Zone: "zone1"}, {Addr: "5", Zone: "zone2"}, {Addr: "6", Zone: "zone3"}},
f: func(ctx context.Context, ing *InstanceDesc) (interface{}, error) {
if ing.Addr == "1" || ing.Addr == "2" {
return nil, validation.LimitError("limit breached")
}
return 1, nil
},
maxUnavailableZones: 1,
expectedError: validation.LimitError("limit breached"),
queryPartialData: true,
},
{
name: "max unavailable zones = 1, should succeed on instances failing in 1 out of 3 zones (6 instances)",
instances: []InstanceDesc{{Zone: "zone1"}, {Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone2"}, {Zone: "zone3"}, {Zone: "zone3"}},
Expand Down Expand Up @@ -266,7 +285,7 @@ func TestReplicationSet_Do(t *testing.T) {
}
got, err := r.Do(ctx, tt.delay, tt.zoneResultsQuorum, tt.queryPartialData, tt.f)
if tt.expectedError != nil {
assert.ErrorIs(t, err, tt.expectedError)
assert.ErrorContains(t, err, tt.expectedError.Error())
for _, str := range tt.errStrContains {
assert.ErrorContains(t, err, str)
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/ring/replication_set_tracker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ring

import "fmt"
import (
"fmt"
)

type replicationSetResultTracker interface {
// Signals an instance has done the execution, either successful (no error)
Expand Down Expand Up @@ -121,7 +123,7 @@ func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int
func (t *zoneAwareResultTracker) done(instance *InstanceDesc, result interface{}, err error) {
if err != nil {
t.failuresByZone[instance.Zone]++
t.errors = append(t.errors, fmt.Errorf("(%s) %w", instance.GetAddr(), err))
t.errors = append(t.errors, fmt.Errorf("(%s, %s) %w", instance.GetAddr(), instance.GetZone(), err))
} else {
if _, ok := t.resultsPerZone[instance.Zone]; !ok {
// If it is the first result in the zone, then total number of instances
Expand Down Expand Up @@ -160,7 +162,9 @@ func (t *zoneAwareResultTracker) failed() bool {

func (t *zoneAwareResultTracker) failedCompletely() bool {
failedZones := len(t.failuresByZone)
return failedZones == t.zoneCount
allZonesFailed := failedZones == t.zoneCount
atLeastHalfOfFleetFailed := len(t.errors) >= t.numInstances/2
return allZonesFailed || (t.failed() && atLeastHalfOfFleetFailed)
}

func (t *zoneAwareResultTracker) getResults() []interface{} {
Expand Down
16 changes: 14 additions & 2 deletions pkg/ring/replication_set_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,18 @@ func TestZoneAwareResultTracker(t *testing.T) {
assert.True(t, tracker.failedCompletely())
},
},
"failedCompletely() should return true if failed() is true and half of the fleet are unavailable": {
instances: []InstanceDesc{instance1, instance2, instance3, instance4, instance5, instance6},
maxUnavailableZones: 1,
run: func(t *testing.T, tracker *zoneAwareResultTracker) {
tracker.done(&instance1, nil, errors.New("test")) // Zone-a
tracker.done(&instance2, nil, errors.New("test")) // Zone-a
tracker.done(&instance3, nil, errors.New("test")) // Zone-b

assert.True(t, tracker.failed())
assert.True(t, tracker.failedCompletely())
},
},
"finished() should return true only if all instances are done": {
instances: []InstanceDesc{instance1, instance2},
maxUnavailableZones: 1,
Expand All @@ -483,11 +495,11 @@ func TestZoneAwareResultTracker(t *testing.T) {
maxUnavailableZones: 1,
run: func(t *testing.T, tracker *zoneAwareResultTracker) {
tracker.done(&instance1, nil, errors.New("test1"))
err1 := fmt.Errorf("(%s) %w", instance1.GetAddr(), errors.New("test1"))
err1 := fmt.Errorf("(%s, %s) %w", instance1.GetAddr(), instance2.GetZone(), errors.New("test1"))
assert.ElementsMatch(t, []error{err1}, tracker.getErrors())

tracker.done(&instance2, nil, errors.New("test2"))
err2 := fmt.Errorf("(%s) %w", instance2.GetAddr(), errors.New("test2"))
err2 := fmt.Errorf("(%s, %s) %w", instance2.GetAddr(), instance2.GetZone(), errors.New("test2"))
assert.ElementsMatch(t, []error{err1, err2}, tracker.getErrors())
},
},
Expand Down
5 changes: 5 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ func (e LimitError) Error() string {
return string(e)
}

func IsLimitError(e error) bool {
var limitError LimitError
return errors.As(e, &limitError)
}

type DisabledRuleGroup struct {
Namespace string `yaml:"namespace" doc:"nocli|description=namespace in which the rule group belongs"`
Name string `yaml:"name" doc:"nocli|description=name of the rule group"`
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/validation/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package validation

import (
"encoding/json"
"fmt"
"reflect"
"regexp"
"strings"
Expand Down Expand Up @@ -885,3 +886,8 @@ func TestLimitsPerLabelSetsForSeries(t *testing.T) {
})
}
}

func TestIsLimitError(t *testing.T) {
assert.False(t, IsLimitError(fmt.Errorf("test error")))
assert.True(t, IsLimitError(LimitError("test error")))
}
Loading