From d375b6319136935840cafff0db7c87b79f700cf2 Mon Sep 17 00:00:00 2001 From: Daniel Deluiggi Date: Wed, 15 Jan 2025 08:53:44 -0800 Subject: [PATCH 1/5] Filter readOnly ingesters when sharding Signed-off-by: Daniel Deluiggi --- CHANGELOG.md | 1 + .../shuffle_sharding_grouper_test.go | 5 + pkg/distributor/distributor.go | 2 +- pkg/ring/model.go | 17 ++- pkg/ring/model_test.go | 20 +++ pkg/ring/ring.go | 127 ++++++++++++------ pkg/ring/ring_test.go | 117 +++++++++++++++- pkg/ring/util_test.go | 5 + 8 files changed, 243 insertions(+), 51 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 771bb0554a5..37be94bcd56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,7 @@ * [BUGFIX] Ingester: Fix possible race condition when `active series per LabelSet` is configured. #6409 * [BUGFIX] Query Frontend: Fix @ modifier not being applied correctly on sub queries. #6450 * [BUGFIX] Cortex Redis flags with multiple dots #6476 +* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 ## 1.18.1 2024-10-14 diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index c7aaa4a656a..bc018139ad1 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -794,6 +794,11 @@ func (r *RingMock) ShuffleShardWithZoneStability(identifier string, size int) ri return args.Get(0).(ring.ReadRing) } +func (r *RingMock) ShuffleShardWithOperation(identifier string, size int, op ring.Operation) ring.ReadRing { + args := r.Called(identifier, size, op) + return args.Get(0).(ring.ReadRing) +} + func (r *RingMock) GetInstanceState(instanceID string) (ring.InstanceState, error) { args := r.Called(instanceID) return args.Get(0).(ring.InstanceState), args.Error(1) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 9aea6d8df0b..e2c91149ab0 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -787,7 +787,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co // Obtain a subring if required. if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle { - subRing = d.ingestersRing.ShuffleShard(userID, limits.IngestionTenantShardSize) + subRing = d.ingestersRing.ShuffleShardWithOperation(userID, limits.IngestionTenantShardSize, ring.WriteShard) } keys := append(seriesKeys, metadataKeys...) diff --git a/pkg/ring/model.go b/pkg/ring/model.go index 3f0e6944e2f..7ff7632516f 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -543,6 +543,7 @@ type CompareResult int const ( Equal CompareResult = iota // Both rings contain same exact instances. EqualButStatesAndTimestamps // Both rings contain the same instances with the same data except states and timestamps (may differ). + EqualButReadOnly // Both rings contain the same instances but Write ring can change due to ReadOnly update Different // Rings have different set of instances, or their information don't match. ) @@ -566,6 +567,7 @@ func (d *Desc) RingCompare(o *Desc) CompareResult { } equalStatesAndTimestamps := true + equalReadOnly := true for name, ing := range d.Ingesters { oing, ok := o.Ingesters[name] @@ -600,14 +602,21 @@ func (d *Desc) RingCompare(o *Desc) CompareResult { } if ing.State != oing.State { - equalStatesAndTimestamps = false + if ing.State == READONLY || oing.State == READONLY { + equalReadOnly = false + } else { + equalStatesAndTimestamps = false + } } } - if equalStatesAndTimestamps { - return Equal + if !equalReadOnly { + return EqualButReadOnly + } + if !equalStatesAndTimestamps { + return EqualButStatesAndTimestamps } - return EqualButStatesAndTimestamps + return Equal } func GetOrCreateRingDesc(d interface{}) *Desc { diff --git a/pkg/ring/model_test.go b/pkg/ring/model_test.go index 896aef56897..f34b6e566d2 100644 --- a/pkg/ring/model_test.go +++ b/pkg/ring/model_test.go @@ -395,6 +395,21 @@ func TestDesc_RingsCompare(t *testing.T) { r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1"}}}, expected: Equal, }, + "same number of instances, from active to readOnly": { + r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE}}}, + r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY}}}, + expected: EqualButReadOnly, + }, + "same number of instances, from readOnly to active": { + r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY}}}, + r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE}}}, + expected: EqualButReadOnly, + }, + "same number of instances, prioritize readOnly than timestamp changes": { + r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE, Timestamp: 123456}}}, + r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY, Timestamp: 789012}}}, + expected: EqualButReadOnly, + }, "same single instance, different timestamp": { r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Timestamp: 123456}}}, r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Timestamp: 789012}}}, @@ -440,6 +455,11 @@ func TestDesc_RingsCompare(t *testing.T) { r2: &Desc{Ingesters: map[string]InstanceDesc{"ing2": {Addr: "addr1", Tokens: []uint32{1, 2, 3}}}}, expected: Different, }, + "same number of instances, prioritize diff than ReadOnly": { + r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "one", State: ACTIVE}}}, + r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "two", State: READONLY}}}, + expected: Different, + }, } for testName, testData := range tests { diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 7377cbcccd4..0fbaf970322 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -70,6 +70,10 @@ type ReadRing interface { // and size (number of instances). ShuffleShard(identifier string, size int) ReadRing + // ShuffleShardWithOperation returns a subring for the provided identifier (eg. a tenant ID) + // and size (number of instances) filtered for a given operation. + ShuffleShardWithOperation(identifier string, size int, op Operation) ReadRing + // ShuffleShardWithZoneStability does the same as ShuffleShard but using a different shuffle sharding algorithm. // It doesn't round up shard size to be divisible to number of zones and make sure when scaling up/down one // shard size at a time, at most 1 instance can be changed. @@ -112,6 +116,8 @@ var ( return s == READONLY }) + WriteShard = NewOp([]InstanceState{ACTIVE, PENDING, LEAVING, JOINING}, func(s InstanceState) bool { return false }) + // Read operation that extends the replica set if an instance is not ACTIVE, PENDING, LEAVING, JOINING OR READONLY Read = NewOp([]InstanceState{ACTIVE, PENDING, LEAVING, JOINING, READONLY}, func(s InstanceState) bool { // To match Write with extended replica set we have to also increase the @@ -222,6 +228,7 @@ type subringCacheKey struct { shardSize int zoneStableSharding bool + operation Operation } // New creates a new Ring. Being a service, Ring needs to be started to do anything. @@ -333,12 +340,16 @@ func (r *Ring) updateRingState(ringDesc *Desc) { } rc := prevRing.RingCompare(ringDesc) - if rc == Equal || rc == EqualButStatesAndTimestamps { + if rc == Equal || rc == EqualButStatesAndTimestamps || rc == EqualButReadOnly { // No need to update tokens or zones. Only states and timestamps // have changed. (If Equal, nothing has changed, but that doesn't happen // when watching the ring for updates). r.mtx.Lock() r.ringDesc = ringDesc + if rc == EqualButReadOnly && r.shuffledSubringCache != nil { + // Invalidate all cached subrings. + r.shuffledSubringCache = make(map[subringCacheKey]*Ring) + } r.updateRingMetrics(rc) r.mtx.Unlock() return @@ -732,11 +743,15 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { // - Shuffling: probabilistically, for a large enough cluster each identifier gets a different // set of instances, with a reduced number of overlapping instances between two identifiers. func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { - return r.shuffleShardWithCache(identifier, size, false) + return r.shuffleShardWithCache(identifier, size, false, Reporting) +} + +func (r *Ring) ShuffleShardWithOperation(identifier string, size int, op Operation) ReadRing { + return r.shuffleShardWithCache(identifier, size, false, op) } func (r *Ring) ShuffleShardWithZoneStability(identifier string, size int) ReadRing { - return r.shuffleShardWithCache(identifier, size, true) + return r.shuffleShardWithCache(identifier, size, true, Reporting) } // ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes all instances @@ -752,26 +767,26 @@ func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPer return r } - return r.shuffleShard(identifier, size, lookbackPeriod, now, false) + return r.shuffleShard(identifier, size, lookbackPeriod, now, false, Reporting) } -func (r *Ring) shuffleShardWithCache(identifier string, size int, zoneStableSharding bool) ReadRing { +func (r *Ring) shuffleShardWithCache(identifier string, size int, zoneStableSharding bool, op Operation) ReadRing { // Nothing to do if the shard size is not smaller than the actual ring. - if size <= 0 || r.InstancesCount() <= size { + if size <= 0 || (op == Reporting && r.InstancesCount() <= size) { return r } - if cached := r.getCachedShuffledSubring(identifier, size, zoneStableSharding); cached != nil { + if cached := r.getCachedShuffledSubring(identifier, size, zoneStableSharding, op); cached != nil { return cached } - result := r.shuffleShard(identifier, size, 0, time.Now(), zoneStableSharding) + result := r.shuffleShard(identifier, size, 0, time.Now(), zoneStableSharding, op) - r.setCachedShuffledSubring(identifier, size, zoneStableSharding, result) + r.setCachedShuffledSubring(identifier, size, zoneStableSharding, op, result) return result } -func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) *Ring { +func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool, op Operation) *Ring { lookbackUntil := now.Add(-lookbackPeriod).Unix() r.mtx.RLock() @@ -783,14 +798,16 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur zonesWithExtraInstance int ) + ro := r.getRingForOperation(op) + if r.cfg.ZoneAwarenessEnabled { if zoneStableSharding { - numInstancesPerZone = size / len(r.ringZones) - zonesWithExtraInstance = size % len(r.ringZones) + numInstancesPerZone = size / len(ro.ringZones) + zonesWithExtraInstance = size % len(ro.ringZones) } else { - numInstancesPerZone = shardUtil.ShuffleShardExpectedInstancesPerZone(size, len(r.ringZones)) + numInstancesPerZone = shardUtil.ShuffleShardExpectedInstancesPerZone(size, len(ro.ringZones)) } - actualZones = r.ringZones + actualZones = ro.ringZones } else { numInstancesPerZone = size actualZones = []string{""} @@ -802,12 +819,12 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur for _, zone := range actualZones { var tokens []uint32 - if r.cfg.ZoneAwarenessEnabled { - tokens = r.ringTokensByZone[zone] + if ro.cfg.ZoneAwarenessEnabled { + tokens = ro.ringTokensByZone[zone] } else { // When zone-awareness is disabled, we just iterate over 1 single fake zone // and use all tokens in the ring. - tokens = r.ringTokens + tokens = ro.ringTokens } // Initialise the random generator used to select instances in the ring. @@ -835,7 +852,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur // Wrap p around in the ring. p %= len(tokens) - info, ok := r.ringInstanceByToken[tokens[p]] + info, ok := ro.ringInstanceByToken[tokens[p]] if !ok { // This should never happen unless a bug in the ring code. panic(ErrInconsistentTokensInfo) @@ -847,7 +864,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur } instanceID := info.InstanceID - instance := r.ringDesc.Ingesters[instanceID] + instance := ro.ringDesc.Ingesters[instanceID] shard[instanceID] = instance // If the lookback is enabled and this instance has been registered within the lookback period @@ -869,27 +886,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur } } - // Build a read-only ring for the shard. - shardDesc := &Desc{Ingesters: shard} - shardTokensByZone := shardDesc.getTokensByZone() - - return &Ring{ - cfg: r.cfg, - strategy: r.strategy, - ringDesc: shardDesc, - ringTokens: shardDesc.GetTokens(), - ringTokensByZone: shardTokensByZone, - ringZones: getZones(shardTokensByZone), - KVClient: r.KVClient, - - // We reference the original map as is in order to avoid copying. It's safe to do - // because this map is immutable by design and it's a superset of the actual instances - // with the subring. - ringInstanceByToken: r.ringInstanceByToken, - - // For caching to work, remember these values. - lastTopologyChange: r.lastTopologyChange, - } + return r.copyWithNewDesc(shard) } // GetInstanceState returns the current state of an instance or an error if the @@ -926,7 +923,7 @@ func (r *Ring) HasInstance(instanceID string) bool { return ok } -func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableSharding bool) *Ring { +func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, op Operation) *Ring { if r.cfg.SubringCacheDisabled { return nil } @@ -935,7 +932,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableS defer r.mtx.RUnlock() // if shuffledSubringCache map is nil, reading it returns default value (nil pointer). - cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}] + cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding, operation: op}] if cached == nil { return nil } @@ -954,7 +951,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableS return cached } -func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, subring *Ring) { +func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, op Operation, subring *Ring) { if subring == nil || r.cfg.SubringCacheDisabled { return } @@ -966,7 +963,49 @@ func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableS // (which can happen between releasing the read lock and getting read-write lock). // Note that shuffledSubringCache can be only nil when set by test. if r.shuffledSubringCache != nil && r.lastTopologyChange.Equal(subring.lastTopologyChange) { - r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}] = subring + r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding, operation: op}] = subring + } +} + +// getRingForOperation Returns a new ring filtered for operation. +// The ring read lock must be already taken when calling this function. +func (r *Ring) getRingForOperation(op Operation) *Ring { + //Avoid filtering if we are receiving default operation or empty ring + if r.ringDesc == nil || len(r.ringDesc.Ingesters) == 0 || op == Reporting { + return r + } + + instanceDescs := make(map[string]InstanceDesc) + for id, instance := range r.ringDesc.Ingesters { + if op.IsInstanceInStateHealthy(instance.State) { + instanceDescs[id] = instance + } + } + + return r.copyWithNewDesc(instanceDescs) +} + +// copyWithNewDesc Return a new ring with updated data for different InstanceDesc +func (r *Ring) copyWithNewDesc(desc map[string]InstanceDesc) *Ring { + shardDesc := &Desc{Ingesters: desc} + shardTokensByZone := shardDesc.getTokensByZone() + + return &Ring{ + cfg: r.cfg, + strategy: r.strategy, + ringDesc: shardDesc, + ringTokens: shardDesc.GetTokens(), + ringTokensByZone: shardTokensByZone, + ringZones: getZones(shardTokensByZone), + KVClient: r.KVClient, + + // We reference the original map as is in order to avoid copying. It's safe to do + // because this map is immutable by design and it's a superset of the actual instances + // with the subring. + ringInstanceByToken: r.ringInstanceByToken, + + // For caching to work, remember these values. + lastTopologyChange: r.lastTopologyChange, } } diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index a5937e2e8ec..b9a622dc33c 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -2523,6 +2523,119 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) { } } +func TestRing_ShuffleShardWithOperation(t *testing.T) { + g := NewRandomTokenGenerator() + + const ( + userID = "user-1" + ) + + tests := map[string]struct { + ringInstances map[string]InstanceDesc + ringReplicationFactor int + shardSize int + expectedSize int + op Operation + expectedToBeFilter []string + }{ + "single zone, shard size = 1, default scenario": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-a", 128, true)}, + }, + ringReplicationFactor: 1, + shardSize: 1, + expectedSize: 1, + op: WriteNoExtend, + expectedToBeFilter: []string{}, + }, + "single zone, shard size = 1, filter ReadOnly": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: READONLY, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-a", 128, true)}, + }, + ringReplicationFactor: 1, + shardSize: 1, + expectedSize: 1, + op: WriteShard, + expectedToBeFilter: []string{"127.0.0.2"}, + }, + "single zone, shard size = 4, do not filter other states": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: JOINING, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-a", 128, true)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-a", State: LEAVING, Tokens: g.GenerateTokens(NewDesc(), "instance-3", "zone-a", 128, true)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: PENDING, Tokens: g.GenerateTokens(NewDesc(), "instance-4", "zone-a", 128, true)}, + }, + ringReplicationFactor: 1, + shardSize: 4, + expectedSize: 4, + op: WriteShard, + expectedToBeFilter: []string{}, + }, + "single zone, shard size = 4, filter readOnly even if shard size is not achieved": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: JOINING, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-a", 128, true)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-a", State: LEAVING, Tokens: g.GenerateTokens(NewDesc(), "instance-3", "zone-a", 128, true)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: READONLY, Tokens: g.GenerateTokens(NewDesc(), "instance-4", "zone-a", 128, true)}, + }, + ringReplicationFactor: 1, + shardSize: 4, + expectedSize: 3, + op: WriteShard, + expectedToBeFilter: []string{"127.0.0.4"}, + }, + "rf = 3, shard size = 4, filter readOnly from different zones": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-b", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-b", 128, true)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-c", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-3", "zone-c", 128, true)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-4", "zone-a", 128, true)}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-b", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-5", "zone-b", 128, true)}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-6", "zone-c", 128, true)}, + "instance-7": {Addr: "127.0.0.7", Zone: "zone-a", State: READONLY, Tokens: g.GenerateTokens(NewDesc(), "instance-7", "zone-a", 128, true)}, + "instance-8": {Addr: "127.0.0.8", Zone: "zone-b", State: READONLY, Tokens: g.GenerateTokens(NewDesc(), "instance-8", "zone-b", 128, true)}, + "instance-9": {Addr: "127.0.0.9", Zone: "zone-c", State: READONLY, Tokens: g.GenerateTokens(NewDesc(), "instance-9", "zone-c", 128, true)}, + }, + ringReplicationFactor: 3, + shardSize: 6, + expectedSize: 6, + op: WriteShard, + expectedToBeFilter: []string{"127.0.0.8", "127.0.0.9", "127.0.0.10"}, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + // Init the ring. + ringDesc := &Desc{Ingesters: testData.ringInstances} + for id, instance := range ringDesc.Ingesters { + ringDesc.Ingesters[id] = instance + } + + ring := Ring{ + cfg: Config{ + ReplicationFactor: testData.ringReplicationFactor, + }, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(), + KVClient: &MockClient{}, + } + + shardRing := ring.ShuffleShardWithOperation(userID, testData.shardSize, testData.op) + assert.Equal(t, testData.expectedSize, shardRing.InstancesCount()) + for _, expectedInstance := range testData.expectedToBeFilter { + assert.False(t, shardRing.HasInstance(expectedInstance)) + } + }) + } +} + func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) { // The goal of this test is NOT to ensure that the minimum required number of instances // are returned at any given time, BUT at least all required instances are returned. @@ -2572,7 +2685,7 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) { currTime := time.Now().Add(lookbackPeriod).Add(time.Minute) // Add the initial shard to the history. - rs, err := ring.shuffleShard(userID, shardSize, 0, time.Now(), enableStableSharding).GetReplicationSetForOperation(Read) + rs, err := ring.shuffleShard(userID, shardSize, 0, time.Now(), enableStableSharding, Reporting).GetReplicationSetForOperation(Read) require.NoError(t, err) history := map[time.Time]ReplicationSet{ @@ -2638,7 +2751,7 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) { } // Add the current shard to the history. - rs, err = ring.shuffleShard(userID, shardSize, 0, time.Now(), enableStableSharding).GetReplicationSetForOperation(Read) + rs, err = ring.shuffleShard(userID, shardSize, 0, time.Now(), enableStableSharding, Reporting).GetReplicationSetForOperation(Read) require.NoError(t, err) history[currTime] = rs diff --git a/pkg/ring/util_test.go b/pkg/ring/util_test.go index 563cc0aa7f6..a777f1e14f5 100644 --- a/pkg/ring/util_test.go +++ b/pkg/ring/util_test.go @@ -63,6 +63,11 @@ func (r *RingMock) ShuffleShard(identifier string, size int) ReadRing { return args.Get(0).(ReadRing) } +func (r *RingMock) ShuffleShardWithOperation(identifier string, size int, op Operation) ReadRing { + args := r.Called(identifier, size, op) + return args.Get(0).(ReadRing) +} + func (r *RingMock) ShuffleShardWithZoneStability(identifier string, size int) ReadRing { args := r.Called(identifier, size) return args.Get(0).(ReadRing) From cb86a389472458306800d07c405633efac9e32f4 Mon Sep 17 00:00:00 2001 From: Daniel Deluiggi Date: Wed, 22 Jan 2025 14:53:23 -0800 Subject: [PATCH 2/5] Extend shard on READONLY Signed-off-by: Daniel Deluiggi --- .../shuffle_sharding_grouper_test.go | 5 -- pkg/distributor/distributor.go | 2 +- pkg/ingester/ingester.go | 2 +- pkg/ring/lifecycler.go | 6 ++ pkg/ring/lifecycler_test.go | 80 +++++++++++++++++++ pkg/ring/model.go | 2 +- pkg/ring/ring.go | 61 +++++++------- pkg/ring/ring_test.go | 66 +++++++-------- pkg/ring/util_test.go | 5 -- 9 files changed, 145 insertions(+), 84 deletions(-) diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index bc018139ad1..1e2a331c21e 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -761,11 +761,6 @@ func (r *RingMock) GetAllHealthy(op ring.Operation) (ring.ReplicationSet, error) return args.Get(0).(ring.ReplicationSet), args.Error(1) } -func (r *RingMock) GetInstanceDescsForOperation(op ring.Operation) (map[string]ring.InstanceDesc, error) { - args := r.Called(op) - return args.Get(0).(map[string]ring.InstanceDesc), args.Error(1) -} - func (r *RingMock) GetAllInstanceDescs(op ring.Operation) ([]ring.InstanceDesc, []ring.InstanceDesc, error) { args := r.Called(op) return args.Get(0).([]ring.InstanceDesc), make([]ring.InstanceDesc, 0), args.Error(1) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index e2c91149ab0..9aea6d8df0b 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -787,7 +787,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co // Obtain a subring if required. if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle { - subRing = d.ingestersRing.ShuffleShardWithOperation(userID, limits.IngestionTenantShardSize, ring.WriteShard) + subRing = d.ingestersRing.ShuffleShard(userID, limits.IngestionTenantShardSize) } keys := append(seriesKeys, metadataKeys...) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 4c41a90a991..c15f82ce354 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -3122,7 +3122,7 @@ func (i *Ingester) flushHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -// ModeHandler Change mode of ingester. It will also update set unregisterOnShutdown to true if READONLY mode +// ModeHandler Change mode of ingester. func (i *Ingester) ModeHandler(w http.ResponseWriter, r *http.Request) { err := r.ParseForm() if err != nil { diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 7e8b033e83f..c34d3464019 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -1005,6 +1005,12 @@ func (i *Lifecycler) changeState(ctx context.Context, state InstanceState) error level.Info(i.logger).Log("msg", "changing instance state from", "old_state", currState, "new_state", state, "ring", i.RingName) i.setState(state) + + //The instances is rejoining the ring. It should reset its registered time. + if currState == READONLY && state == ACTIVE { + registeredAt := time.Now() + i.setRegisteredAt(registeredAt) + } return i.updateConsul(ctx) } diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index 035cfc8f1b8..e0756765379 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -827,6 +827,86 @@ func TestTokenFileOnDisk(t *testing.T) { } } +func TestRegisteredAtOnBackToActive(t *testing.T) { + ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + var ringConfig Config + flagext.DefaultValues(&ringConfig) + ringConfig.KVStore.Mock = ringStore + + r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), r)) + defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck + + tokenDir := t.TempDir() + + lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1") + lifecyclerConfig.NumTokens = 512 + lifecyclerConfig.TokensFilePath = tokenDir + "/tokens" + + // Start first ingester. + l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, true, log.NewNopLogger(), nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1)) + + // Check this ingester joined, is active. + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + d, err := r.KVClient.Get(context.Background(), ringKey) + require.NoError(t, err) + + desc, ok := d.(*Desc) + return ok && + len(desc.Ingesters) == 1 && + desc.Ingesters["ing1"].State == ACTIVE + }) + + //Get original registeredTime + d, err := r.KVClient.Get(context.Background(), ringKey) + require.NoError(t, err) + desc, ok := d.(*Desc) + require.True(t, ok) + originalRegisterTime := desc.Ingesters["ing1"].RegisteredTimestamp + + // Change state from ACTIVE to READONLY + err = l1.ChangeState(context.Background(), READONLY) + require.NoError(t, err) + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + d, err := r.KVClient.Get(context.Background(), ringKey) + require.NoError(t, err) + + desc, ok := d.(*Desc) + return ok && + desc.Ingesters["ing1"].State == READONLY + }) + + //Guarantee 1s diff for RegisteredTimestamp + time.Sleep(1 * time.Second) + + // Change state from READONLY to ACTIVE + err = l1.ChangeState(context.Background(), ACTIVE) + require.NoError(t, err) + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + d, err := r.KVClient.Get(context.Background(), ringKey) + require.NoError(t, err) + + desc, ok := d.(*Desc) + return ok && + desc.Ingesters["ing1"].State == ACTIVE + }) + + d, err = r.KVClient.Get(context.Background(), ringKey) + require.NoError(t, err) + + desc, ok = d.(*Desc) + require.True(t, ok) + ing := desc.Ingesters["ing1"] + require.True(t, ing.RegisteredTimestamp > originalRegisterTime) + + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l1)) +} + func TestTokenFileOnDisk_WithoutAutoJoinOnStartup(t *testing.T) { ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) diff --git a/pkg/ring/model.go b/pkg/ring/model.go index 7ff7632516f..a465bf0fa91 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -547,7 +547,7 @@ const ( Different // Rings have different set of instances, or their information don't match. ) -// RingCompare compares this ring against another one and returns one of Equal, EqualButStatesAndTimestamps or Different. +// RingCompare compares this ring against another one and returns one of Equal, EqualButStatesAndTimestamps, EqualButReadOnly or Different. func (d *Desc) RingCompare(o *Desc) CompareResult { if d == nil { if o == nil || len(o.Ingesters) == 0 { diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 0fbaf970322..8f05021c329 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -70,10 +70,6 @@ type ReadRing interface { // and size (number of instances). ShuffleShard(identifier string, size int) ReadRing - // ShuffleShardWithOperation returns a subring for the provided identifier (eg. a tenant ID) - // and size (number of instances) filtered for a given operation. - ShuffleShardWithOperation(identifier string, size int, op Operation) ReadRing - // ShuffleShardWithZoneStability does the same as ShuffleShard but using a different shuffle sharding algorithm. // It doesn't round up shard size to be divisible to number of zones and make sure when scaling up/down one // shard size at a time, at most 1 instance can be changed. @@ -116,8 +112,6 @@ var ( return s == READONLY }) - WriteShard = NewOp([]InstanceState{ACTIVE, PENDING, LEAVING, JOINING}, func(s InstanceState) bool { return false }) - // Read operation that extends the replica set if an instance is not ACTIVE, PENDING, LEAVING, JOINING OR READONLY Read = NewOp([]InstanceState{ACTIVE, PENDING, LEAVING, JOINING, READONLY}, func(s InstanceState) bool { // To match Write with extended replica set we have to also increase the @@ -228,7 +222,6 @@ type subringCacheKey struct { shardSize int zoneStableSharding bool - operation Operation } // New creates a new Ring. Being a service, Ring needs to be started to do anything. @@ -743,15 +736,15 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { // - Shuffling: probabilistically, for a large enough cluster each identifier gets a different // set of instances, with a reduced number of overlapping instances between two identifiers. func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { - return r.shuffleShardWithCache(identifier, size, false, Reporting) + return r.shuffleShardWithCache(identifier, size, false) } -func (r *Ring) ShuffleShardWithOperation(identifier string, size int, op Operation) ReadRing { - return r.shuffleShardWithCache(identifier, size, false, op) +func (r *Ring) ShuffleShardWithOperation(identifier string, size int) ReadRing { + return r.shuffleShardWithCache(identifier, size, false) } func (r *Ring) ShuffleShardWithZoneStability(identifier string, size int) ReadRing { - return r.shuffleShardWithCache(identifier, size, true, Reporting) + return r.shuffleShardWithCache(identifier, size, true) } // ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes all instances @@ -767,26 +760,26 @@ func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPer return r } - return r.shuffleShard(identifier, size, lookbackPeriod, now, false, Reporting) + return r.shuffleShard(identifier, size, lookbackPeriod, now, false) } -func (r *Ring) shuffleShardWithCache(identifier string, size int, zoneStableSharding bool, op Operation) ReadRing { +func (r *Ring) shuffleShardWithCache(identifier string, size int, zoneStableSharding bool) ReadRing { // Nothing to do if the shard size is not smaller than the actual ring. - if size <= 0 || (op == Reporting && r.InstancesCount() <= size) { + if size <= 0 || r.InstancesCount() <= size { return r } - if cached := r.getCachedShuffledSubring(identifier, size, zoneStableSharding, op); cached != nil { + if cached := r.getCachedShuffledSubring(identifier, size, zoneStableSharding); cached != nil { return cached } - result := r.shuffleShard(identifier, size, 0, time.Now(), zoneStableSharding, op) + result := r.shuffleShard(identifier, size, 0, time.Now(), zoneStableSharding) - r.setCachedShuffledSubring(identifier, size, zoneStableSharding, op, result) + r.setCachedShuffledSubring(identifier, size, zoneStableSharding, result) return result } -func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool, op Operation) *Ring { +func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) *Ring { lookbackUntil := now.Add(-lookbackPeriod).Unix() r.mtx.RLock() @@ -798,16 +791,14 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur zonesWithExtraInstance int ) - ro := r.getRingForOperation(op) - if r.cfg.ZoneAwarenessEnabled { if zoneStableSharding { - numInstancesPerZone = size / len(ro.ringZones) - zonesWithExtraInstance = size % len(ro.ringZones) + numInstancesPerZone = size / len(r.ringZones) + zonesWithExtraInstance = size % len(r.ringZones) } else { - numInstancesPerZone = shardUtil.ShuffleShardExpectedInstancesPerZone(size, len(ro.ringZones)) + numInstancesPerZone = shardUtil.ShuffleShardExpectedInstancesPerZone(size, len(r.ringZones)) } - actualZones = ro.ringZones + actualZones = r.ringZones } else { numInstancesPerZone = size actualZones = []string{""} @@ -819,12 +810,12 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur for _, zone := range actualZones { var tokens []uint32 - if ro.cfg.ZoneAwarenessEnabled { - tokens = ro.ringTokensByZone[zone] + if r.cfg.ZoneAwarenessEnabled { + tokens = r.ringTokensByZone[zone] } else { // When zone-awareness is disabled, we just iterate over 1 single fake zone // and use all tokens in the ring. - tokens = ro.ringTokens + tokens = r.ringTokens } // Initialise the random generator used to select instances in the ring. @@ -852,7 +843,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur // Wrap p around in the ring. p %= len(tokens) - info, ok := ro.ringInstanceByToken[tokens[p]] + info, ok := r.ringInstanceByToken[tokens[p]] if !ok { // This should never happen unless a bug in the ring code. panic(ErrInconsistentTokensInfo) @@ -864,12 +855,14 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur } instanceID := info.InstanceID - instance := ro.ringDesc.Ingesters[instanceID] + instance := r.ringDesc.Ingesters[instanceID] shard[instanceID] = instance // If the lookback is enabled and this instance has been registered within the lookback period // then we should include it in the subring but continuing selecting instances. - if lookbackPeriod > 0 && instance.RegisteredTimestamp >= lookbackUntil { + // If an instance is in READONLY we should always extend. The write path will filter it out when GetRing. + // The read path should extend to get new ingester used on write + if (lookbackPeriod > 0 && instance.RegisteredTimestamp >= lookbackUntil) || instance.State == READONLY { continue } @@ -923,7 +916,7 @@ func (r *Ring) HasInstance(instanceID string) bool { return ok } -func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, op Operation) *Ring { +func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableSharding bool) *Ring { if r.cfg.SubringCacheDisabled { return nil } @@ -932,7 +925,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableS defer r.mtx.RUnlock() // if shuffledSubringCache map is nil, reading it returns default value (nil pointer). - cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding, operation: op}] + cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}] if cached == nil { return nil } @@ -951,7 +944,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableS return cached } -func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, op Operation, subring *Ring) { +func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, subring *Ring) { if subring == nil || r.cfg.SubringCacheDisabled { return } @@ -963,7 +956,7 @@ func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableS // (which can happen between releasing the read lock and getting read-write lock). // Note that shuffledSubringCache can be only nil when set by test. if r.shuffledSubringCache != nil && r.lastTopologyChange.Equal(subring.lastTopologyChange) { - r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding, operation: op}] = subring + r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}] = subring } } diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index b9a622dc33c..15ff51ba993 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -2523,7 +2523,7 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) { } } -func TestRing_ShuffleShardWithOperation(t *testing.T) { +func TestRing_ShuffleShardWithReadOnlyIngesters(t *testing.T) { g := NewRandomTokenGenerator() const ( @@ -2536,7 +2536,7 @@ func TestRing_ShuffleShardWithOperation(t *testing.T) { shardSize int expectedSize int op Operation - expectedToBeFilter []string + expectedToBePresent []string }{ "single zone, shard size = 1, default scenario": { ringInstances: map[string]InstanceDesc{ @@ -2546,19 +2546,15 @@ func TestRing_ShuffleShardWithOperation(t *testing.T) { ringReplicationFactor: 1, shardSize: 1, expectedSize: 1, - op: WriteNoExtend, - expectedToBeFilter: []string{}, }, - "single zone, shard size = 1, filter ReadOnly": { + "single zone, shard size = 1, not filter ReadOnly": { ringInstances: map[string]InstanceDesc{ "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)}, "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: READONLY, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-a", 128, true)}, }, ringReplicationFactor: 1, - shardSize: 1, - expectedSize: 1, - op: WriteShard, - expectedToBeFilter: []string{"127.0.0.2"}, + shardSize: 2, + expectedSize: 2, }, "single zone, shard size = 4, do not filter other states": { ringInstances: map[string]InstanceDesc{ @@ -2570,39 +2566,35 @@ func TestRing_ShuffleShardWithOperation(t *testing.T) { ringReplicationFactor: 1, shardSize: 4, expectedSize: 4, - op: WriteShard, - expectedToBeFilter: []string{}, }, - "single zone, shard size = 4, filter readOnly even if shard size is not achieved": { + "single zone, shard size = 4, extend on readOnly": { ringInstances: map[string]InstanceDesc{ - "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)}, - "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: JOINING, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-a", 128, true)}, - "instance-3": {Addr: "127.0.0.3", Zone: "zone-a", State: LEAVING, Tokens: g.GenerateTokens(NewDesc(), "instance-3", "zone-a", 128, true)}, - "instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: READONLY, Tokens: g.GenerateTokens(NewDesc(), "instance-4", "zone-a", 128, true)}, + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{2}}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{4}}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{6}}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: READONLY, Tokens: []uint32{1, 3, 5}}, }, ringReplicationFactor: 1, - shardSize: 4, + shardSize: 2, expectedSize: 3, - op: WriteShard, - expectedToBeFilter: []string{"127.0.0.4"}, + expectedToBePresent: []string{"instance-4"}, }, - "rf = 3, shard size = 4, filter readOnly from different zones": { + "rf = 3, shard size = 4, extend readOnly from different zones": { ringInstances: map[string]InstanceDesc{ - "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)}, - "instance-2": {Addr: "127.0.0.2", Zone: "zone-b", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-b", 128, true)}, - "instance-3": {Addr: "127.0.0.3", Zone: "zone-c", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-3", "zone-c", 128, true)}, - "instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-4", "zone-a", 128, true)}, - "instance-5": {Addr: "127.0.0.5", Zone: "zone-b", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-5", "zone-b", 128, true)}, - "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-6", "zone-c", 128, true)}, - "instance-7": {Addr: "127.0.0.7", Zone: "zone-a", State: READONLY, Tokens: g.GenerateTokens(NewDesc(), "instance-7", "zone-a", 128, true)}, - "instance-8": {Addr: "127.0.0.8", Zone: "zone-b", State: READONLY, Tokens: g.GenerateTokens(NewDesc(), "instance-8", "zone-b", 128, true)}, - "instance-9": {Addr: "127.0.0.9", Zone: "zone-c", State: READONLY, Tokens: g.GenerateTokens(NewDesc(), "instance-9", "zone-c", 128, true)}, + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{2}}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-b", State: ACTIVE, Tokens: []uint32{12}}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-c", State: ACTIVE, Tokens: []uint32{22}}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{4}}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-b", State: ACTIVE, Tokens: []uint32{14}}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE, Tokens: []uint32{24}}, + "instance-7": {Addr: "127.0.0.7", Zone: "zone-a", State: READONLY, Tokens: []uint32{1, 3}}, + "instance-8": {Addr: "127.0.0.8", Zone: "zone-b", State: READONLY, Tokens: []uint32{11, 13}}, + "instance-9": {Addr: "127.0.0.9", Zone: "zone-c", State: READONLY, Tokens: []uint32{21, 23}}, }, ringReplicationFactor: 3, shardSize: 6, - expectedSize: 6, - op: WriteShard, - expectedToBeFilter: []string{"127.0.0.8", "127.0.0.9", "127.0.0.10"}, + expectedSize: 9, + expectedToBePresent: []string{"instance-7", "instance-8", "instance-9"}, }, } @@ -2627,10 +2619,10 @@ func TestRing_ShuffleShardWithOperation(t *testing.T) { KVClient: &MockClient{}, } - shardRing := ring.ShuffleShardWithOperation(userID, testData.shardSize, testData.op) + shardRing := ring.ShuffleShardWithOperation(userID, testData.shardSize) assert.Equal(t, testData.expectedSize, shardRing.InstancesCount()) - for _, expectedInstance := range testData.expectedToBeFilter { - assert.False(t, shardRing.HasInstance(expectedInstance)) + for _, expectedInstance := range testData.expectedToBePresent { + assert.True(t, shardRing.HasInstance(expectedInstance)) } }) } @@ -2685,7 +2677,7 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) { currTime := time.Now().Add(lookbackPeriod).Add(time.Minute) // Add the initial shard to the history. - rs, err := ring.shuffleShard(userID, shardSize, 0, time.Now(), enableStableSharding, Reporting).GetReplicationSetForOperation(Read) + rs, err := ring.shuffleShard(userID, shardSize, 0, time.Now(), enableStableSharding).GetReplicationSetForOperation(Read) require.NoError(t, err) history := map[time.Time]ReplicationSet{ @@ -2751,7 +2743,7 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) { } // Add the current shard to the history. - rs, err = ring.shuffleShard(userID, shardSize, 0, time.Now(), enableStableSharding, Reporting).GetReplicationSetForOperation(Read) + rs, err = ring.shuffleShard(userID, shardSize, 0, time.Now(), enableStableSharding).GetReplicationSetForOperation(Read) require.NoError(t, err) history[currTime] = rs diff --git a/pkg/ring/util_test.go b/pkg/ring/util_test.go index a777f1e14f5..563cc0aa7f6 100644 --- a/pkg/ring/util_test.go +++ b/pkg/ring/util_test.go @@ -63,11 +63,6 @@ func (r *RingMock) ShuffleShard(identifier string, size int) ReadRing { return args.Get(0).(ReadRing) } -func (r *RingMock) ShuffleShardWithOperation(identifier string, size int, op Operation) ReadRing { - args := r.Called(identifier, size, op) - return args.Get(0).(ReadRing) -} - func (r *RingMock) ShuffleShardWithZoneStability(identifier string, size int) ReadRing { args := r.Called(identifier, size) return args.Get(0).(ReadRing) From 2a217bd844ac0412c106ae8abbd7500915074d10 Mon Sep 17 00:00:00 2001 From: Daniel Deluiggi Date: Wed, 22 Jan 2025 14:59:58 -0800 Subject: [PATCH 3/5] Remove old code Signed-off-by: Daniel Deluiggi --- .../shuffle_sharding_grouper_test.go | 10 +-- pkg/ring/ring.go | 68 ++++++------------- 2 files changed, 26 insertions(+), 52 deletions(-) diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index 1e2a331c21e..c7aaa4a656a 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -761,6 +761,11 @@ func (r *RingMock) GetAllHealthy(op ring.Operation) (ring.ReplicationSet, error) return args.Get(0).(ring.ReplicationSet), args.Error(1) } +func (r *RingMock) GetInstanceDescsForOperation(op ring.Operation) (map[string]ring.InstanceDesc, error) { + args := r.Called(op) + return args.Get(0).(map[string]ring.InstanceDesc), args.Error(1) +} + func (r *RingMock) GetAllInstanceDescs(op ring.Operation) ([]ring.InstanceDesc, []ring.InstanceDesc, error) { args := r.Called(op) return args.Get(0).([]ring.InstanceDesc), make([]ring.InstanceDesc, 0), args.Error(1) @@ -789,11 +794,6 @@ func (r *RingMock) ShuffleShardWithZoneStability(identifier string, size int) ri return args.Get(0).(ring.ReadRing) } -func (r *RingMock) ShuffleShardWithOperation(identifier string, size int, op ring.Operation) ring.ReadRing { - args := r.Called(identifier, size, op) - return args.Get(0).(ring.ReadRing) -} - func (r *RingMock) GetInstanceState(instanceID string) (ring.InstanceState, error) { args := r.Called(instanceID) return args.Get(0).(ring.InstanceState), args.Error(1) diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 8f05021c329..f4557ec5436 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -739,10 +739,6 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { return r.shuffleShardWithCache(identifier, size, false) } -func (r *Ring) ShuffleShardWithOperation(identifier string, size int) ReadRing { - return r.shuffleShardWithCache(identifier, size, false) -} - func (r *Ring) ShuffleShardWithZoneStability(identifier string, size int) ReadRing { return r.shuffleShardWithCache(identifier, size, true) } @@ -879,7 +875,27 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur } } - return r.copyWithNewDesc(shard) + // Build a read-only ring for the shard. + shardDesc := &Desc{Ingesters: shard} + shardTokensByZone := shardDesc.getTokensByZone() + + return &Ring{ + cfg: r.cfg, + strategy: r.strategy, + ringDesc: shardDesc, + ringTokens: shardDesc.GetTokens(), + ringTokensByZone: shardTokensByZone, + ringZones: getZones(shardTokensByZone), + KVClient: r.KVClient, + + // We reference the original map as is in order to avoid copying. It's safe to do + // because this map is immutable by design and it's a superset of the actual instances + // with the subring. + ringInstanceByToken: r.ringInstanceByToken, + + // For caching to work, remember these values. + lastTopologyChange: r.lastTopologyChange, + } } // GetInstanceState returns the current state of an instance or an error if the @@ -960,48 +976,6 @@ func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableS } } -// getRingForOperation Returns a new ring filtered for operation. -// The ring read lock must be already taken when calling this function. -func (r *Ring) getRingForOperation(op Operation) *Ring { - //Avoid filtering if we are receiving default operation or empty ring - if r.ringDesc == nil || len(r.ringDesc.Ingesters) == 0 || op == Reporting { - return r - } - - instanceDescs := make(map[string]InstanceDesc) - for id, instance := range r.ringDesc.Ingesters { - if op.IsInstanceInStateHealthy(instance.State) { - instanceDescs[id] = instance - } - } - - return r.copyWithNewDesc(instanceDescs) -} - -// copyWithNewDesc Return a new ring with updated data for different InstanceDesc -func (r *Ring) copyWithNewDesc(desc map[string]InstanceDesc) *Ring { - shardDesc := &Desc{Ingesters: desc} - shardTokensByZone := shardDesc.getTokensByZone() - - return &Ring{ - cfg: r.cfg, - strategy: r.strategy, - ringDesc: shardDesc, - ringTokens: shardDesc.GetTokens(), - ringTokensByZone: shardTokensByZone, - ringZones: getZones(shardTokensByZone), - KVClient: r.KVClient, - - // We reference the original map as is in order to avoid copying. It's safe to do - // because this map is immutable by design and it's a superset of the actual instances - // with the subring. - ringInstanceByToken: r.ringInstanceByToken, - - // For caching to work, remember these values. - lastTopologyChange: r.lastTopologyChange, - } -} - func (r *Ring) CleanupShuffleShardCache(identifier string) { if r.cfg.SubringCacheDisabled { return From a88c66148ca1386198e253a095a0bd834f12d67e Mon Sep 17 00:00:00 2001 From: Daniel Deluiggi Date: Wed, 22 Jan 2025 15:13:27 -0800 Subject: [PATCH 4/5] Fix test Signed-off-by: Daniel Deluiggi --- pkg/ring/ring_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index 15ff51ba993..ff51e9a4e6c 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -2619,7 +2619,7 @@ func TestRing_ShuffleShardWithReadOnlyIngesters(t *testing.T) { KVClient: &MockClient{}, } - shardRing := ring.ShuffleShardWithOperation(userID, testData.shardSize) + shardRing := ring.ShuffleShard(userID, testData.shardSize) assert.Equal(t, testData.expectedSize, shardRing.InstancesCount()) for _, expectedInstance := range testData.expectedToBePresent { assert.True(t, shardRing.HasInstance(expectedInstance)) From 8ad97a894b1e36ca294b77a31e29e5aa8b2a9d5a Mon Sep 17 00:00:00 2001 From: Daniel Deluiggi Date: Thu, 23 Jan 2025 10:57:11 -0800 Subject: [PATCH 5/5] update changelog Signed-off-by: Daniel Deluiggi --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 37be94bcd56..7744fdb6387 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## master / unreleased * [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526 +* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 ## 1.19.0 in progress @@ -76,7 +77,6 @@ * [BUGFIX] Ingester: Fix possible race condition when `active series per LabelSet` is configured. #6409 * [BUGFIX] Query Frontend: Fix @ modifier not being applied correctly on sub queries. #6450 * [BUGFIX] Cortex Redis flags with multiple dots #6476 -* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 ## 1.18.1 2024-10-14