diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 7abc2ee3019..5dc53c8bb7f 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -704,7 +704,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM for _, c := range compactors { cortex_testutil.Poll(t, 10*time.Second, len(compactors), func() interface{} { // it is safe to access c.ring here, since we know that all compactors are Running now - rs, err := c.ring.GetAll(ring.Compactor) + rs, err := c.ring.GetAllHealthy(ring.Compactor) if err != nil { return 0 } diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 2ef559450b7..8c32ddd9f00 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -835,7 +835,7 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) { req := &client.UserStatsRequest{} ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID // Not using d.ForReplicationSet(), so we can fail after first error. - replicationSet, err := d.ingestersRing.GetAll(ring.Read) + replicationSet, err := d.ingestersRing.GetAllHealthy(ring.Read) if err != nil { return nil, err } diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index c6fd3aa6879..8f08268cd06 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -88,7 +88,7 @@ func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*lab lookbackPeriod := d.cfg.ShuffleShardingLookbackPeriod if shardSize > 0 && lookbackPeriod > 0 { - return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetAll(ring.Read) + return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetReplicationSetForOperation(ring.Read) } } @@ -101,7 +101,7 @@ func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*lab } } - return d.ingestersRing.GetAll(ring.Read) + return d.ingestersRing.GetReplicationSetForOperation(ring.Read) } // GetIngestersForMetadata returns a replication set including all ingesters that should be queried @@ -119,11 +119,11 @@ func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.Replica lookbackPeriod := d.cfg.ShuffleShardingLookbackPeriod if shardSize > 0 && lookbackPeriod > 0 { - return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetAll(ring.Read) + return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetReplicationSetForOperation(ring.Read) } } - return d.ingestersRing.GetAll(ring.Read) + return d.ingestersRing.GetReplicationSetForOperation(ring.Read) } // queryIngesters queries the ingesters via the older, sample-based API. diff --git a/pkg/querier/blocks_store_replicated_set_test.go b/pkg/querier/blocks_store_replicated_set_test.go index da39149b488..d4f1cdc3d64 100644 --- a/pkg/querier/blocks_store_replicated_set_test.go +++ b/pkg/querier/blocks_store_replicated_set_test.go @@ -342,7 +342,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { // Wait until the ring client has initialised the state. 
test.Poll(t, time.Second, true, func() interface{} { - all, err := r.GetAll(ring.Read) + all, err := r.GetAllHealthy(ring.Read) return err == nil && len(all.Ingesters) > 0 }) diff --git a/pkg/ring/client/ring_service_discovery.go b/pkg/ring/client/ring_service_discovery.go index 93958920fa8..e0ab7ce64b9 100644 --- a/pkg/ring/client/ring_service_discovery.go +++ b/pkg/ring/client/ring_service_discovery.go @@ -6,7 +6,7 @@ import ( func NewRingServiceDiscovery(r ring.ReadRing) PoolServiceDiscovery { return func() ([]string, error) { - replicationSet, err := r.GetAll(ring.Read) + replicationSet, err := r.GetAllHealthy(ring.Reporting) if err != nil { return nil, err } diff --git a/pkg/ring/client/ring_service_discovery_test.go b/pkg/ring/client/ring_service_discovery_test.go index 6d0b74e0aac..b5ffb20a223 100644 --- a/pkg/ring/client/ring_service_discovery_test.go +++ b/pkg/ring/client/ring_service_discovery_test.go @@ -58,6 +58,6 @@ type mockReadRing struct { mockedErr error } -func (m *mockReadRing) GetAll(_ ring.Operation) (ring.ReplicationSet, error) { +func (m *mockReadRing) GetAllHealthy(_ ring.Operation) (ring.ReplicationSet, error) { return m.mockedReplicationSet, m.mockedErr } diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index da8a3907e10..6d2492c94ef 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -44,7 +44,18 @@ type ReadRing interface { // buf is a slice to be overwritten for the return value // to avoid memory allocation; can be nil. Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error) - GetAll(op Operation) (ReplicationSet, error) + + // GetAllHealthy returns all healthy instances in the ring, for the given operation. + // This function doesn't check if the quorum is honored, so it doesn't fail if the number + // of unhealthy ingesters is greater than the tolerated max unavailable. + GetAllHealthy(op Operation) (ReplicationSet, error) + + // GetReplicationSetForOperation returns all instances where the input operation should be executed. + // The resulting ReplicationSet doesn't necessarily contain all healthy instances + // in the ring, but could contain the minimum set of instances required to execute + // the input operation. + GetReplicationSetForOperation(op Operation) (ReplicationSet, error) + ReplicationFactor() int IngesterCount() int @@ -89,6 +100,10 @@ var ( // ErrInstanceNotFound is the error returned when trying to get information for an instance // not registered within the ring. ErrInstanceNotFound = errors.New("instance not found in the ring") + + // ErrTooManyFailedIngesters is the error returned when there are too many failed ingesters for a + // specific operation. + ErrTooManyFailedIngesters = errors.New("too many failed ingesters") ) // Config for a Ring @@ -317,8 +332,30 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet }, nil } -// GetAll returns all available ingesters in the ring. -func (r *Ring) GetAll(op Operation) (ReplicationSet, error) { +// GetAllHealthy implements ReadRing.
+func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) { + r.mtx.RLock() + defer r.mtx.RUnlock() + + if r.ringDesc == nil || len(r.ringDesc.Ingesters) == 0 { + return ReplicationSet{}, ErrEmptyRing + } + + ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters)) + for _, ingester := range r.ringDesc.Ingesters { + if r.IsHealthy(&ingester, op) { + ingesters = append(ingesters, ingester) + } + } + + return ReplicationSet{ + Ingesters: ingesters, + MaxErrors: 0, + }, nil +} + +// GetReplicationSetForOperation implements ReadRing. +func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) { r.mtx.RLock() defer r.mtx.RUnlock() @@ -343,7 +380,7 @@ func (r *Ring) GetAll(op Operation) (ReplicationSet, error) { } if len(ingesters) < numRequired { - return ReplicationSet{}, fmt.Errorf("too many failed ingesters") + return ReplicationSet{}, ErrTooManyFailedIngesters } return ReplicationSet{ diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index 3529fadde1e..189fac4b904 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -243,6 +243,181 @@ func TestRing_Get_ZoneAwareness(t *testing.T) { } } +func TestRing_GetAllHealthy(t *testing.T) { + const heartbeatTimeout = time.Minute + now := time.Now() + + tests := map[string]struct { + ringInstances map[string]IngesterDesc + expectedErrForRead error + expectedSetForRead []string + expectedErrForWrite error + expectedSetForWrite []string + expectedErrForReporting error + expectedSetForReporting []string + }{ + "should return error on empty ring": { + ringInstances: nil, + expectedErrForRead: ErrEmptyRing, + expectedErrForWrite: ErrEmptyRing, + expectedErrForReporting: ErrEmptyRing, + }, + "should return all healthy instances for the given operation": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", State: ACTIVE, Timestamp: now.Unix()}, + "instance-2": {Addr: "127.0.0.2", State: PENDING, Timestamp: now.Add(-10 * time.Second).Unix()}, + "instance-3": {Addr: "127.0.0.3", State: JOINING, Timestamp: now.Add(-20 * time.Second).Unix()}, + "instance-4": {Addr: "127.0.0.4", State: LEAVING, Timestamp: now.Add(-30 * time.Second).Unix()}, + "instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix()}, + }, + expectedSetForRead: []string{"127.0.0.1", "127.0.0.2", "127.0.0.4"}, + expectedSetForWrite: []string{"127.0.0.1"}, + expectedSetForReporting: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"}, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + // Init the ring. 
+ ringDesc := &Desc{Ingesters: testData.ringInstances} + for id, instance := range ringDesc.Ingesters { + ringDesc.Ingesters[id] = instance + } + + ring := Ring{ + cfg: Config{HeartbeatTimeout: heartbeatTimeout}, + ringDesc: ringDesc, + ringTokens: ringDesc.getTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: &DefaultReplicationStrategy{}, + } + + set, err := ring.GetAllHealthy(Read) + require.Equal(t, testData.expectedErrForRead, err) + assert.ElementsMatch(t, testData.expectedSetForRead, set.GetAddresses()) + + set, err = ring.GetAllHealthy(Write) + require.Equal(t, testData.expectedErrForWrite, err) + assert.ElementsMatch(t, testData.expectedSetForWrite, set.GetAddresses()) + + set, err = ring.GetAllHealthy(Reporting) + require.Equal(t, testData.expectedErrForReporting, err) + assert.ElementsMatch(t, testData.expectedSetForReporting, set.GetAddresses()) + }) + } +} + +func TestRing_GetReplicationSetForOperation(t *testing.T) { + const heartbeatTimeout = time.Minute + now := time.Now() + + tests := map[string]struct { + ringInstances map[string]IngesterDesc + ringReplicationFactor int + expectedErrForRead error + expectedSetForRead []string + expectedErrForWrite error + expectedSetForWrite []string + expectedErrForReporting error + expectedSetForReporting []string + }{ + "should return error on empty ring": { + ringInstances: nil, + ringReplicationFactor: 1, + expectedErrForRead: ErrEmptyRing, + expectedErrForWrite: ErrEmptyRing, + expectedErrForReporting: ErrEmptyRing, + }, + "should succeed on all healthy instances and RF=1": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", State: ACTIVE, Timestamp: now.Unix(), Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", State: ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", State: ACTIVE, Timestamp: now.Add(-20 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-30 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-40 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)}, + }, + ringReplicationFactor: 1, + expectedSetForRead: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"}, + expectedSetForWrite: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"}, + expectedSetForReporting: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"}, + }, + "should fail on 1 unhealthy instance and RF=1": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", State: ACTIVE, Timestamp: now.Unix(), Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", State: ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", State: ACTIVE, Timestamp: now.Add(-20 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-30 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)}, + }, + ringReplicationFactor: 1, + expectedErrForRead: ErrTooManyFailedIngesters, + expectedErrForWrite: ErrTooManyFailedIngesters, + expectedErrForReporting: 
ErrTooManyFailedIngesters, + }, + "should succeed on 1 unhealthy instance and RF=3": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", State: ACTIVE, Timestamp: now.Unix(), Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", State: ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", State: ACTIVE, Timestamp: now.Add(-20 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-30 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)}, + }, + ringReplicationFactor: 3, + expectedSetForRead: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"}, + expectedSetForWrite: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"}, + expectedSetForReporting: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"}, + }, + "should fail on 2 unhealthy instances and RF=3": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", State: ACTIVE, Timestamp: now.Unix(), Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", State: ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", State: ACTIVE, Timestamp: now.Add(-20 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)}, + }, + ringReplicationFactor: 3, + expectedErrForRead: ErrTooManyFailedIngesters, + expectedErrForWrite: ErrTooManyFailedIngesters, + expectedErrForReporting: ErrTooManyFailedIngesters, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + // Init the ring.
+ ringDesc := &Desc{Ingesters: testData.ringInstances} + for id, instance := range ringDesc.Ingesters { + ringDesc.Ingesters[id] = instance + } + + ring := Ring{ + cfg: Config{ + HeartbeatTimeout: heartbeatTimeout, + ReplicationFactor: testData.ringReplicationFactor, + }, + ringDesc: ringDesc, + ringTokens: ringDesc.getTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: &DefaultReplicationStrategy{}, + } + + set, err := ring.GetReplicationSetForOperation(Read) + require.Equal(t, testData.expectedErrForRead, err) + assert.ElementsMatch(t, testData.expectedSetForRead, set.GetAddresses()) + + set, err = ring.GetReplicationSetForOperation(Write) + require.Equal(t, testData.expectedErrForWrite, err) + assert.ElementsMatch(t, testData.expectedSetForWrite, set.GetAddresses()) + + set, err = ring.GetReplicationSetForOperation(Reporting) + require.Equal(t, testData.expectedErrForReporting, err) + assert.ElementsMatch(t, testData.expectedSetForReporting, set.GetAddresses()) + }) + } +} + func TestRing_ShuffleShard(t *testing.T) { tests := map[string]struct { ringInstances map[string]IngesterDesc @@ -363,7 +538,7 @@ func TestRing_ShuffleShard(t *testing.T) { var actualDistribution []int if shardRing.IngesterCount() > 0 { - all, err := shardRing.GetAll(Read) + all, err := shardRing.GetAllHealthy(Read) require.NoError(t, err) countByZone := map[string]int{} @@ -411,13 +586,13 @@ func TestRing_ShuffleShard_Stability(t *testing.T) { for _, size := range shardSizes { r := ring.ShuffleShard(tenantID, size) - expected, err := r.GetAll(Read) + expected, err := r.GetAllHealthy(Read) require.NoError(t, err) // Assert that multiple invocations generate the same exact shard. for n := 0; n < numInvocations; n++ { r := ring.ShuffleShard(tenantID, size) - actual, err := r.GetAll(Read) + actual, err := r.GetAllHealthy(Read) require.NoError(t, err) assert.ElementsMatch(t, expected.Ingesters, actual.Ingesters) } @@ -479,7 +654,7 @@ func TestRing_ShuffleShard_Shuffling(t *testing.T) { for i := 1; i <= numTenants; i++ { tenantID := fmt.Sprintf("%d", i) r := ring.ShuffleShard(tenantID, shardSize) - set, err := r.GetAll(Read) + set, err := r.GetAllHealthy(Read) require.NoError(t, err) instances := make([]string, 0, len(set.Ingesters)) @@ -574,7 +749,7 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) { // Compute the initial shard for each tenant. initial := map[int]ReplicationSet{} for id := 0; id < numTenants; id++ { - set, err := ring.ShuffleShard(fmt.Sprintf("%d", id), s.shardSize).GetAll(Read) + set, err := ring.ShuffleShard(fmt.Sprintf("%d", id), s.shardSize).GetAllHealthy(Read) require.NoError(t, err) initial[id] = set } @@ -600,7 +775,7 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) { // If the "consistency" property is guaranteed, we expect no more then 1 different instance // in the updated shard. for id := 0; id < numTenants; id++ { - updated, err := ring.ShuffleShard(fmt.Sprintf("%d", id), s.shardSize).GetAll(Read) + updated, err := ring.ShuffleShard(fmt.Sprintf("%d", id), s.shardSize).GetAllHealthy(Read) require.NoError(t, err) added, removed := compareReplicationSets(initial[id], updated) @@ -637,14 +812,14 @@ func TestRing_ShuffleShard_ConsistencyOnShardSizeChanged(t *testing.T) { firstShard := ring.ShuffleShard("tenant-id", 3) assert.Equal(t, 3, firstShard.IngesterCount()) - firstSet, err := firstShard.GetAll(Read) + firstSet, err := firstShard.GetAllHealthy(Read) require.NoError(t, err) // Increase shard size to 6. 
secondShard := ring.ShuffleShard("tenant-id", 6) assert.Equal(t, 6, secondShard.IngesterCount()) - secondSet, err := secondShard.GetAll(Read) + secondSet, err := secondShard.GetAllHealthy(Read) require.NoError(t, err) for _, firstInstance := range firstSet.Ingesters { @@ -655,7 +830,7 @@ func TestRing_ShuffleShard_ConsistencyOnShardSizeChanged(t *testing.T) { thirdShard := ring.ShuffleShard("tenant-id", 9) assert.Equal(t, 9, thirdShard.IngesterCount()) - thirdSet, err := thirdShard.GetAll(Read) + thirdSet, err := thirdShard.GetAllHealthy(Read) require.NoError(t, err) for _, secondInstance := range secondSet.Ingesters { @@ -666,7 +841,7 @@ func TestRing_ShuffleShard_ConsistencyOnShardSizeChanged(t *testing.T) { fourthShard := ring.ShuffleShard("tenant-id", 6) assert.Equal(t, 6, fourthShard.IngesterCount()) - fourthSet, err := fourthShard.GetAll(Read) + fourthSet, err := fourthShard.GetAllHealthy(Read) require.NoError(t, err) // We expect to have the same exact instances we had when the shard size was 6. @@ -678,7 +853,7 @@ func TestRing_ShuffleShard_ConsistencyOnShardSizeChanged(t *testing.T) { fifthShard := ring.ShuffleShard("tenant-id", 3) assert.Equal(t, 3, fifthShard.IngesterCount()) - fifthSet, err := fifthShard.GetAll(Read) + fifthSet, err := fifthShard.GetAllHealthy(Read) require.NoError(t, err) // We expect to have the same exact instances we had when the shard size was 3. @@ -713,14 +888,14 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) { firstShard := ring.ShuffleShard("tenant-id", 2) assert.Equal(t, 2, firstShard.IngesterCount()) - firstSet, err := firstShard.GetAll(Read) + firstSet, err := firstShard.GetAllHealthy(Read) require.NoError(t, err) // Increase shard size to 4. secondShard := ring.ShuffleShard("tenant-id", 4) assert.Equal(t, 4, secondShard.IngesterCount()) - secondSet, err := secondShard.GetAll(Read) + secondSet, err := secondShard.GetAllHealthy(Read) require.NoError(t, err) for _, firstInstance := range firstSet.Ingesters { @@ -742,7 +917,7 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) { thirdShard := ring.ShuffleShard("tenant-id", 6) assert.Equal(t, 6, thirdShard.IngesterCount()) - thirdSet, err := thirdShard.GetAll(Read) + thirdSet, err := thirdShard.GetAllHealthy(Read) require.NoError(t, err) for _, secondInstance := range secondSet.Ingesters { @@ -753,7 +928,7 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) { fourthShard := ring.ShuffleShard("tenant-id", 9) assert.Equal(t, 9, fourthShard.IngesterCount()) - fourthSet, err := fourthShard.GetAll(Read) + fourthSet, err := fourthShard.GetAllHealthy(Read) require.NoError(t, err) for _, thirdInstance := range thirdSet.Ingesters { @@ -982,7 +1157,7 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) { ring.ringTokensByZone = ringDesc.getTokensByZone() ring.ringZones = getZones(ringDesc.getTokensByZone()) case test: - rs, err := ring.ShuffleShardWithLookback(userID, event.shardSize, lookbackPeriod, time.Now()).GetAll(Read) + rs, err := ring.ShuffleShardWithLookback(userID, event.shardSize, lookbackPeriod, time.Now()).GetAllHealthy(Read) require.NoError(t, err) assert.ElementsMatch(t, event.expected, rs.GetAddresses()) } @@ -1035,7 +1210,7 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) { currTime := time.Now().Add(lookbackPeriod).Add(time.Minute) // Add the initial shard to the history. 
- rs, err := ring.shuffleShard(userID, shardSize, 0, time.Now()).GetAll(Read) + rs, err := ring.shuffleShard(userID, shardSize, 0, time.Now()).GetReplicationSetForOperation(Read) require.NoError(t, err) history := map[time.Time]ReplicationSet{ @@ -1099,12 +1274,12 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) { } // Add the current shard to the history. - rs, err = ring.shuffleShard(userID, shardSize, 0, time.Now()).GetAll(Read) + rs, err = ring.shuffleShard(userID, shardSize, 0, time.Now()).GetReplicationSetForOperation(Read) require.NoError(t, err) history[currTime] = rs // Ensure the shard with lookback includes all instances from previous states of the ring. - rsWithLookback, err := ring.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, currTime).GetAll(Read) + rsWithLookback, err := ring.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, currTime).GetReplicationSetForOperation(Read) require.NoError(t, err) for ringTime, ringState := range history { @@ -1273,7 +1448,7 @@ func TestRingUpdates(t *testing.T) { // sleep for 2 seconds) time.Sleep(2 * time.Second) - rs, err := ring.GetAll(Read) + rs, err := ring.GetAllHealthy(Read) require.NoError(t, err) now := time.Now() @@ -1362,7 +1537,7 @@ func TestShuffleShardWithCaching(t *testing.T) { // Wait until all instances in the ring are ACTIVE. test.Poll(t, 5*time.Second, numLifecyclers, func() interface{} { active := 0 - rs, _ := ring.GetAll(Read) + rs, _ := ring.GetReplicationSetForOperation(Read) for _, ing := range rs.Ingesters { if ing.State == ACTIVE { active++ @@ -1390,7 +1565,7 @@ func TestShuffleShardWithCaching(t *testing.T) { // Make sure subring has up-to-date timestamps. { - rs, err := subring.GetAll(Read) + rs, err := subring.GetReplicationSetForOperation(Read) require.NoError(t, err) now := time.Now() diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 8768832936e..438f8933145 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -406,7 +406,7 @@ func (r *Ruler) run(ctx context.Context) error { var ringLastState ring.ReplicationSet if r.cfg.EnableSharding { - ringLastState, _ = r.ring.GetAll(ring.Ruler) + ringLastState, _ = r.ring.GetAllHealthy(ring.Ruler) ringTicker := time.NewTicker(util.DurationWithJitter(r.cfg.RingCheckPeriod, 0.2)) defer ringTicker.Stop() ringTickerChan = ringTicker.C @@ -422,7 +422,7 @@ func (r *Ruler) run(ctx context.Context) error { case <-ringTickerChan: // We ignore the error because in case of error it will return an empty // replication set which we use to compare with the previous state. 
- currRingState, _ := r.ring.GetAll(ring.Ruler) + currRingState, _ := r.ring.GetAllHealthy(ring.Ruler) if ring.HasReplicationSetChanged(ringLastState, currRingState) { ringLastState = currRingState @@ -684,7 +684,7 @@ func (r *Ruler) getLocalRules(userID string) ([]*GroupStateDesc, error) { } func (r *Ruler) getShardedRules(ctx context.Context) ([]*GroupStateDesc, error) { - rulers, err := r.ring.GetAll(ring.Ruler) + rulers, err := r.ring.GetReplicationSetForOperation(ring.Ruler) if err != nil { return nil, err } diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 3ae3458131d..8fa8cf01b79 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -270,7 +270,7 @@ func (g *StoreGateway) running(ctx context.Context) error { defer syncTicker.Stop() if g.gatewayCfg.ShardingEnabled { - ringLastState, _ = g.ring.GetAll(ring.BlocksSync) // nolint:errcheck + ringLastState, _ = g.ring.GetAllHealthy(ring.BlocksSync) // nolint:errcheck ringTicker := time.NewTicker(util.DurationWithJitter(g.gatewayCfg.ShardingRing.RingCheckPeriod, 0.2)) defer ringTicker.Stop() ringTickerChan = ringTicker.C @@ -283,7 +283,7 @@ func (g *StoreGateway) running(ctx context.Context) error { case <-ringTickerChan: // We ignore the error because in case of error it will return an empty // replication set which we use to compare with the previous state. - currRingState, _ := g.ring.GetAll(ring.BlocksSync) // nolint:errcheck + currRingState, _ := g.ring.GetAllHealthy(ring.BlocksSync) // nolint:errcheck if ring.HasReplicationSetChanged(ringLastState, currRingState) { ringLastState = currRingState
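
Not part of the patch: a minimal usage sketch, in Go, contrasting the two ReadRing methods introduced above. The package name, helper names and import path are assumptions made for illustration; only ring.ReadRing, ring.Read, ring.Reporting, GetAllHealthy, GetReplicationSetForOperation, ErrTooManyFailedIngesters and ReplicationSet.GetAddresses() come from the code in this diff.

package example

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/ring"
)

// listHealthyAddrs only needs to observe every healthy instance (e.g. for
// service discovery or detecting ring changes), so it uses GetAllHealthy,
// which does not fail when the quorum cannot be met.
func listHealthyAddrs(r ring.ReadRing) ([]string, error) {
	rs, err := r.GetAllHealthy(ring.Reporting)
	if err != nil {
		return nil, err
	}
	return rs.GetAddresses(), nil
}

// readFromIngesters needs a set of instances that is actually sufficient to
// execute a read, so it uses GetReplicationSetForOperation, which returns
// ErrTooManyFailedIngesters when too many instances are unhealthy for the
// configured replication factor.
func readFromIngesters(r ring.ReadRing) (ring.ReplicationSet, error) {
	rs, err := r.GetReplicationSetForOperation(ring.Read)
	if err != nil {
		return ring.ReplicationSet{}, fmt.Errorf("cannot get replication set for read: %w", err)
	}
	return rs, nil
}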