diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2be5d52031b..d38b1ead1d3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
## master / unreleased
* [CHANGE] Querier: deprecated `-store.max-look-back-period`. You should use `-querier.max-query-lookback` instead. #3452
+* [ENHANCEMENT] Added zone-awareness support for queries. When zone-awareness is enabled, queries will still succeed if all ingesters in a single zone fail. #3414
* [ENHANCEMENT] Blocks storage ingester: exported more TSDB-related metrics. #3412
  - `cortex_ingester_tsdb_wal_corruptions_total`
  - `cortex_ingester_tsdb_head_truncations_failed_total`
diff --git a/docs/guides/zone-replication.md b/docs/guides/zone-replication.md
index c64be0c1f57..56abf3ac7e4 100644
--- a/docs/guides/zone-replication.md
+++ b/docs/guides/zone-replication.md
@@ -11,6 +11,8 @@ It is completely possible that all the replicas for the given data are held with
For this reason, Cortex optionally supports zone-aware replication. When zone-aware replication is **enabled**, replicas for the given data are guaranteed to span across different availability zones. This requires Cortex cluster to run at least in a number of zones equal to the configured replication factor.
+Reads from a Cortex cluster with zone-aware replication enabled can withstand zone failures as long as there are no more than `floor(replication factor / 2)` zones with failing instances.
+
The Cortex services supporting **zone-aware replication** are:
- **[Distributors and Ingesters](#distributors-and-ingesters-time-series-replication)**
@@ -26,6 +28,10 @@ The Cortex time-series replication is used to hold multiple (typically 3) replic
2. Rollout ingesters to apply the configured zone
3. Enable time-series zone-aware replication via the `-distributor.zone-awareness-enabled` CLI flag (or its respective YAML config option). Please be aware this configuration option should be set to distributors, queriers and rulers.
+The `-distributor.shard-by-all-labels` setting has an impact on read availability. When enabled, a metric is sharded across all ingesters and the querier needs to fetch series from all ingesters, while, when disabled, a metric is sharded only across `` ingesters.
+
+In the event of a large outage impacting ingesters in more than 1 zone, when `-distributor.shard-by-all-labels=true` all queries will fail, while, when disabled, some queries may still succeed if the ingesters holding the required metric are not impacted by the outage.
+
## Store-gateways: blocks replication
The Cortex [store-gateway](../blocks-storage/store-gateway.md) (used only when Cortex is running with the [blocks storage](../blocks-storage/_index.md)) supports blocks sharding, used to horizontally scale blocks in a large cluster without hitting any vertical scalability limit.
diff --git a/integration/e2e/service.go b/integration/e2e/service.go
index c9931c0b443..ffb8f211481 100644
--- a/integration/e2e/service.go
+++ b/integration/e2e/service.go
@@ -168,10 +168,15 @@ func (s *ConcreteService) Kill() error {
	logger.Log("Killing", s.name)
-	if out, err := RunCommandAndGetOutput("docker", "stop", "--time=0", s.containerName()); err != nil {
+	if out, err := RunCommandAndGetOutput("docker", "kill", s.containerName()); err != nil {
		logger.Log(string(out))
		return err
	}
+
+	// Wait until the container has actually stopped. However, this could fail if
+	// the container has already exited, so we just ignore the error.
+ _, _ = RunCommandAndGetOutput("docker", "wait", s.containerName()) + s.usedNetworkName = "" return nil diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index 1fd57beec5c..3650e8e50c2 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -153,7 +153,6 @@ func (c *Client) QueryRaw(query string) (*http.Response, []byte, error) { } func (c *Client) query(addr string) (*http.Response, []byte, error) { - ctx, cancel := context.WithTimeout(context.Background(), c.timeout) defer cancel() diff --git a/integration/zone_aware_test.go b/integration/zone_aware_test.go new file mode 100644 index 00000000000..ec36e720acb --- /dev/null +++ b/integration/zone_aware_test.go @@ -0,0 +1,150 @@ +package integration + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/integration/e2e" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" +) + +func TestZoneAwareReplication(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + flags := BlocksStorageFlags() + flags["-distributor.shard-by-all-labels"] = "true" + flags["-distributor.replication-factor"] = "3" + flags["-distributor.zone-awareness-enabled"] = "true" + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Start Cortex components. + ingesterFlags := func(zone string) map[string]string { + return mergeFlags(flags, map[string]string{ + "-ingester.availability-zone": zone, + }) + } + + ingester1 := e2ecortex.NewIngesterWithConfigFile("ingester-1", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-a"), "") + ingester2 := e2ecortex.NewIngesterWithConfigFile("ingester-2", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-a"), "") + ingester3 := e2ecortex.NewIngesterWithConfigFile("ingester-3", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-b"), "") + ingester4 := e2ecortex.NewIngesterWithConfigFile("ingester-4", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-b"), "") + ingester5 := e2ecortex.NewIngesterWithConfigFile("ingester-5", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-c"), "") + ingester6 := e2ecortex.NewIngesterWithConfigFile("ingester-6", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-c"), "") + require.NoError(t, s.StartAndWaitReady(ingester1, ingester2, ingester3, ingester4, ingester5, ingester6)) + + distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "") + querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, querier)) + + // Wait until distributor and querier have updated the ring. 
+ require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID) + require.NoError(t, err) + + // Push some series + now := time.Now() + numSeries := 100 + expectedVectors := map[string]model.Vector{} + + for i := 1; i <= numSeries; i++ { + metricName := fmt.Sprintf("series_%d", i) + series, expectedVector := generateSeries(metricName, now) + res, err := client.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + expectedVectors[metricName] = expectedVector + } + + // Query back series => all good + for metricName, expectedVector := range expectedVectors { + result, err := client.Query(metricName, now) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector, result.(model.Vector)) + } + + // SIGKILL 1 ingester in 1st zone + require.NoError(t, ingester1.Kill()) + + // Push 1 more series => all good + numSeries++ + metricName := fmt.Sprintf("series_%d", numSeries) + series, expectedVector := generateSeries(metricName, now) + res, err := client.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + expectedVectors[metricName] = expectedVector + + // Query back series => all good + for metricName, expectedVector := range expectedVectors { + result, err := client.Query(metricName, now) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector, result.(model.Vector)) + } + + // SIGKILL 1 more ingester in the 1st zone (all ingesters in 1st zone have been killed) + require.NoError(t, ingester2.Kill()) + + // Push 1 more series => all good + numSeries++ + metricName = fmt.Sprintf("series_%d", numSeries) + series, expectedVector = generateSeries(metricName, now) + res, err = client.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + expectedVectors[metricName] = expectedVector + + // Query back series => all good + for metricName, expectedVector := range expectedVectors { + result, err := client.Query(metricName, now) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector, result.(model.Vector)) + } + + // SIGKILL 1 ingester in the 2nd zone + require.NoError(t, ingester3.Kill()) + + // Query back any series => fail (either because of a timeout or 500) + result, _, err := client.QueryRaw("series_1") + if !errors.Is(err, context.DeadlineExceeded) { + require.NoError(t, err) + require.Equal(t, 500, result.StatusCode) + } + + // SIGKILL 1 more ingester in the 2nd zone (all ingesters in 2nd zone have been killed) + require.NoError(t, ingester4.Kill()) + + // Push 1 more series => fail + series, _ = generateSeries("series_last", now) + res, err = client.Push(series) + require.NoError(t, err) + require.Equal(t, 500, res.StatusCode) + +} diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index e003b87f148..7826f37e538 100644 --- a/pkg/distributor/distributor_test.go +++ 
b/pkg/distributor/distributor_test.go
@@ -1283,6 +1283,9 @@ func (i *mockIngester) series() map[uint32]*client.PreallocTimeseries {
}
func (i *mockIngester) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
+	i.Lock()
+	defer i.Unlock()
+
	i.trackCall("Check")
	return &grpc_health_v1.HealthCheckResponse{}, nil
diff --git a/pkg/ring/replication_set.go b/pkg/ring/replication_set.go
index 3b15541f3ad..adc619e85cb 100644
--- a/pkg/ring/replication_set.go
+++ b/pkg/ring/replication_set.go
@@ -10,25 +10,45 @@ import (
// many errors to tolerate.
type ReplicationSet struct {
	Ingesters []IngesterDesc
+
+	// Maximum number of tolerated failing instances. Max errors and max unavailable zones are
+	// mutually exclusive.
	MaxErrors int
+
+	// Maximum number of different zones in which instances can fail. Max unavailable zones and
+	// max errors are mutually exclusive.
+	MaxUnavailableZones int
}
// Do function f in parallel for all replicas in the set, erroring is we exceed
// MaxErrors and returning early otherwise.
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(context.Context, *IngesterDesc) (interface{}, error)) ([]interface{}, error) {
+	type instanceResult struct {
+		res      interface{}
+		err      error
+		instance *IngesterDesc
+	}
+
+	// Initialise the result tracker, which is used to keep track of successes and failures.
+	var tracker replicationSetResultTracker
+	if r.MaxUnavailableZones > 0 {
+		tracker = newZoneAwareResultTracker(r.Ingesters, r.MaxUnavailableZones)
+	} else {
+		tracker = newDefaultResultTracker(r.Ingesters, r.MaxErrors)
+	}
+
	var (
-		errs        = make(chan error, len(r.Ingesters))
-		resultsChan = make(chan interface{}, len(r.Ingesters))
-		minSuccess  = len(r.Ingesters) - r.MaxErrors
-		forceStart  = make(chan struct{}, r.MaxErrors)
+		ch         = make(chan instanceResult, len(r.Ingesters))
+		forceStart = make(chan struct{}, r.MaxErrors)
	)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
+	// Spawn a goroutine for each instance.
	for i := range r.Ingesters {
		go func(i int, ing *IngesterDesc) {
-			// wait to send extra requests
-			if i >= minSuccess && delay > 0 {
+			// Wait to send extra requests. Works only when zone-awareness is disabled.
+ if delay > 0 && r.MaxUnavailableZones == 0 && i >= len(r.Ingesters)-r.MaxErrors { after := time.NewTimer(delay) defer after.Stop() select { @@ -39,32 +59,32 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont } } result, err := f(ctx, ing) - if err != nil { - errs <- err - } else { - resultsChan <- result + ch <- instanceResult{ + res: result, + err: err, + instance: ing, } }(i, &r.Ingesters[i]) } - var ( - numErrs int - numSuccess int - results = make([]interface{}, 0, len(r.Ingesters)) - ) - for numSuccess < minSuccess { + results := make([]interface{}, 0, len(r.Ingesters)) + + for !tracker.succeeded() { select { - case err := <-errs: - numErrs++ - if numErrs > r.MaxErrors { - return nil, err - } - // force one of the delayed requests to start - forceStart <- struct{}{} + case res := <-ch: + tracker.done(res.instance, res.err) + if res.err != nil { + if tracker.failed() { + return nil, res.err + } - case result := <-resultsChan: - numSuccess++ - results = append(results, result) + // force one of the delayed requests to start + if delay > 0 && r.MaxUnavailableZones == 0 { + forceStart <- struct{}{} + } + } else { + results = append(results, res.res) + } case <-ctx.Done(): return nil, ctx.Err() diff --git a/pkg/ring/replication_set_test.go b/pkg/ring/replication_set_test.go index db11459cf10..d8c853fe303 100644 --- a/pkg/ring/replication_set_test.go +++ b/pkg/ring/replication_set_test.go @@ -1,9 +1,14 @@ package ring import ( + "context" + "errors" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" ) func TestReplicationSet_GetAddresses(t *testing.T) { @@ -33,3 +38,164 @@ func TestReplicationSet_GetAddresses(t *testing.T) { }) } } + +var ( + errFailure = errors.New("failed") + errZoneFailure = errors.New("zone failed") +) + +// Return a function that fails starting from failAfter times +func failingFunctionAfter(failAfter int32, delay time.Duration) func(context.Context, *IngesterDesc) (interface{}, error) { + count := atomic.NewInt32(0) + return func(context.Context, *IngesterDesc) (interface{}, error) { + time.Sleep(delay) + if count.Inc() > failAfter { + return nil, errFailure + } + return 1, nil + } +} + +func failingFunctionOnZones(zones ...string) func(context.Context, *IngesterDesc) (interface{}, error) { + return func(ctx context.Context, ing *IngesterDesc) (interface{}, error) { + for _, zone := range zones { + if ing.Zone == zone { + return nil, errZoneFailure + } + } + return 1, nil + } +} + +func TestReplicationSet_Do(t *testing.T) { + tests := []struct { + name string + instances []IngesterDesc + maxErrors int + maxUnavailableZones int + f func(context.Context, *IngesterDesc) (interface{}, error) + delay time.Duration + cancelContextDelay time.Duration + want []interface{} + expectedError error + }{ + { + name: "max errors = 0, no errors no delay", + instances: []IngesterDesc{ + {}, + }, + f: func(c context.Context, id *IngesterDesc) (interface{}, error) { + return 1, nil + }, + want: []interface{}{1}, + }, + { + name: "max errors = 0, should fail on 1 error out of 1 instance", + instances: []IngesterDesc{{}}, + f: func(c context.Context, id *IngesterDesc) (interface{}, error) { + return nil, errFailure + }, + want: nil, + expectedError: errFailure, + }, + { + name: "max errors = 0, should fail on 1 error out of 3 instances (last call fails)", + instances: []IngesterDesc{{}, {}, {}}, + f: failingFunctionAfter(2, 10*time.Millisecond), + want: nil, + expectedError: errFailure, 
+ }, + { + name: "max errors = 1, should fail on 3 errors out of 5 instances (last calls fail)", + instances: []IngesterDesc{{}, {}, {}, {}, {}}, + maxErrors: 1, + f: failingFunctionAfter(2, 10*time.Millisecond), + delay: 100 * time.Millisecond, + want: nil, + expectedError: errFailure, + }, + { + name: "max errors = 1, should handle context canceled", + instances: []IngesterDesc{{}, {}, {}}, + maxErrors: 1, + f: func(c context.Context, id *IngesterDesc) (interface{}, error) { + time.Sleep(300 * time.Millisecond) + return 1, nil + }, + cancelContextDelay: 100 * time.Millisecond, + want: nil, + expectedError: context.Canceled, + }, + { + name: "max errors = 0, should succeed on all successful instances", + instances: []IngesterDesc{{Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}}, + f: func(c context.Context, id *IngesterDesc) (interface{}, error) { + return 1, nil + }, + want: []interface{}{1, 1, 1}, + }, + { + name: "max unavailable zones = 1, should succeed on instances failing in 1 out of 3 zones (3 instances)", + instances: []IngesterDesc{{Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}}, + f: failingFunctionOnZones("zone1"), + maxUnavailableZones: 1, + want: []interface{}{1, 1}, + }, + { + name: "max unavailable zones = 1, should fail on instances failing in 2 out of 3 zones (3 instances)", + instances: []IngesterDesc{{Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}}, + f: failingFunctionOnZones("zone1", "zone2"), + maxUnavailableZones: 1, + expectedError: errZoneFailure, + }, + { + name: "max unavailable zones = 1, should succeed on instances failing in 1 out of 3 zones (6 instances)", + instances: []IngesterDesc{{Zone: "zone1"}, {Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone2"}, {Zone: "zone3"}, {Zone: "zone3"}}, + f: failingFunctionOnZones("zone1"), + maxUnavailableZones: 1, + want: []interface{}{1, 1, 1, 1}, + }, + { + name: "max unavailable zones = 2, should fail on instances failing in 3 out of 5 zones (5 instances)", + instances: []IngesterDesc{{Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}, {Zone: "zone4"}, {Zone: "zone5"}}, + f: failingFunctionOnZones("zone1", "zone2", "zone3"), + maxUnavailableZones: 2, + expectedError: errZoneFailure, + }, + { + name: "max unavailable zones = 2, should succeed on instances failing in 2 out of 5 zones (10 instances)", + instances: []IngesterDesc{{Zone: "zone1"}, {Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone2"}, {Zone: "zone3"}, {Zone: "zone3"}, {Zone: "zone4"}, {Zone: "zone4"}, {Zone: "zone5"}, {Zone: "zone5"}}, + f: failingFunctionOnZones("zone1", "zone5"), + maxUnavailableZones: 2, + want: []interface{}{1, 1, 1, 1, 1, 1}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Ensure the test case has been correctly setup (max errors and max unavailable zones are + // mutually exclusive). 
+ require.False(t, tt.maxErrors > 0 && tt.maxUnavailableZones > 0) + + r := ReplicationSet{ + Ingesters: tt.instances, + MaxErrors: tt.maxErrors, + MaxUnavailableZones: tt.maxUnavailableZones, + } + ctx := context.Background() + if tt.cancelContextDelay > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithCancel(ctx) + time.AfterFunc(tt.cancelContextDelay, func() { + cancel() + }) + } + got, err := r.Do(ctx, tt.delay, tt.f) + if tt.expectedError != nil { + assert.Equal(t, tt.expectedError, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/pkg/ring/replication_set_tracker.go b/pkg/ring/replication_set_tracker.go new file mode 100644 index 00000000000..09f12e3cebb --- /dev/null +++ b/pkg/ring/replication_set_tracker.go @@ -0,0 +1,96 @@ +package ring + +type replicationSetResultTracker interface { + // Signals an instance has done the execution, either successful (no error) + // or failed (with error). + done(instance *IngesterDesc, err error) + + // Returns true if the minimum number of successful results have been received. + succeeded() bool + + // Returns true if the maximum number of failed executions have been reached. + failed() bool +} + +type defaultResultTracker struct { + minSucceeded int + numSucceeded int + numErrors int + maxErrors int +} + +func newDefaultResultTracker(instances []IngesterDesc, maxErrors int) *defaultResultTracker { + return &defaultResultTracker{ + minSucceeded: len(instances) - maxErrors, + numSucceeded: 0, + numErrors: 0, + maxErrors: maxErrors, + } +} + +func (t *defaultResultTracker) done(_ *IngesterDesc, err error) { + if err == nil { + t.numSucceeded++ + } else { + t.numErrors++ + } +} + +func (t *defaultResultTracker) succeeded() bool { + return t.numSucceeded >= t.minSucceeded +} + +func (t *defaultResultTracker) failed() bool { + return t.numErrors > t.maxErrors +} + +// zoneAwareResultTracker tracks the results per zone. +// All instances in a zone must succeed in order for the zone to succeed. +type zoneAwareResultTracker struct { + waitingByZone map[string]int + failuresByZone map[string]int + minSuccessfulZones int + maxUnavailableZones int +} + +func newZoneAwareResultTracker(instances []IngesterDesc, maxUnavailableZones int) *zoneAwareResultTracker { + t := &zoneAwareResultTracker{ + waitingByZone: make(map[string]int), + failuresByZone: make(map[string]int), + maxUnavailableZones: maxUnavailableZones, + } + + for _, instance := range instances { + t.waitingByZone[instance.Zone]++ + } + t.minSuccessfulZones = len(t.waitingByZone) - maxUnavailableZones + + return t +} + +func (t *zoneAwareResultTracker) done(instance *IngesterDesc, err error) { + t.waitingByZone[instance.Zone]-- + + if err != nil { + t.failuresByZone[instance.Zone]++ + } +} + +func (t *zoneAwareResultTracker) succeeded() bool { + successfulZones := 0 + + // The execution succeeded once we successfully received a successful result + // from "all zones - max unavailable zones". 
+ for zone, numWaiting := range t.waitingByZone { + if numWaiting == 0 && t.failuresByZone[zone] == 0 { + successfulZones++ + } + } + + return successfulZones >= t.minSuccessfulZones +} + +func (t *zoneAwareResultTracker) failed() bool { + failedZones := len(t.failuresByZone) + return failedZones > t.maxUnavailableZones +} diff --git a/pkg/ring/replication_set_tracker_test.go b/pkg/ring/replication_set_tracker_test.go new file mode 100644 index 00000000000..cb981c80571 --- /dev/null +++ b/pkg/ring/replication_set_tracker_test.go @@ -0,0 +1,266 @@ +package ring + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDefaultResultTracker(t *testing.T) { + instance1 := IngesterDesc{Addr: "127.0.0.1"} + instance2 := IngesterDesc{Addr: "127.0.0.2"} + instance3 := IngesterDesc{Addr: "127.0.0.3"} + instance4 := IngesterDesc{Addr: "127.0.0.4"} + + tests := map[string]struct { + instances []IngesterDesc + maxErrors int + run func(t *testing.T, tracker *defaultResultTracker) + }{ + "should succeed on no instances to track": { + instances: nil, + maxErrors: 0, + run: func(t *testing.T, tracker *defaultResultTracker) { + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + }, + }, + "should succeed once all instances succeed on max errors = 0": { + instances: []IngesterDesc{instance1, instance2, instance3, instance4}, + maxErrors: 0, + run: func(t *testing.T, tracker *defaultResultTracker) { + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance2, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance3, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance4, nil) + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + }, + }, + "should fail on 1st failing instance on max errors = 0": { + instances: []IngesterDesc{instance1, instance2, instance3, instance4}, + maxErrors: 0, + run: func(t *testing.T, tracker *defaultResultTracker) { + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance2, errors.New("test")) + assert.False(t, tracker.succeeded()) + assert.True(t, tracker.failed()) + }, + }, + "should fail on 2nd failing instance on max errors = 1": { + instances: []IngesterDesc{instance1, instance2, instance3, instance4}, + maxErrors: 1, + run: func(t *testing.T, tracker *defaultResultTracker) { + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance2, errors.New("test")) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance3, errors.New("test")) + assert.False(t, tracker.succeeded()) + assert.True(t, tracker.failed()) + }, + }, + "should fail on 3rd failing instance on max errors = 2": { + instances: []IngesterDesc{instance1, instance2, instance3, instance4}, + maxErrors: 2, + run: func(t *testing.T, tracker *defaultResultTracker) { + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + 
tracker.done(&instance2, errors.New("test")) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance3, errors.New("test")) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance4, errors.New("test")) + assert.False(t, tracker.succeeded()) + assert.True(t, tracker.failed()) + }, + }, + } + + for testName, testCase := range tests { + t.Run(testName, func(t *testing.T) { + testCase.run(t, newDefaultResultTracker(testCase.instances, testCase.maxErrors)) + }) + } +} + +func TestZoneAwareResultTracker(t *testing.T) { + instance1 := IngesterDesc{Addr: "127.0.0.1", Zone: "zone-a"} + instance2 := IngesterDesc{Addr: "127.0.0.2", Zone: "zone-a"} + instance3 := IngesterDesc{Addr: "127.0.0.3", Zone: "zone-b"} + instance4 := IngesterDesc{Addr: "127.0.0.4", Zone: "zone-b"} + instance5 := IngesterDesc{Addr: "127.0.0.5", Zone: "zone-c"} + instance6 := IngesterDesc{Addr: "127.0.0.6", Zone: "zone-c"} + + tests := map[string]struct { + instances []IngesterDesc + maxUnavailableZones int + run func(t *testing.T, tracker *zoneAwareResultTracker) + }{ + "should succeed on no instances to track": { + instances: nil, + maxUnavailableZones: 0, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + }, + }, + "should succeed once all instances succeed on max unavailable zones = 0": { + instances: []IngesterDesc{instance1, instance2, instance3}, + maxUnavailableZones: 0, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance2, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance3, nil) + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + }, + }, + "should fail on 1st failing instance on max unavailable zones = 0": { + instances: []IngesterDesc{instance1, instance2, instance3, instance4, instance5, instance6}, + maxUnavailableZones: 0, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance2, errors.New("test")) + assert.False(t, tracker.succeeded()) + assert.True(t, tracker.failed()) + }, + }, + "should succeed on 2 failing instances within the same zone on max unavailable zones = 1": { + instances: []IngesterDesc{instance1, instance2, instance3, instance4, instance5, instance6}, + maxUnavailableZones: 1, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + // Track failing instances. + for _, instance := range []IngesterDesc{instance1, instance2} { + tracker.done(&instance, errors.New("test")) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + } + + // Track successful instances. 
+ for _, instance := range []IngesterDesc{instance3, instance4, instance5} { + tracker.done(&instance, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + } + + tracker.done(&instance6, nil) + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + }, + }, + "should succeed as soon as the response has been successfully received from 'all zones - 1' on max unavailable zones = 1": { + instances: []IngesterDesc{instance1, instance2, instance3, instance4, instance5, instance6}, + maxUnavailableZones: 1, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + // Track successful instances. + for _, instance := range []IngesterDesc{instance1, instance2, instance3} { + tracker.done(&instance, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + } + + tracker.done(&instance4, nil) + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + }, + }, + "should succeed on failing instances within 2 zones on max unavailable zones = 2": { + instances: []IngesterDesc{instance1, instance2, instance3, instance4, instance5, instance6}, + maxUnavailableZones: 2, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + // Track failing instances. + for _, instance := range []IngesterDesc{instance1, instance2, instance3, instance4} { + tracker.done(&instance, errors.New("test")) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + } + + // Track successful instances. + tracker.done(&instance5, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance6, nil) + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + }, + }, + "should succeed as soon as the response has been successfully received from 'all zones - 2' on max unavailable zones = 2": { + instances: []IngesterDesc{instance1, instance2, instance3, instance4, instance5, instance6}, + maxUnavailableZones: 2, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + // Zone-a + tracker.done(&instance1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + // Zone-b + tracker.done(&instance3, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + // Zone-a + tracker.done(&instance2, nil) + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + }, + }, + } + + for testName, testCase := range tests { + t.Run(testName, func(t *testing.T) { + testCase.run(t, newZoneAwareResultTracker(testCase.instances, testCase.maxUnavailableZones)) + }) + } +} diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 6d2492c94ef..a0d8723e2c1 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -363,29 +363,74 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro return ReplicationSet{}, ErrEmptyRing } - // Calculate the number of required ingesters; - // ensure we always require at least RF-1 when RF=3. - numRequired := len(r.ringDesc.Ingesters) - if numRequired < r.cfg.ReplicationFactor { - numRequired = r.cfg.ReplicationFactor - } - maxUnavailable := r.cfg.ReplicationFactor / 2 - numRequired -= maxUnavailable - - ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters)) + // Build the initial replication set, excluding unhealthy instances. 
+	healthyInstances := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
+	zoneFailures := make(map[string]struct{})
	for _, ingester := range r.ringDesc.Ingesters {
		if r.IsHealthy(&ingester, op) {
-			ingesters = append(ingesters, ingester)
+			healthyInstances = append(healthyInstances, ingester)
+		} else {
+			zoneFailures[ingester.Zone] = struct{}{}
		}
	}
-	if len(ingesters) < numRequired {
-		return ReplicationSet{}, ErrTooManyFailedIngesters
+	// Max errors and max unavailable zones are mutually exclusive. We initialise both
+	// to 0 and then we update them depending on whether zone-awareness is enabled or not.
+	maxErrors := 0
+	maxUnavailableZones := 0
+
+	if r.cfg.ZoneAwarenessEnabled {
+		// Given data is replicated to RF different zones, we can tolerate up to RF/2
+		// failing zones. However, we need to protect against the case where the ring
+		// currently contains instances in a number of zones < RF.
+		numReplicatedZones := util.Min(len(r.ringZones), r.cfg.ReplicationFactor)
+		minSuccessZones := (numReplicatedZones / 2) + 1
+		maxUnavailableZones = minSuccessZones - 1
+
+		if len(zoneFailures) > maxUnavailableZones {
+			return ReplicationSet{}, ErrTooManyFailedIngesters
+		}
+
+		if len(zoneFailures) > 0 {
+			// We remove all instances (even healthy ones) from zones with at least
+			// 1 failing ingester. Due to how replication works when zone-awareness is
+			// enabled (data is replicated to RF different zones), there's no benefit in
+			// querying healthy instances from "failing zones". A zone is considered
+			// failed if there is a single error.
+			filteredInstances := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
+			for _, ingester := range healthyInstances {
+				if _, ok := zoneFailures[ingester.Zone]; !ok {
+					filteredInstances = append(filteredInstances, ingester)
+				}
+			}
+
+			healthyInstances = filteredInstances
+		}
+
+		// Since we removed all instances from zones containing at least 1 failing
+		// instance, we have to decrease the max unavailable zones accordingly.
+		maxUnavailableZones -= len(zoneFailures)
+	} else {
+		// Calculate the number of required ingesters;
+		// ensure we always require at least RF-1 when RF=3.
+ numRequired := len(r.ringDesc.Ingesters) + if numRequired < r.cfg.ReplicationFactor { + numRequired = r.cfg.ReplicationFactor + } + // We can tolerate this many failures + numRequired -= r.cfg.ReplicationFactor / 2 + + if len(healthyInstances) < numRequired { + return ReplicationSet{}, ErrTooManyFailedIngesters + } + + maxErrors = len(healthyInstances) - numRequired } return ReplicationSet{ - Ingesters: ingesters, - MaxErrors: len(ingesters) - numRequired, + Ingesters: healthyInstances, + MaxErrors: maxErrors, + MaxUnavailableZones: maxUnavailableZones, }, nil } diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index 189fac4b904..7b36347d078 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -418,6 +418,331 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) { } } +func TestRing_GetReplicationSetForOperation_WithZoneAwarenessEnabled(t *testing.T) { + tests := map[string]struct { + ringInstances map[string]IngesterDesc + unhealthyInstances []string + expectedAddresses []string + replicationFactor int + expectedError error + expectedMaxErrors int + expectedMaxUnavailableZones int + }{ + "empty ring": { + ringInstances: nil, + expectedError: ErrEmptyRing, + }, + "RF=1, 1 zone": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + }, + expectedAddresses: []string{"127.0.0.1", "127.0.0.2"}, + replicationFactor: 1, + expectedMaxErrors: 0, + expectedMaxUnavailableZones: 0, + }, + "RF=1, 1 zone, one unhealthy instance": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + }, + unhealthyInstances: []string{"instance-2"}, + replicationFactor: 1, + expectedError: ErrTooManyFailedIngesters, + }, + "RF=1, 3 zones, one unhealthy instance": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + }, + unhealthyInstances: []string{"instance-3"}, + replicationFactor: 1, + expectedError: ErrTooManyFailedIngesters, + }, + "RF=2, 2 zones": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + }, + expectedAddresses: []string{"127.0.0.1", "127.0.0.2"}, + replicationFactor: 2, + expectedMaxUnavailableZones: 1, + }, + "RF=2, 2 zones, one unhealthy instance": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + }, + expectedAddresses: []string{"127.0.0.1"}, + unhealthyInstances: []string{"instance-2"}, + replicationFactor: 2, + }, + "RF=3, 3 zones, one instance per zone": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + 
"instance-3": {Addr: "127.0.0.3", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + }, + expectedAddresses: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3"}, + replicationFactor: 3, + expectedMaxErrors: 0, + expectedMaxUnavailableZones: 1, + }, + "RF=3, 3 zones, one instance per zone, one instance unhealthy": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + }, + expectedAddresses: []string{"127.0.0.2", "127.0.0.3"}, + unhealthyInstances: []string{"instance-1"}, + replicationFactor: 3, + expectedMaxErrors: 0, + expectedMaxUnavailableZones: 0, + }, + "RF=3, 3 zones, one instance per zone, two instances unhealthy in separate zones": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + }, + unhealthyInstances: []string{"instance-1", "instance-2"}, + replicationFactor: 3, + expectedError: ErrTooManyFailedIngesters, + }, + "RF=3, 3 zones, one instance per zone, all instances unhealthy": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + }, + unhealthyInstances: []string{"instance-1", "instance-2", "instance-3"}, + replicationFactor: 3, + expectedError: ErrTooManyFailedIngesters, + }, + "RF=3, 3 zones, two instances per zone": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + }, + expectedAddresses: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5", "127.0.0.6"}, + replicationFactor: 3, + expectedMaxErrors: 0, + expectedMaxUnavailableZones: 1, + }, + "RF=3, 3 zones, two instances per zone, two instances unhealthy in same zone": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + }, + expectedAddresses: []string{"127.0.0.1", "127.0.0.2", "127.0.0.5", "127.0.0.6"}, + unhealthyInstances: []string{"instance-3", "instance-4"}, + replicationFactor: 3, + expectedMaxErrors: 0, + expectedMaxUnavailableZones: 0, + 
}, + "RF=3, 3 zones, three instances per zone, two instances unhealthy in same zone": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-7": {Addr: "127.0.0.7", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-8": {Addr: "127.0.0.8", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-9": {Addr: "127.0.0.9", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + }, + expectedAddresses: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.7", "127.0.0.8", "127.0.0.9"}, + unhealthyInstances: []string{"instance-4", "instance-6"}, + replicationFactor: 3, + expectedMaxErrors: 0, + expectedMaxUnavailableZones: 0, + }, + "RF=3, only 2 zones, two instances per zone": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + }, + expectedAddresses: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"}, + replicationFactor: 3, + expectedMaxErrors: 0, + expectedMaxUnavailableZones: 1, + }, + "RF=3, only 2 zones, two instances per zone, one instance unhealthy": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + }, + expectedAddresses: []string{"127.0.0.1", "127.0.0.2"}, + unhealthyInstances: []string{"instance-4"}, + replicationFactor: 3, + expectedMaxErrors: 0, + expectedMaxUnavailableZones: 0, + }, + "RF=3, only 1 zone, two instances per zone": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + }, + expectedAddresses: []string{"127.0.0.1", "127.0.0.2"}, + replicationFactor: 3, + expectedMaxErrors: 0, + expectedMaxUnavailableZones: 0, + }, + "RF=3, only 1 zone, two instances per zone, one instance unhealthy": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + }, + unhealthyInstances: []string{"instance-2"}, + replicationFactor: 3, + expectedError: ErrTooManyFailedIngesters, + }, + "RF=5, 5 zones, two instances per zone except for one zone which has three": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: 
GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-7": {Addr: "127.0.0.7", Zone: "zone-d", Tokens: GenerateTokens(128, nil)}, + "instance-8": {Addr: "127.0.0.8", Zone: "zone-d", Tokens: GenerateTokens(128, nil)}, + "instance-9": {Addr: "127.0.0.9", Zone: "zone-e", Tokens: GenerateTokens(128, nil)}, + "instance-10": {Addr: "127.0.0.10", Zone: "zone-e", Tokens: GenerateTokens(128, nil)}, + "instance-11": {Addr: "127.0.0.11", Zone: "zone-e", Tokens: GenerateTokens(128, nil)}, + }, + expectedAddresses: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5", + "127.0.0.6", "127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.0.10", "127.0.0.11"}, + replicationFactor: 5, + expectedMaxErrors: 0, + expectedMaxUnavailableZones: 2, + }, + "RF=5, 5 zones, two instances per zone except for one zone which has three, 2 unhealthy nodes in same zones": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-7": {Addr: "127.0.0.7", Zone: "zone-d", Tokens: GenerateTokens(128, nil)}, + "instance-8": {Addr: "127.0.0.8", Zone: "zone-d", Tokens: GenerateTokens(128, nil)}, + "instance-9": {Addr: "127.0.0.9", Zone: "zone-e", Tokens: GenerateTokens(128, nil)}, + "instance-10": {Addr: "127.0.0.10", Zone: "zone-e", Tokens: GenerateTokens(128, nil)}, + "instance-11": {Addr: "127.0.0.11", Zone: "zone-e", Tokens: GenerateTokens(128, nil)}, + }, + expectedAddresses: []string{"127.0.0.1", "127.0.0.2", "127.0.0.5", "127.0.0.6", "127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.0.10", "127.0.0.11"}, + unhealthyInstances: []string{"instance-3", "instance-4"}, + replicationFactor: 5, + expectedMaxErrors: 0, + expectedMaxUnavailableZones: 1, + }, + "RF=5, 5 zones, two instances per zone except for one zone which has three, 2 unhealthy nodes in separate zones": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-7": {Addr: "127.0.0.7", Zone: "zone-d", Tokens: GenerateTokens(128, nil)}, + "instance-8": {Addr: "127.0.0.8", Zone: "zone-d", Tokens: GenerateTokens(128, nil)}, + "instance-9": {Addr: "127.0.0.9", Zone: "zone-e", Tokens: GenerateTokens(128, nil)}, + "instance-10": {Addr: "127.0.0.10", Zone: "zone-e", Tokens: GenerateTokens(128, nil)}, + "instance-11": 
{Addr: "127.0.0.11", Zone: "zone-e", Tokens: GenerateTokens(128, nil)}, + }, + expectedAddresses: []string{"127.0.0.1", "127.0.0.2", "127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.0.10", "127.0.0.11"}, + unhealthyInstances: []string{"instance-3", "instance-5"}, + replicationFactor: 5, + expectedMaxErrors: 0, + expectedMaxUnavailableZones: 0, + }, + "RF=5, 5 zones, one instances per zone, three unhealthy instances": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-d", Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-e", Tokens: GenerateTokens(128, nil)}, + }, + unhealthyInstances: []string{"instance-2", "instance-4", "instance-5"}, + replicationFactor: 5, + expectedError: ErrTooManyFailedIngesters, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + // Ensure the test case has been correctly setup (max errors and max unavailable zones are + // mutually exclusive). + require.False(t, testData.expectedMaxErrors > 0 && testData.expectedMaxUnavailableZones > 0) + + // Init the ring. + ringDesc := &Desc{Ingesters: testData.ringInstances} + for id, instance := range ringDesc.Ingesters { + instance.Timestamp = time.Now().Unix() + instance.State = ACTIVE + for _, instanceName := range testData.unhealthyInstances { + if instanceName == id { + instance.Timestamp = time.Now().Add(-time.Hour).Unix() + } + } + ringDesc.Ingesters[id] = instance + } + + ring := Ring{ + cfg: Config{ + HeartbeatTimeout: time.Minute, + ZoneAwarenessEnabled: true, + ReplicationFactor: testData.replicationFactor, + }, + ringDesc: ringDesc, + ringTokens: ringDesc.getTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: &DefaultReplicationStrategy{}, + } + + // Check the replication set has the correct settings + replicationSet, err := ring.GetReplicationSetForOperation(Read) + if testData.expectedError == nil { + require.NoError(t, err) + } else { + require.Equal(t, testData.expectedError, err) + } + + assert.Equal(t, testData.expectedMaxErrors, replicationSet.MaxErrors) + assert.Equal(t, testData.expectedMaxUnavailableZones, replicationSet.MaxUnavailableZones) + + returnAddresses := []string{} + for _, instance := range replicationSet.Ingesters { + returnAddresses = append(returnAddresses, instance.Addr) + } + for _, addr := range testData.expectedAddresses { + assert.Contains(t, returnAddresses, addr) + } + assert.Equal(t, len(testData.expectedAddresses), len(replicationSet.Ingesters)) + }) + } +} + func TestRing_ShuffleShard(t *testing.T) { tests := map[string]struct { ringInstances map[string]IngesterDesc @@ -1194,6 +1519,7 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) { cfg: Config{ HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, + ReplicationFactor: 3, }, ringDesc: ringDesc, ringTokens: ringDesc.getTokens(),
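Note on the zone-failure tolerance added in `GetReplicationSetForOperation` above: the number of zones a read can lose follows from `minSuccessZones := (numReplicatedZones / 2) + 1`, minus one zone for every zone that already contains an unhealthy instance. Below is a minimal standalone sketch of that arithmetic; the helper name `maxUnavailableZonesFor` is made up for illustration and only the formula comes from the diff.

```go
package main

import "fmt"

// maxUnavailableZonesFor mirrors the zone-aware branch added to
// GetReplicationSetForOperation: data is replicated to at most RF zones,
// a majority of those zones must fully respond, and every zone that already
// has a failing instance consumes part of the remaining budget.
func maxUnavailableZonesFor(zonesInRing, replicationFactor, zonesWithFailures int) (int, bool) {
	numReplicatedZones := zonesInRing
	if replicationFactor < numReplicatedZones {
		numReplicatedZones = replicationFactor
	}

	minSuccessZones := (numReplicatedZones / 2) + 1
	maxUnavailableZones := minSuccessZones - 1

	// Too many zones already contain unhealthy instances: the read fails
	// upfront (ErrTooManyFailedIngesters in the diff).
	if zonesWithFailures > maxUnavailableZones {
		return 0, false
	}

	// Zones with at least one unhealthy instance are dropped entirely, so the
	// budget for further zone failures at query time shrinks accordingly.
	return maxUnavailableZones - zonesWithFailures, true
}

func main() {
	fmt.Println(maxUnavailableZonesFor(3, 3, 0)) // 1 true: RF=3 over 3 zones tolerates 1 zone outage
	fmt.Println(maxUnavailableZonesFor(2, 3, 0)) // 1 true: only 2 zones in the ring, still 1 tolerated
	fmt.Println(maxUnavailableZonesFor(5, 5, 1)) // 1 true: RF=5, one zone already failing, 1 more tolerated
	fmt.Println(maxUnavailableZonesFor(3, 3, 2)) // 0 false: two failing zones exceed the budget
}
```

The example values match the expectations in `TestRing_GetReplicationSetForOperation_WithZoneAwarenessEnabled` above.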
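For comparison, when zone-awareness is disabled the diff keeps the existing quorum rule in the `else` branch. A minimal sketch of that arithmetic follows; the helper name `maxErrorsFor` is invented for illustration, the formula is the one retained in the diff.

```go
package main

import "fmt"

// maxErrorsFor mirrors the non-zone-aware branch kept in GetReplicationSetForOperation:
// at least max(#instances, RF) - RF/2 instances must be healthy, and whatever headroom
// is left over becomes ReplicationSet.MaxErrors.
func maxErrorsFor(totalInstances, healthyInstances, replicationFactor int) (int, bool) {
	numRequired := totalInstances
	if numRequired < replicationFactor {
		numRequired = replicationFactor
	}
	// We can tolerate this many failures.
	numRequired -= replicationFactor / 2

	if healthyInstances < numRequired {
		return 0, false // ErrTooManyFailedIngesters
	}
	return healthyInstances - numRequired, true
}

func main() {
	fmt.Println(maxErrorsFor(3, 3, 3)) // 1 true: all healthy, one query-time error tolerated
	fmt.Println(maxErrorsFor(3, 2, 3)) // 0 true: one instance already unhealthy, no headroom left
	fmt.Println(maxErrorsFor(3, 1, 3)) // 0 false: too many failed ingesters
}
```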
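To make the new read-path behaviour concrete, here is a hedged usage sketch of `ReplicationSet.Do` with `MaxUnavailableZones` set, assuming only the exported `pkg/ring` types and the `Do` signature shown in this diff; the addresses and the simulated zone outage are invented for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/cortexproject/cortex/pkg/ring"
)

func main() {
	// Six instances spread over three zones, as in the integration test above.
	rs := ring.ReplicationSet{
		Ingesters: []ring.IngesterDesc{
			{Addr: "10.0.0.1", Zone: "zone-a"}, {Addr: "10.0.0.2", Zone: "zone-a"},
			{Addr: "10.0.0.3", Zone: "zone-b"}, {Addr: "10.0.0.4", Zone: "zone-b"},
			{Addr: "10.0.0.5", Zone: "zone-c"}, {Addr: "10.0.0.6", Zone: "zone-c"},
		},
		// With zone-awareness enabled and RF=3, GetReplicationSetForOperation
		// sets MaxUnavailableZones=1 (and MaxErrors=0) for this topology.
		MaxUnavailableZones: 1,
	}

	// Simulate zone-a being completely unreachable: every call to it fails.
	results, err := rs.Do(context.Background(), 0, func(_ context.Context, ing *ring.IngesterDesc) (interface{}, error) {
		if ing.Zone == "zone-a" {
			return nil, errors.New("zone-a unreachable")
		}
		return ing.Addr, nil
	})

	// The zone-aware tracker only needs every instance in "all zones - 1" zones
	// to respond, so the read succeeds with the 4 results from zone-b and zone-c.
	fmt.Println(len(results), err) // 4 <nil>
}
```

This is the same behaviour the integration test exercises end-to-end when it kills both ingesters in zone-a and still queries all series back successfully.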