diff --git a/CHANGELOG.md b/CHANGELOG.md index 250b8eae454..1c6c72c15ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,8 @@ * [ENHANCEMENT] Compactor: tenants marked for deletion will now be fully cleaned up after some delay since deletion of last block. Cleanup includes removal of remaining marker files (including tenant deletion mark file) and files under `debug/metas`. #3613 * [ENHANCEMENT] Compactor: retry compaction of a single tenant on failure instead of re-running compaction for all tenants. #3627 * [ENHANCEMENT] Querier: Implement result caching for tenant query federation. #3640 +* [ENHANCEMENT] Disabled in-memory shuffle-sharding subring cache in the store-gateway, ruler and compactor. This should reduce the memory utilisation in these services when shuffle-sharding is enabled, without introducing a significant increase in CPU utilisation. #3601 +* [ENHANCEMENT] Shuffle sharding: optimised subring generation used by shuffle sharding. #3601 * [BUGFIX] Allow `-querier.max-query-lookback` use `y|w|d` suffix like deprecated `-store.max-look-back-period`. #3598 * [BUGFIX] Memberlist: Entry in the ring should now not appear again after using "Forget" feature (unless it's still heartbeating). #3603 * [BUGFIX] Ingester: do not close idle TSDBs while blocks shipping is in progress. #3630 diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index a0dfd65d142..d639d01eda5 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -624,7 +624,7 @@ func (c *Compactor) ownUser(userID string) (bool, error) { userHash := hasher.Sum32() // Check whether this compactor instance owns the user. - rs, err := c.ring.Get(userHash, ring.Compactor, []ring.IngesterDesc{}) + rs, err := c.ring.Get(userHash, ring.Compactor, nil, nil, nil) if err != nil { return false, err } diff --git a/pkg/compactor/compactor_ring.go b/pkg/compactor/compactor_ring.go index 7bfc930da21..a39d76ef883 100644 --- a/pkg/compactor/compactor_ring.go +++ b/pkg/compactor/compactor_ring.go @@ -79,6 +79,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig { // Configure lifecycler lc.RingConfig = rc + lc.RingConfig.SubringCacheDisabled = true lc.ListenPort = cfg.ListenPort lc.Addr = cfg.InstanceAddr lc.Port = cfg.InstancePort diff --git a/pkg/compactor/compactor_ring_test.go b/pkg/compactor/compactor_ring_test.go index d25c996f900..0519367c04a 100644 --- a/pkg/compactor/compactor_ring_test.go +++ b/pkg/compactor/compactor_ring_test.go @@ -20,6 +20,7 @@ func TestRingConfig_DefaultConfigToLifecyclerConfig(t *testing.T) { // intentionally overridden expected.ListenPort = cfg.ListenPort expected.RingConfig.ReplicationFactor = 1 + expected.RingConfig.SubringCacheDisabled = true expected.NumTokens = 512 expected.MinReadyDuration = 0 expected.FinalSleep = 0 @@ -45,6 +46,7 @@ func TestRingConfig_CustomConfigToLifecyclerConfig(t *testing.T) { // ring config expected.HeartbeatPeriod = cfg.HeartbeatPeriod expected.RingConfig.HeartbeatTimeout = cfg.HeartbeatTimeout + expected.RingConfig.SubringCacheDisabled = true expected.ID = cfg.InstanceID expected.InfNames = cfg.InstanceInterfaceNames expected.Port = cfg.InstancePort diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 9628fc64323..8e78ae56a1e 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -97,7 +97,7 @@ func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*lab metricNameMatcher, _, ok := extract.MetricNameMatcherFromMatchers(matchers) if ok && metricNameMatcher.Type 
== labels.MatchEqual { - return d.ingestersRing.Get(shardByMetricName(userID, metricNameMatcher.Value), ring.Read, nil) + return d.ingestersRing.Get(shardByMetricName(userID, metricNameMatcher.Value), ring.Read, nil, nil, nil) } } diff --git a/pkg/querier/blocks_store_replicated_set.go b/pkg/querier/blocks_store_replicated_set.go index 0730d393fc8..47630ff324b 100644 --- a/pkg/querier/blocks_store_replicated_set.go +++ b/pkg/querier/blocks_store_replicated_set.go @@ -98,12 +98,11 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid // Find the replication set of each block we need to query. for _, blockID := range blockIDs { - // Buffer internally used by the ring (give extra room for a JOINING + LEAVING instance). // Do not reuse the same buffer across multiple Get() calls because we do retain the // returned replication set. - buf := make([]ring.IngesterDesc, 0, userRing.ReplicationFactor()+2) + bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() - set, err := userRing.Get(cortex_tsdb.HashBlockID(blockID), ring.BlocksRead, buf) + set, err := userRing.Get(cortex_tsdb.HashBlockID(blockID), ring.BlocksRead, bufDescs, bufHosts, bufZones) if err != nil { return nil, errors.Wrapf(err, "failed to get store-gateway replication set owning the block %s", blockID.String()) } diff --git a/pkg/ring/batch.go b/pkg/ring/batch.go index 89a24656aac..1050fb9d5ec 100644 --- a/pkg/ring/batch.go +++ b/pkg/ring/batch.go @@ -46,10 +46,13 @@ func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(Inges itemTrackers := make([]itemTracker, len(keys)) ingesters := make(map[string]ingester, r.IngesterCount()) - const maxExpectedReplicationSet = 5 // Typical replication factor 3, plus one for inactive plus one for luck. - var descs [maxExpectedReplicationSet]IngesterDesc + var ( + bufDescs [GetBufferSize]IngesterDesc + bufHosts [GetBufferSize]string + bufZones [GetBufferSize]string + ) for i, key := range keys { - replicationSet, err := r.Get(key, Write, descs[:0]) + replicationSet, err := r.Get(key, Write, bufDescs[:0], bufHosts[:0], bufZones[:0]) if err != nil { return err } diff --git a/pkg/ring/http.go b/pkg/ring/http.go index 4ab9bd9f961..cd39800c8b0 100644 --- a/pkg/ring/http.go +++ b/pkg/ring/http.go @@ -131,13 +131,14 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { } sort.Strings(ingesterIDs) + now := time.Now() ingesters := []interface{}{} - _, owned := countTokens(r.ringDesc, r.ringTokens) + _, owned := r.countTokens() for _, id := range ingesterIDs { ing := r.ringDesc.Ingesters[id] heartbeatTimestamp := time.Unix(ing.Timestamp, 0) state := ing.State.String() - if !r.IsHealthy(&ing, Reporting) { + if !r.IsHealthy(&ing, Reporting, now) { state = unhealthy } @@ -178,7 +179,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { ShowTokens bool `json:"-"` }{ Ingesters: ingesters, - Now: time.Now(), + Now: now, ShowTokens: tokensParam == "true", }, pageTemplate, req) } diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 0b1351010e8..c5d6f22b879 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -753,11 +753,13 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) { zones := map[string]struct{}{} if ringDesc != nil { + now := time.Now() + for _, ingester := range ringDesc.Ingesters { zones[ingester.Zone] = struct{}{} // Count the number of healthy instances for Write operation. 
- if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout) { + if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout, now) { healthyInstancesCount++ } } diff --git a/pkg/ring/model.go b/pkg/ring/model.go index e4a2ea45b9c..a981918fa5d 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -1,6 +1,7 @@ package ring import ( + "container/heap" "fmt" "sort" "time" @@ -11,13 +12,6 @@ import ( "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" ) -// ByToken is a sortable list of TokenDescs -type ByToken []TokenDesc - -func (ts ByToken) Len() int { return len(ts) } -func (ts ByToken) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } -func (ts ByToken) Less(i, j int) bool { return ts[i].Token < ts[j].Token } - // ByAddr is a sortable list of IngesterDesc. type ByAddr []IngesterDesc @@ -121,16 +115,12 @@ func (d *Desc) Ready(now time.Time, heartbeatTimeout time.Duration) error { return nil } -// TokensFor partitions the tokens into those for the given ID, and those for others. -func (d *Desc) TokensFor(id string) (tokens, other Tokens) { - takenTokens, myTokens := Tokens{}, Tokens{} - for _, token := range d.getTokens() { - takenTokens = append(takenTokens, token.Token) - if token.Ingester == id { - myTokens = append(myTokens, token.Token) - } - } - return myTokens, takenTokens +// TokensFor returns the tokens owned by the provided ID, and all tokens in the ring. +// Returned tokens are guaranteed to be sorted. +func (d *Desc) TokensFor(id string) (myTokens, allTokens Tokens) { + allTokens = d.GetTokens() + myTokens = d.Ingesters[id].Tokens + return } // GetRegisteredAt returns the timestamp when the instance has been registered to the ring @@ -144,7 +134,7 @@ func (i *IngesterDesc) GetRegisteredAt() time.Time { } // IsHealthy checks whether the ingester appears to be alive and heartbeating -func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration) bool { +func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration, now time.Time) bool { healthy := false switch op { @@ -170,7 +160,7 @@ func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration) b healthy = i.State == ACTIVE } - return healthy && time.Since(time.Unix(i.Timestamp, 0)) <= heartbeatTimeout + return healthy && now.Unix()-i.Timestamp <= heartbeatTimeout.Milliseconds()/1000 } // Merge merges other ring into this one. Returns sub-ring that represents the change, @@ -419,46 +409,43 @@ func (d *Desc) RemoveTombstones(limit time.Time) { } } -type TokenDesc struct { - Token uint32 - Ingester string - Zone string -} +func (d *Desc) getTokensInfo() map[uint32]instanceInfo { + out := map[uint32]instanceInfo{} -// getTokens returns sorted list of tokens with ingester IDs, owned by each ingester in the ring. -func (d *Desc) getTokens() []TokenDesc { - numTokens := 0 - for _, ing := range d.Ingesters { - numTokens += len(ing.Tokens) - } - tokens := make([]TokenDesc, 0, numTokens) - for key, ing := range d.Ingesters { - for _, token := range ing.Tokens { - tokens = append(tokens, TokenDesc{Token: token, Ingester: key, Zone: ing.GetZone()}) + for instanceID, instance := range d.Ingesters { + info := instanceInfo{ + InstanceID: instanceID, + Zone: instance.Zone, + } + + for _, token := range instance.Tokens { + out[token] = info } } - sort.Sort(ByToken(tokens)) - return tokens + return out } -// getTokensByZone returns instances tokens grouped by zone. Tokens within each zone -// are guaranteed to be sorted. 
-func (d *Desc) getTokensByZone() map[string][]TokenDesc { - zones := map[string][]TokenDesc{} - - for key, ing := range d.Ingesters { - for _, token := range ing.Tokens { - zones[ing.Zone] = append(zones[ing.Zone], TokenDesc{Token: token, Ingester: key, Zone: ing.GetZone()}) - } +// GetTokens returns the sorted list of tokens owned by all instances within the ring. +func (d *Desc) GetTokens() []uint32 { + instances := make([][]uint32, 0, len(d.Ingesters)) + for _, instance := range d.Ingesters { + instances = append(instances, instance.Tokens) } - // Ensure tokens are sorted within each zone. - for zone := range zones { - sort.Sort(ByToken(zones[zone])) + return MergeTokens(instances) +} + +// getTokensByZone returns instance tokens grouped by zone. Tokens within each zone +// are guaranteed to be sorted. +func (d *Desc) getTokensByZone() map[string][]uint32 { + zones := map[string][][]uint32{} + for _, instance := range d.Ingesters { + zones[instance.Zone] = append(zones[instance.Zone], instance.Tokens) } - return zones + // Merge tokens per zone. + return MergeTokensByZone(zones) } type CompareResult int @@ -539,3 +526,79 @@ func GetOrCreateRingDesc(d interface{}) *Desc { } return d.(*Desc) } + +// TokensHeap is a heap data structure used to merge multiple lists +// of sorted tokens into a single one. +type TokensHeap [][]uint32 + +func (h TokensHeap) Len() int { + return len(h) +} + +func (h TokensHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h TokensHeap) Less(i, j int) bool { + return h[i][0] < h[j][0] +} + +func (h *TokensHeap) Push(x interface{}) { + *h = append(*h, x.([]uint32)) } + +func (h *TokensHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +// MergeTokens takes multiple lists of tokens as input and returns a single list +// containing all tokens merged and sorted. Each input list is required +// to be already sorted. +func MergeTokens(instances [][]uint32) []uint32 { + numTokens := 0 + + // Build the heap. + h := make(TokensHeap, 0, len(instances)) + for _, tokens := range instances { + if len(tokens) == 0 { + continue + } + + // We can safely append the input slice because its elements are never shuffled. + h = append(h, tokens) + numTokens += len(tokens) + } + heap.Init(&h) + + out := make([]uint32, 0, numTokens) + + for h.Len() > 0 { + // The minimum element in the tree is the root, at index 0. + lowest := h[0] + out = append(out, lowest[0]) + + if len(lowest) > 1 { + // Remove the first token from the lowest because we popped it + // and then fix the heap to keep it sorted. + h[0] = h[0][1:] + heap.Fix(&h, 0) + } else { + heap.Remove(&h, 0) + } + } + + return out +} + +// MergeTokensByZone is like MergeTokens but does it for each input zone. 
+func MergeTokensByZone(zones map[string][][]uint32) map[string][]uint32 { + out := make(map[string][]uint32, len(zones)) + for zone, tokens := range zones { + out[zone] = MergeTokens(tokens) + } + return out +} diff --git a/pkg/ring/model_test.go b/pkg/ring/model_test.go index 13aaa5efefa..ed7206f061c 100644 --- a/pkg/ring/model_test.go +++ b/pkg/ring/model_test.go @@ -51,13 +51,13 @@ func TestIngesterDesc_IsHealthy_ForIngesterOperations(t *testing.T) { testData := testData t.Run(testName, func(t *testing.T) { - actual := testData.ingester.IsHealthy(Write, testData.timeout) + actual := testData.ingester.IsHealthy(Write, testData.timeout, time.Now()) assert.Equal(t, testData.writeExpected, actual) - actual = testData.ingester.IsHealthy(Read, testData.timeout) + actual = testData.ingester.IsHealthy(Read, testData.timeout, time.Now()) assert.Equal(t, testData.readExpected, actual) - actual = testData.ingester.IsHealthy(Reporting, testData.timeout) + actual = testData.ingester.IsHealthy(Reporting, testData.timeout, time.Now()) assert.Equal(t, testData.reportExpected, actual) }) } @@ -108,10 +108,10 @@ func TestIngesterDesc_IsHealthy_ForStoreGatewayOperations(t *testing.T) { testData := testData t.Run(testName, func(t *testing.T) { - actual := testData.instance.IsHealthy(BlocksSync, testData.timeout) + actual := testData.instance.IsHealthy(BlocksSync, testData.timeout, time.Now()) assert.Equal(t, testData.syncExpected, actual) - actual = testData.instance.IsHealthy(BlocksRead, testData.timeout) + actual = testData.instance.IsHealthy(BlocksRead, testData.timeout, time.Now()) assert.Equal(t, testData.queryExpected, actual) }) } @@ -220,11 +220,11 @@ func TestDesc_Ready(t *testing.T) { func TestDesc_getTokensByZone(t *testing.T) { tests := map[string]struct { desc *Desc - expected map[string][]TokenDesc + expected map[string][]uint32 }{ "empty ring": { desc: &Desc{Ingesters: map[string]IngesterDesc{}}, - expected: map[string][]TokenDesc{}, + expected: map[string][]uint32{}, }, "single zone": { desc: &Desc{Ingesters: map[string]IngesterDesc{ @@ -232,15 +232,8 @@ func TestDesc_getTokensByZone(t *testing.T) { "instance-2": {Addr: "127.0.0.1", Tokens: []uint32{2, 4}, Zone: ""}, "instance-3": {Addr: "127.0.0.1", Tokens: []uint32{3, 6}, Zone: ""}, }}, - expected: map[string][]TokenDesc{ - "": { - {Token: 1, Ingester: "instance-1", Zone: ""}, - {Token: 2, Ingester: "instance-2", Zone: ""}, - {Token: 3, Ingester: "instance-3", Zone: ""}, - {Token: 4, Ingester: "instance-2", Zone: ""}, - {Token: 5, Ingester: "instance-1", Zone: ""}, - {Token: 6, Ingester: "instance-3", Zone: ""}, - }, + expected: map[string][]uint32{ + "": {1, 2, 3, 4, 5, 6}, }, }, "multiple zones": { @@ -249,17 +242,9 @@ func TestDesc_getTokensByZone(t *testing.T) { "instance-2": {Addr: "127.0.0.1", Tokens: []uint32{2, 4}, Zone: "zone-1"}, "instance-3": {Addr: "127.0.0.1", Tokens: []uint32{3, 6}, Zone: "zone-2"}, }}, - expected: map[string][]TokenDesc{ - "zone-1": { - {Token: 1, Ingester: "instance-1", Zone: "zone-1"}, - {Token: 2, Ingester: "instance-2", Zone: "zone-1"}, - {Token: 4, Ingester: "instance-2", Zone: "zone-1"}, - {Token: 5, Ingester: "instance-1", Zone: "zone-1"}, - }, - "zone-2": { - {Token: 3, Ingester: "instance-3", Zone: "zone-2"}, - {Token: 6, Ingester: "instance-3", Zone: "zone-2"}, - }, + expected: map[string][]uint32{ + "zone-1": {1, 2, 4, 5}, + "zone-2": {3, 6}, }, }, } @@ -271,6 +256,46 @@ func TestDesc_getTokensByZone(t *testing.T) { } } +func TestDesc_TokensFor(t *testing.T) { + tests := map[string]struct { + 
desc *Desc + expectedMine Tokens + expectedAll Tokens + }{ + "empty ring": { + desc: &Desc{Ingesters: map[string]IngesterDesc{}}, + expectedMine: Tokens(nil), + expectedAll: Tokens{}, + }, + "single zone": { + desc: &Desc{Ingesters: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Tokens: []uint32{1, 5}, Zone: ""}, + "instance-2": {Addr: "127.0.0.1", Tokens: []uint32{2, 4}, Zone: ""}, + "instance-3": {Addr: "127.0.0.1", Tokens: []uint32{3, 6}, Zone: ""}, + }}, + expectedMine: Tokens{1, 5}, + expectedAll: Tokens{1, 2, 3, 4, 5, 6}, + }, + "multiple zones": { + desc: &Desc{Ingesters: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Tokens: []uint32{1, 5}, Zone: "zone-1"}, + "instance-2": {Addr: "127.0.0.1", Tokens: []uint32{2, 4}, Zone: "zone-1"}, + "instance-3": {Addr: "127.0.0.1", Tokens: []uint32{3, 6}, Zone: "zone-2"}, + }}, + expectedMine: Tokens{1, 5}, + expectedAll: Tokens{1, 2, 3, 4, 5, 6}, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + actualMine, actualAll := testData.desc.TokensFor("instance-1") + assert.Equal(t, testData.expectedMine, actualMine) + assert.Equal(t, testData.expectedAll, actualAll) + }) + } +} + func TestDesc_RingsCompare(t *testing.T) { tests := map[string]struct { r1, r2 *Desc @@ -350,3 +375,90 @@ func TestDesc_RingsCompare(t *testing.T) { }) } } + +func TestMergeTokens(t *testing.T) { + tests := map[string]struct { + input [][]uint32 + expected []uint32 + }{ + "empty input": { + input: nil, + expected: []uint32{}, + }, + "single instance in input": { + input: [][]uint32{ + {1, 3, 4, 8}, + }, + expected: []uint32{1, 3, 4, 8}, + }, + "multiple instances in input": { + input: [][]uint32{ + {1, 3, 4, 8}, + {0, 2, 6, 9}, + {5, 7, 10, 11}, + }, + expected: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + }, + "some instances have no tokens": { + input: [][]uint32{ + {1, 3, 4, 8}, + {}, + {0, 2, 6, 9}, + {}, + {5, 7, 10, 11}, + }, + expected: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expected, MergeTokens(testData.input)) + }) + } +} + +func TestMergeTokensByZone(t *testing.T) { + tests := map[string]struct { + input map[string][][]uint32 + expected map[string][]uint32 + }{ + "empty input": { + input: nil, + expected: map[string][]uint32{}, + }, + "single zone": { + input: map[string][][]uint32{ + "zone-1": { + {1, 3, 4, 8}, + {2, 5, 6, 7}, + }, + }, + expected: map[string][]uint32{ + "zone-1": {1, 2, 3, 4, 5, 6, 7, 8}, + }, + }, + "multiple zones": { + input: map[string][][]uint32{ + "zone-1": { + {1, 3, 4, 8}, + {2, 5, 6, 7}, + }, + "zone-2": { + {3, 5}, + {2, 4}, + }, + }, + expected: map[string][]uint32{ + "zone-1": {1, 2, 3, 4, 5, 6, 7, 8}, + "zone-2": {2, 3, 4, 5}, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expected, MergeTokensByZone(testData.input)) + }) + } +} diff --git a/pkg/ring/replication_strategy.go b/pkg/ring/replication_strategy.go index 3490853dce4..5b3e1d41443 100644 --- a/pkg/ring/replication_strategy.go +++ b/pkg/ring/replication_strategy.go @@ -42,12 +42,13 @@ func (s *defaultReplicationStrategy) Filter(ingesters []IngesterDesc, op Operati } minSuccess := (replicationFactor / 2) + 1 + now := time.Now() // Skip those that have not heartbeated in a while. 
NB these are still // included in the calculation of minSuccess, so too many failed ingesters // will cause the whole write to fail. for i := 0; i < len(ingesters); { - if ingesters[i].IsHealthy(op, heartbeatTimeout) { + if ingesters[i].IsHealthy(op, heartbeatTimeout, now) { i++ } else { ingesters = append(ingesters[:i], ingesters[i+1:]...) } @@ -91,8 +92,8 @@ func (s *defaultReplicationStrategy) ShouldExtendReplicaSet(ingester IngesterDes } // IsHealthy checks whether an ingester appears to be alive and heartbeating -func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool { - return ingester.IsHealthy(op, r.cfg.HeartbeatTimeout) +func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation, now time.Time) bool { + return ingester.IsHealthy(op, r.cfg.HeartbeatTimeout, now) } // ReplicationFactor of the ring. diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 2cdd18948a5..7b58ec419c5 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -34,6 +34,10 @@ const ( // CompactorRingKey is the key under which we store the compactors ring in the KVStore. CompactorRingKey = "compactor" + + // GetBufferSize is the suggested size of buffers passed to Ring.Get(). It's based on + // a typical replication factor of 3, plus extra room for a JOINING + LEAVING instance. + GetBufferSize = 5 ) // ReadRing represents the read interface to the ring. @@ -41,9 +45,9 @@ type ReadRing interface { prometheus.Collector // Get returns n (or more) ingesters which form the replicas for the given key. - // buf is a slice to be overwritten for the return value - // to avoid memory allocation; can be nil. - Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error) + // bufDescs, bufHosts and bufZones are slices to be overwritten for the return value + // to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet(). + Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error) // GetAllHealthy returns all healthy instances in the ring, for the given operation. // This function doesn't check if the quorum is honored, so doesn't fail if the number @@ -104,6 +108,10 @@ var ( // ErrTooManyFailedIngesters is the error returned when there are too many failed ingesters for a // specific operation. ErrTooManyFailedIngesters = errors.New("too many failed ingesters") + + // ErrInconsistentTokensInfo is the error returned if, due to an internal bug, the mapping between + // a token and its owning instance is missing or unknown. + ErrInconsistentTokensInfo = errors.New("inconsistent ring tokens information") ) // Config for a Ring @@ -113,6 +121,10 @@ type Config struct { ReplicationFactor int `yaml:"replication_factor"` ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` ExtendWrites bool `yaml:"extend_writes"` + + // Whether the shuffle-sharding subring cache is disabled. This option is set + // internally and never exposed to the user. + SubringCacheDisabled bool `yaml:"-"` } // RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix @@ -130,6 +142,11 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.BoolVar(&cfg.ExtendWrites, prefix+"distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. 
It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.") } +type instanceInfo struct { + InstanceID string + Zone string +} + // Ring holds the information about the members of the consistent hash ring. type Ring struct { services.Service @@ -141,8 +158,13 @@ type Ring struct { mtx sync.RWMutex ringDesc *Desc - ringTokens []TokenDesc - ringTokensByZone map[string][]TokenDesc + ringTokens []uint32 + ringTokensByZone map[string][]uint32 + + // Maps a token to the information of the instance holding it. This map is immutable and + // cannot be changed in place because it's shared "as is" between subrings (the only way to + // change it is to create a new one and replace it). + ringInstanceByToken map[uint32]instanceInfo // When did a set of instances change the last time (instance changing state or heartbeat is ignored for this timestamp). lastTopologyChange time.Time @@ -256,8 +278,9 @@ func (r *Ring) loop(ctx context.Context) error { } now := time.Now() - ringTokens := ringDesc.getTokens() + ringTokens := ringDesc.GetTokens() ringTokensByZone := ringDesc.getTokensByZone() + ringInstanceByToken := ringDesc.getTokensInfo() ringZones := getZones(ringTokensByZone) r.mtx.Lock() @@ -265,6 +288,7 @@ r.ringDesc = ringDesc r.ringTokens = ringTokens r.ringTokensByZone = ringTokensByZone + r.ringInstanceByToken = ringInstanceByToken r.ringZones = ringZones r.lastTopologyChange = now if r.shuffledSubringCache != nil { @@ -277,7 +301,7 @@ } // Get returns n (or more) ingesters which form the replicas for the given key. -func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error) { +func (r *Ring) Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error) { r.mtx.RLock() defer r.mtx.RUnlock() if r.ringDesc == nil || len(r.ringTokens) == 0 { @@ -285,34 +309,43 @@ } var ( - n = r.cfg.ReplicationFactor - ingesters = buf[:0] - distinctHosts = map[string]struct{}{} - distinctZones = map[string]struct{}{} - start = searchToken(r.ringTokens, key) - iterations = 0 + n = r.cfg.ReplicationFactor + ingesters = bufDescs[:0] + start = searchToken(r.ringTokens, key) + iterations = 0 + + // We use a slice instead of a map because it's faster to search within a + // slice than to look up a map for a very low number of items. + distinctHosts = bufHosts[:0] + distinctZones = bufZones[:0] ) for i := start; len(distinctHosts) < n && iterations < len(r.ringTokens); i++ { iterations++ // Wrap i around in the ring. i %= len(r.ringTokens) + token := r.ringTokens[i] + + info, ok := r.ringInstanceByToken[token] + if !ok { + // This should never happen unless there's a bug in the ring code. + return ReplicationSet{}, ErrInconsistentTokensInfo + } // We want n *distinct* ingesters && distinct zones. - token := r.ringTokens[i] - if _, ok := distinctHosts[token.Ingester]; ok { + if util.StringsContain(distinctHosts, info.InstanceID) { continue } // Ignore if the ingesters don't have a zone set. 
- if r.cfg.ZoneAwarenessEnabled && token.Zone != "" { - if _, ok := distinctZones[token.Zone]; ok { + if r.cfg.ZoneAwarenessEnabled && info.Zone != "" { + if util.StringsContain(distinctZones, info.Zone) { continue } - distinctZones[token.Zone] = struct{}{} + distinctZones = append(distinctZones, info.Zone) } - distinctHosts[token.Ingester] = struct{}{} - ingester := r.ringDesc.Ingesters[token.Ingester] + distinctHosts = append(distinctHosts, info.InstanceID) + ingester := r.ringDesc.Ingesters[info.InstanceID] // Check whether the replica set should be extended given we're including // this instance. @@ -343,9 +376,10 @@ func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) { return ReplicationSet{}, ErrEmptyRing } + now := time.Now() ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters)) for _, ingester := range r.ringDesc.Ingesters { - if r.IsHealthy(&ingester, op) { + if r.IsHealthy(&ingester, op, now) { ingesters = append(ingesters, ingester) } } @@ -368,8 +402,10 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro // Build the initial replication set, excluding unhealthy instances. healthyInstances := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters)) zoneFailures := make(map[string]struct{}) + now := time.Now() + for _, ingester := range r.ringDesc.Ingesters { - if r.IsHealthy(&ingester, op) { + if r.IsHealthy(&ingester, op, now) { healthyInstances = append(healthyInstances, ingester) } else { zoneFailures[ingester.Zone] = struct{}{} } @@ -445,21 +481,28 @@ func (r *Ring) Describe(ch chan<- *prometheus.Desc) { ch <- r.numTokensDesc } -func countTokens(ringDesc *Desc, tokens []TokenDesc) (map[string]uint32, map[string]uint32) { +// countTokens returns the number of tokens and the token range owned by each instance. +// The ring read lock must be already taken when calling this function. +func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) { owned := map[string]uint32{} numTokens := map[string]uint32{} - for i, token := range tokens { + for i, token := range r.ringTokens { var diff uint32 - if i+1 == len(tokens) { - diff = (math.MaxUint32 - token.Token) + tokens[0].Token + + // Compute how many tokens are within the range. + if i+1 == len(r.ringTokens) { + diff = (math.MaxUint32 - token) + r.ringTokens[0] } else { - diff = tokens[i+1].Token - token.Token + diff = r.ringTokens[i+1] - token } - numTokens[token.Ingester] = numTokens[token.Ingester] + 1 - owned[token.Ingester] = owned[token.Ingester] + diff + + info := r.ringInstanceByToken[token] + numTokens[info.InstanceID] = numTokens[info.InstanceID] + 1 + owned[info.InstanceID] = owned[info.InstanceID] + diff } - for id := range ringDesc.Ingesters { + // Set the number of owned tokens to 0 for instances which don't own any tokens yet. 
+ for id := range r.ringDesc.Ingesters { if _, ok := owned[id]; !ok { owned[id] = 0 numTokens[id] = 0 @@ -474,7 +517,7 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) { r.mtx.RLock() defer r.mtx.RUnlock() - numTokens, ownedRange := countTokens(r.ringDesc, r.ringTokens) + numTokens, ownedRange := r.countTokens() for id, totalOwned := range ownedRange { ch <- prometheus.MustNewConstMetric( r.memberOwnershipDesc, @@ -501,7 +544,7 @@ for _, ingester := range r.ringDesc.Ingesters { s := ingester.State.String() - if !r.IsHealthy(&ingester, Reporting) { + if !r.IsHealthy(&ingester, Reporting, time.Now()) { s = unhealthy } numByState[s]++ @@ -606,7 +649,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur // We need to iterate zones always in the same order to guarantee stability. for _, zone := range actualZones { - var tokens []TokenDesc + var tokens []uint32 if r.cfg.ZoneAwarenessEnabled { tokens = r.ringTokensByZone[zone] @@ -636,21 +679,27 @@ // Wrap p around in the ring. p %= len(tokens) + info, ok := r.ringInstanceByToken[tokens[p]] + if !ok { + // This should never happen unless there's a bug in the ring code. + panic(ErrInconsistentTokensInfo) + } + // Ensure we select a unique instance. - if _, ok := shard[tokens[p].Ingester]; ok { + if _, ok := shard[info.InstanceID]; ok { continue } - instance := r.ringDesc.Ingesters[tokens[p].Ingester] + instanceID := info.InstanceID + instance := r.ringDesc.Ingesters[instanceID] + shard[instanceID] = instance // If the lookback is enabled and this instance has been registered within the lookback period // then we should include it in the subring but continue selecting instances. if lookbackPeriod > 0 && instance.RegisteredTimestamp >= lookbackUntil { - shard[tokens[p].Ingester] = instance continue } - shard[tokens[p].Ingester] = instance found = true break } @@ -672,10 +721,15 @@ cfg: r.cfg, strategy: r.strategy, ringDesc: shardDesc, - ringTokens: shardDesc.getTokens(), + ringTokens: shardDesc.GetTokens(), ringTokensByZone: shardTokensByZone, ringZones: getZones(shardTokensByZone), + // We reference the original map as is in order to avoid copying. It's safe to do so + // because this map is immutable by design and it's a superset of the actual instances + // within the subring. + ringInstanceByToken: r.ringInstanceByToken, + // For caching to work, remember these values. 
lastTopologyChange: r.lastTopologyChange, } @@ -707,6 +761,10 @@ func (r *Ring) HasInstance(instanceID string) bool { } func (r *Ring) getCachedShuffledSubring(identifier string, size int) *Ring { + if r.cfg.SubringCacheDisabled { + return nil + } + r.mtx.RLock() defer r.mtx.RUnlock() @@ -731,7 +789,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int) *Ring { } func (r *Ring) setCachedShuffledSubring(identifier string, size int, subring *Ring) { - if subring == nil { + if subring == nil || r.cfg.SubringCacheDisabled { return } diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index 33f5e42a31d..eafcaf54ce8 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -195,11 +195,12 @@ func TestRing_Get_ZoneAwareness(t *testing.T) { ReplicationFactor: testData.replicationFactor, ZoneAwarenessEnabled: testData.zoneAwarenessEnabled, }, - ringDesc: r, - ringTokens: r.getTokens(), - ringTokensByZone: r.getTokensByZone(), - ringZones: getZones(r.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(true), + ringDesc: r, + ringTokens: r.GetTokens(), + ringTokensByZone: r.getTokensByZone(), + ringInstanceByToken: r.getTokensInfo(), + ringZones: getZones(r.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(true), } ingesters := make([]IngesterDesc, 0, len(r.GetIngesters())) @@ -207,13 +208,15 @@ func TestRing_Get_ZoneAwareness(t *testing.T) { ingesters = append(ingesters, v) } + _, bufHosts, bufZones := MakeBuffersForGet() + // Use the GenerateTokens to get an array of random uint32 values. testValues := GenerateTokens(testCount, nil) var set ReplicationSet var err error for i := 0; i < testCount; i++ { - set, err = ring.Get(testValues[i], Write, ingesters) + set, err = ring.Get(testValues[i], Write, ingesters, bufHosts, bufZones) if testData.expectedErr != "" { require.EqualError(t, err, testData.expectedErr) } else { @@ -285,12 +288,13 @@ func TestRing_GetAllHealthy(t *testing.T) { } ring := Ring{ - cfg: Config{HeartbeatTimeout: heartbeatTimeout}, - ringDesc: ringDesc, - ringTokens: ringDesc.getTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(true), + cfg: Config{HeartbeatTimeout: heartbeatTimeout}, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(true), } set, err := ring.GetAllHealthy(Read) @@ -396,11 +400,12 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) { HeartbeatTimeout: heartbeatTimeout, ReplicationFactor: testData.ringReplicationFactor, }, - ringDesc: ringDesc, - ringTokens: ringDesc.getTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(true), + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(true), } set, err := ring.GetReplicationSetForOperation(Read) @@ -713,11 +718,12 @@ func TestRing_GetReplicationSetForOperation_WithZoneAwarenessEnabled(t *testing. 
ZoneAwarenessEnabled: true, ReplicationFactor: testData.replicationFactor, }, - ringDesc: ringDesc, - ringTokens: ringDesc.getTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(true), + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(true), } // Check the replication set has the correct settings @@ -848,11 +854,12 @@ func TestRing_ShuffleShard(t *testing.T) { HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: testData.zoneAwarenessEnabled, }, - ringDesc: ringDesc, - ringTokens: ringDesc.getTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(true), + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(true), } shardRing := ring.ShuffleShard("tenant-id", testData.shardSize) @@ -893,17 +900,18 @@ func TestRing_ShuffleShard_Stability(t *testing.T) { ) // Initialise the ring. - ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones)} + ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones, 128)} ring := Ring{ cfg: Config{ HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, }, - ringDesc: ringDesc, - ringTokens: ringDesc.getTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(true), + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(true), } for i := 1; i <= numTenants; i++ { @@ -966,11 +974,12 @@ func TestRing_ShuffleShard_Shuffling(t *testing.T) { HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, }, - ringDesc: ringDesc, - ringTokens: ringDesc.getTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(true), + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(true), } // Compute the shard for each tenant. @@ -1058,17 +1067,18 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) { for _, s := range scenarios { t.Run(s.name, func(t *testing.T) { // Initialise the ring. 
- ringDesc := &Desc{Ingesters: generateRingInstances(s.numInstances, s.numZones)} + ringDesc := &Desc{Ingesters: generateRingInstances(s.numInstances, s.numZones, 128)} ring := Ring{ cfg: Config{ HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, }, - ringDesc: ringDesc, - ringTokens: ringDesc.getTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(true), + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(true), } // Compute the initial shard for each tenant. @@ -1082,7 +1092,7 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) { // Update the ring. switch s.ringChange { case add: - newID, newDesc := generateRingInstance(s.numInstances+1, 0) + newID, newDesc := generateRingInstance(s.numInstances+1, 0, 128) ringDesc.Ingesters[newID] = newDesc case remove: // Remove the first one. @@ -1092,8 +1102,9 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) { } } - ring.ringTokens = ringDesc.getTokens() + ring.ringTokens = ringDesc.GetTokens() ring.ringTokensByZone = ringDesc.getTokensByZone() + ring.ringInstanceByToken = ringDesc.getTokensInfo() ring.ringZones = getZones(ringDesc.getTokensByZone()) // Compute the update shard for each tenant and compare it with the initial one. @@ -1115,7 +1126,7 @@ func TestRing_ShuffleShard_ConsistencyOnShardSizeChanged(t *testing.T) { // Create 30 instances in 3 zones. ringInstances := map[string]IngesterDesc{} for i := 0; i < 30; i++ { - name, desc := generateRingInstance(i, i%3) + name, desc := generateRingInstance(i, i%3, 128) ringInstances[name] = desc } @@ -1126,11 +1137,12 @@ func TestRing_ShuffleShard_ConsistencyOnShardSizeChanged(t *testing.T) { HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, }, - ringDesc: ringDesc, - ringTokens: ringDesc.getTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(true), + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(true), } // Get the replication set with shard size = 3. @@ -1191,7 +1203,7 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) { // Create 20 instances in 2 zones. ringInstances := map[string]IngesterDesc{} for i := 0; i < 20; i++ { - name, desc := generateRingInstance(i, i%2) + name, desc := generateRingInstance(i, i%2, 128) ringInstances[name] = desc } @@ -1202,11 +1214,12 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) { HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, }, - ringDesc: ringDesc, - ringTokens: ringDesc.getTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(true), + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(true), } // Get the replication set with shard size = 2. 
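The tests above repeat the same four-line refresh (GetTokens, getTokensByZone, getTokensInfo, getZones) every time the ring Desc is mutated. A small helper along the following lines could collapse that repetition; this is only a sketch (the helper name is invented here, it is not part of this diff), and it would have to live in package ring because it touches unexported fields and methods:

```go
package ring

// refreshRingState is a hypothetical test helper, not part of this diff. It
// rebuilds the derived ring state after the Desc has been mutated, mirroring
// what Ring.loop() does when an updated ring is received from the KV store.
func refreshRingState(r *Ring, ringDesc *Desc) {
	r.ringDesc = ringDesc
	r.ringTokens = ringDesc.GetTokens()
	r.ringTokensByZone = ringDesc.getTokensByZone()
	r.ringInstanceByToken = ringDesc.getTokensInfo()
	r.ringZones = getZones(ringDesc.getTokensByZone())
}
```

Note that Ring.loop() additionally updates lastTopologyChange and invalidates the subring cache, which these tests intentionally skip.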
@@ -1229,13 +1242,14 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) { // Scale up cluster, adding 10 instances in 1 new zone. for i := 20; i < 30; i++ { - name, desc := generateRingInstance(i, 2) + name, desc := generateRingInstance(i, 2, 128) ringInstances[name] = desc } ring.ringDesc.Ingesters = ringInstances - ring.ringTokens = ringDesc.getTokens() + ring.ringTokens = ringDesc.GetTokens() ring.ringTokensByZone = ringDesc.getTokensByZone() + ring.ringInstanceByToken = ringDesc.getTokensInfo() ring.ringZones = getZones(ringDesc.getTokensByZone()) // Increase shard size to 6. @@ -1459,11 +1473,12 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) { HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, }, - ringDesc: ringDesc, - ringTokens: ringDesc.getTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(true), + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(true), } // Replay the events on the timeline. @@ -1472,14 +1487,16 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) { case add: ringDesc.Ingesters[event.instanceID] = event.instanceDesc - ring.ringTokens = ringDesc.getTokens() + ring.ringTokens = ringDesc.GetTokens() ring.ringTokensByZone = ringDesc.getTokensByZone() + ring.ringInstanceByToken = ringDesc.getTokensInfo() ring.ringZones = getZones(ringDesc.getTokensByZone()) case remove: delete(ringDesc.Ingesters, event.instanceID) - ring.ringTokens = ringDesc.getTokens() + ring.ringTokens = ringDesc.GetTokens() ring.ringTokensByZone = ringDesc.getTokensByZone() + ring.ringInstanceByToken = ringDesc.getTokensInfo() ring.ringZones = getZones(ringDesc.getTokensByZone()) case test: rs, err := ring.ShuffleShardWithLookback(userID, event.shardSize, lookbackPeriod, time.Now()).GetAllHealthy(Read) @@ -1514,18 +1531,19 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) { t.Log("random generator seed:", seed) // Initialise the ring. - ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones)} + ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones, 128)} ring := Ring{ cfg: Config{ HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, ReplicationFactor: 3, }, - ringDesc: ringDesc, - ringTokens: ringDesc.getTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(true), + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(true), } // The simulation starts with the minimum shard size. Random events can later increase it. @@ -1560,8 +1578,9 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) { ringDesc.Ingesters[instanceID] = generateRingInstanceWithInfo(instanceID, zoneID, GenerateTokens(128, nil), currTime) - ring.ringTokens = ringDesc.getTokens() + ring.ringTokens = ringDesc.GetTokens() ring.ringTokensByZone = ringDesc.getTokensByZone() + ring.ringInstanceByToken = ringDesc.getTokensInfo() ring.ringZones = getZones(ringDesc.getTokensByZone()) case r < 90: // Scale down instances by 1. 
To make tests reproducible we get the instance IDs, sort them @@ -1577,8 +1596,9 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) { idToRemove := ingesterIDs[idxToRemove] delete(ringDesc.Ingesters, idToRemove) - ring.ringTokens = ringDesc.getTokens() + ring.ringTokens = ringDesc.GetTokens() ring.ringTokensByZone = ringDesc.getTokensByZone() + ring.ringInstanceByToken = ringDesc.getTokensInfo() ring.ringZones = getZones(ringDesc.getTokensByZone()) // Remove the terminated instance from the history. @@ -1634,7 +1654,7 @@ func BenchmarkRing_ShuffleShard(b *testing.B) { for _, numZones := range []int{1, 3} { for _, shardSize := range []int{3, 10, 30} { b.Run(fmt.Sprintf("num instances = %d, num zones = %d, shard size = %d", numInstances, numZones, shardSize), func(b *testing.B) { - benchmarkShuffleSharding(b, numInstances, numZones, shardSize, false) + benchmarkShuffleSharding(b, numInstances, numZones, 128, shardSize, false) }) } } @@ -1646,37 +1666,109 @@ func BenchmarkRing_ShuffleShardCached(b *testing.B) { for _, numZones := range []int{1, 3} { for _, shardSize := range []int{3, 10, 30} { b.Run(fmt.Sprintf("num instances = %d, num zones = %d, shard size = %d", numInstances, numZones, shardSize), func(b *testing.B) { - benchmarkShuffleSharding(b, numInstances, numZones, shardSize, true) + benchmarkShuffleSharding(b, numInstances, numZones, 128, shardSize, true) }) } } } } -func benchmarkShuffleSharding(b *testing.B, numInstances, numZones, shardSize int, cache bool) { +func BenchmarkRing_ShuffleShard_512Tokens(b *testing.B) { + const ( + numInstances = 30 + numZones = 3 + numTokens = 512 + shardSize = 9 + cacheEnabled = false + ) + + benchmarkShuffleSharding(b, numInstances, numZones, numTokens, shardSize, cacheEnabled) +} + +func benchmarkShuffleSharding(b *testing.B, numInstances, numZones, numTokens, shardSize int, cache bool) { // Initialise the ring. - ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones)} + ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones, numTokens)} ring := Ring{ - cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true}, - ringDesc: ringDesc, - ringTokens: ringDesc.getTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(true), - lastTopologyChange: time.Now(), + cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: !cache}, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + shuffledSubringCache: map[subringCacheKey]*Ring{}, + strategy: NewDefaultReplicationStrategy(true), + lastTopologyChange: time.Now(), } - if cache { - ring.shuffledSubringCache = map[subringCacheKey]*Ring{} + b.ResetTimer() + + for n := 0; n < b.N; n++ { + ring.ShuffleShard("tenant-1", shardSize) } +} + +func BenchmarkRing_Get(b *testing.B) { + const ( + numInstances = 100 + numZones = 3 + replicationFactor = 3 + ) + + // Initialise the ring. 
+ ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones, 128)} + ring := Ring{ + cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: true, ReplicationFactor: replicationFactor}, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + shuffledSubringCache: map[subringCacheKey]*Ring{}, + strategy: NewDefaultReplicationStrategy(true), + lastTopologyChange: time.Now(), + } + + buf, bufHosts, bufZones := MakeBuffersForGet() + r := rand.New(rand.NewSource(time.Now().UnixNano())) b.ResetTimer() for n := 0; n < b.N; n++ { - ring.ShuffleShard("tenant-1", shardSize) + set, err := ring.Get(r.Uint32(), Write, buf, bufHosts, bufZones) + if err != nil || len(set.Ingesters) != replicationFactor { + b.Fatal() + } } } +func TestRing_Get_NoMemoryAllocations(t *testing.T) { + // Initialise the ring. + ringDesc := &Desc{Ingesters: generateRingInstances(3, 3, 128)} + ring := Ring{ + cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: true, ReplicationFactor: 3}, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + shuffledSubringCache: map[subringCacheKey]*Ring{}, + strategy: NewDefaultReplicationStrategy(true), + lastTopologyChange: time.Now(), + } + + buf, bufHosts, bufZones := MakeBuffersForGet() + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + numAllocs := testing.AllocsPerRun(10, func() { + set, err := ring.Get(r.Uint32(), Write, buf, bufHosts, bufZones) + if err != nil || len(set.Ingesters) != 3 { + t.Fail() + } + }) + + assert.Equal(t, float64(0), numAllocs) +} + // generateTokensLinear returns tokens with a linear distribution. func generateTokensLinear(instanceID, numInstances, numTokens int) []uint32 { tokens := make([]uint32, 0, numTokens) @@ -1690,22 +1782,22 @@ return tokens } -func generateRingInstances(numInstances, numZones int) map[string]IngesterDesc { +func generateRingInstances(numInstances, numZones, numTokens int) map[string]IngesterDesc { instances := make(map[string]IngesterDesc, numInstances) for i := 1; i <= numInstances; i++ { - id, desc := generateRingInstance(i, i%numZones) + id, desc := generateRingInstance(i, i%numZones, numTokens) instances[id] = desc } return instances } -func generateRingInstance(id, zone int) (string, IngesterDesc) { +func generateRingInstance(id, zone, numTokens int) (string, IngesterDesc) { instanceID := fmt.Sprintf("instance-%d", id) zoneID := fmt.Sprintf("zone-%d", zone) - return instanceID, generateRingInstanceWithInfo(instanceID, zoneID, GenerateTokens(128, nil), time.Now()) + return instanceID, generateRingInstanceWithInfo(instanceID, zoneID, GenerateTokens(numTokens, nil), time.Now()) } func generateRingInstanceWithInfo(addr, zone string, tokens []uint32, registeredAt time.Time) IngesterDesc { diff --git a/pkg/ring/util.go b/pkg/ring/util.go index 6f28988eeda..921900c2dc1 100644 --- a/pkg/ring/util.go +++ b/pkg/ring/util.go @@ -10,7 +10,7 @@ import ( ) // GenerateTokens make numTokens unique random tokens, none of which clash -// with takenTokens. +// with takenTokens. Generated tokens are sorted. 
func GenerateTokens(numTokens int, takenTokens []uint32) []uint32 { if numTokens <= 0 { return []uint32{} @@ -23,7 +23,7 @@ func GenerateTokens(numTokens int, takenTokens []uint32) []uint32 { used[v] = true } - tokens := []uint32{} + tokens := make([]uint32, 0, numTokens) for i := 0; i < numTokens; { candidate := r.Uint32() if used[candidate] { @@ -34,6 +34,11 @@ func GenerateTokens(numTokens int, takenTokens []uint32) []uint32 { i++ } + // Ensure returned tokens are sorted. + sort.Slice(tokens, func(i, j int) bool { + return tokens[i] < tokens[j] + }) + return tokens } @@ -116,9 +121,17 @@ func WaitRingStability(ctx context.Context, r *Ring, op Operation, minStability, } } +// MakeBuffersForGet returns buffers to use with Ring.Get(). +func MakeBuffersForGet() (bufDescs []IngesterDesc, bufHosts, bufZones []string) { + bufDescs = make([]IngesterDesc, 0, GetBufferSize) + bufHosts = make([]string, 0, GetBufferSize) + bufZones = make([]string, 0, GetBufferSize) + return +} + // getZones return the list zones from the provided tokens. The returned list // is guaranteed to be sorted. -func getZones(tokens map[string][]TokenDesc) []string { +func getZones(tokens map[string][]uint32) []string { var zones []string for zone := range tokens { @@ -130,9 +143,9 @@ func getZones(tokens map[string][]TokenDesc) []string { } // searchToken returns the offset of the tokens entry holding the range for the provided key. -func searchToken(tokens []TokenDesc, key uint32) int { +func searchToken(tokens []uint32, key uint32) int { i := sort.Search(len(tokens), func(x int) bool { - return tokens[x].Token > key + return tokens[x] > key }) if i >= len(tokens) { i = 0 diff --git a/pkg/ring/util_test.go b/pkg/ring/util_test.go index a9d844b564e..717235ebb1a 100644 --- a/pkg/ring/util_test.go +++ b/pkg/ring/util_test.go @@ -59,12 +59,13 @@ func TestWaitRingStabilityShouldReturnAsSoonAsMinStabilityIsReachedOnNoChanges(t }} ring := &Ring{ - cfg: Config{HeartbeatTimeout: time.Minute}, - ringDesc: ringDesc, - ringTokens: ringDesc.getTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(true), + cfg: Config{HeartbeatTimeout: time.Minute}, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(true), } startTime := time.Now() @@ -93,12 +94,13 @@ func TestWaitRingStabilityShouldReturnOnceMinStabilityHasBeenReached(t *testing. }} ring := &Ring{ - cfg: Config{HeartbeatTimeout: time.Minute}, - ringDesc: ringDesc, - ringTokens: ringDesc.getTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(true), + cfg: Config{HeartbeatTimeout: time.Minute}, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(true), } // Add 1 new instance after some time. @@ -111,8 +113,9 @@ func TestWaitRingStabilityShouldReturnOnceMinStabilityHasBeenReached(t *testing. 
instanceID := fmt.Sprintf("instance-%d", len(ringDesc.Ingesters)+1) ringDesc.Ingesters[instanceID] = IngesterDesc{Addr: instanceID, State: ACTIVE, Timestamp: time.Now().Unix()} ring.ringDesc = ringDesc - ring.ringTokens = ringDesc.getTokens() + ring.ringTokens = ringDesc.GetTokens() ring.ringTokensByZone = ringDesc.getTokensByZone() + ring.ringInstanceByToken = ringDesc.getTokensInfo() ring.ringZones = getZones(ringDesc.getTokensByZone()) }() @@ -142,12 +145,13 @@ func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) { }} ring := &Ring{ - cfg: Config{HeartbeatTimeout: time.Minute}, - ringDesc: ringDesc, - ringTokens: ringDesc.getTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(true), + cfg: Config{HeartbeatTimeout: time.Minute}, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(true), } // Keep changing the ring. @@ -164,8 +168,9 @@ func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) { instanceID := fmt.Sprintf("instance-%d", len(ringDesc.Ingesters)+1) ringDesc.Ingesters[instanceID] = IngesterDesc{Addr: instanceID, State: ACTIVE, Timestamp: time.Now().Unix()} ring.ringDesc = ringDesc - ring.ringTokens = ringDesc.getTokens() + ring.ringTokens = ringDesc.GetTokens() ring.ringTokensByZone = ringDesc.getTokensByZone() + ring.ringInstanceByToken = ringDesc.getTokensInfo() ring.ringZones = getZones(ringDesc.getTokensByZone()) ring.mtx.Unlock() diff --git a/pkg/ruler/lifecycle.go b/pkg/ruler/lifecycle.go index 454e1290e23..47a5be6a07d 100644 --- a/pkg/ruler/lifecycle.go +++ b/pkg/ruler/lifecycle.go @@ -13,7 +13,7 @@ func (r *Ruler) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.De tokens = instanceDesc.GetTokens() } - _, takenTokens := ringDesc.TokensFor(instanceID) + takenTokens := ringDesc.GetTokens() newTokens := ring.GenerateTokens(r.cfg.Ring.NumTokens-len(tokens), takenTokens) // Tokens sorting will be enforced by the parent caller. diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 6d8c6ab5bfc..f28a48afa63 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -368,7 +368,7 @@ func tokenForGroup(g *store.RuleGroupDesc) uint32 { func instanceOwnsRuleGroup(r ring.ReadRing, g *rules.RuleGroupDesc, instanceAddr string) (bool, error) { hash := tokenForGroup(g) - rlrs, err := r.Get(hash, ring.Ruler, []ring.IngesterDesc{}) + rlrs, err := r.Get(hash, ring.Ruler, nil, nil, nil) if err != nil { return false, errors.Wrap(err, "error reading ring to verify rule group ownership") } diff --git a/pkg/ruler/ruler_replication_strategy.go b/pkg/ruler/ruler_replication_strategy.go index 0d16572fef1..78616c9839d 100644 --- a/pkg/ruler/ruler_replication_strategy.go +++ b/pkg/ruler/ruler_replication_strategy.go @@ -12,9 +12,11 @@ type rulerReplicationStrategy struct { } func (r rulerReplicationStrategy) Filter(instances []ring.IngesterDesc, op ring.Operation, _ int, heartbeatTimeout time.Duration, _ bool) (healthy []ring.IngesterDesc, maxFailures int, err error) { + now := time.Now() + // Filter out unhealthy instances. for i := 0; i < len(instances); { - if instances[i].IsHealthy(op, heartbeatTimeout) { + if instances[i].IsHealthy(op, heartbeatTimeout, now) { i++ } else { instances = append(instances[:i], instances[i+1:]...) 
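The rulerReplicationStrategy.Filter() change above shows the pattern applied throughout this diff: time.Now() is read once before the loop and the same instant is passed to every IsHealthy() call, instead of each check re-reading the clock through time.Since(). A minimal sketch of the pattern in isolation (the function name is illustrative, not part of this diff):

```go
package example

import (
	"time"

	"github.com/cortexproject/cortex/pkg/ring"
)

// filterHealthy keeps only the healthy instances. time.Now() is hoisted out
// of the loop, so all instances are judged against the same timestamp and the
// clock is read only once per filtering pass.
func filterHealthy(instances []ring.IngesterDesc, op ring.Operation, heartbeatTimeout time.Duration) []ring.IngesterDesc {
	now := time.Now()

	healthy := instances[:0] // filter in place, reusing the backing array
	for _, instance := range instances {
		if instance.IsHealthy(op, heartbeatTimeout, now) {
			healthy = append(healthy, instance)
		}
	}
	return healthy
}
```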
diff --git a/pkg/ruler/ruler_ring.go b/pkg/ruler/ruler_ring.go index 6ee8b8e32b9..33881bd8bbe 100644 --- a/pkg/ruler/ruler_ring.go +++ b/pkg/ruler/ruler_ring.go @@ -91,6 +91,7 @@ func (cfg *RingConfig) ToRingConfig() ring.Config { rc.KVStore = cfg.KVStore rc.HeartbeatTimeout = cfg.HeartbeatTimeout + rc.SubringCacheDisabled = true // Each rule group is loaded to *exactly* one ruler. rc.ReplicationFactor = 1 diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index f94e08d43f9..6680d06cec3 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -339,7 +339,7 @@ func (g *StoreGateway) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc tokens = instanceDesc.GetTokens() } - _, takenTokens := ringDesc.TokensFor(instanceID) + takenTokens := ringDesc.GetTokens() newTokens := ring.GenerateTokens(RingNumTokens-len(tokens), takenTokens) // Tokens sorting will be enforced by the parent caller. diff --git a/pkg/storegateway/gateway_ring.go b/pkg/storegateway/gateway_ring.go index 2fcb9b2faae..7d1d24e6df2 100644 --- a/pkg/storegateway/gateway_ring.go +++ b/pkg/storegateway/gateway_ring.go @@ -92,6 +92,7 @@ func (cfg *RingConfig) ToRingConfig() ring.Config { rc.HeartbeatTimeout = cfg.HeartbeatTimeout rc.ReplicationFactor = cfg.ReplicationFactor rc.ZoneAwarenessEnabled = cfg.ZoneAwarenessEnabled + rc.SubringCacheDisabled = true return rc } diff --git a/pkg/storegateway/replication_strategy.go b/pkg/storegateway/replication_strategy.go index db978bdcd4f..3420292fea9 100644 --- a/pkg/storegateway/replication_strategy.go +++ b/pkg/storegateway/replication_strategy.go @@ -10,9 +10,11 @@ import ( type BlocksReplicationStrategy struct{} func (s *BlocksReplicationStrategy) Filter(instances []ring.IngesterDesc, op ring.Operation, _ int, heartbeatTimeout time.Duration, _ bool) ([]ring.IngesterDesc, int, error) { + now := time.Now() + // Filter out unhealthy instances. for i := 0; i < len(instances); { - if instances[i].IsHealthy(op, heartbeatTimeout) { + if instances[i].IsHealthy(op, heartbeatTimeout, now) { i++ } else { instances = append(instances[:i], instances[i+1:]...) diff --git a/pkg/storegateway/sharding_strategy.go b/pkg/storegateway/sharding_strategy.go index 9945735db4d..6ad628b2cad 100644 --- a/pkg/storegateway/sharding_strategy.go +++ b/pkg/storegateway/sharding_strategy.go @@ -122,12 +122,11 @@ func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, } func filterBlocksByRingSharding(r ring.ReadRing, instanceAddr string, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, logger log.Logger) { - // Buffer internally used by the ring (give extra room for a JOINING + LEAVING instance). 
- buf := make([]ring.IngesterDesc, 0, r.ReplicationFactor()+2) + bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() for blockID := range metas { key := cortex_tsdb.HashBlockID(blockID) - set, err := r.Get(key, ring.BlocksSync, buf) + set, err := r.Get(key, ring.BlocksSync, bufDescs, bufHosts, bufZones) // If there are no healthy instances in the replication set or // the replication set for this block doesn't include this instance diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go index 8c994789c09..b462ff4f907 100644 --- a/pkg/storegateway/sharding_strategy_test.go +++ b/pkg/storegateway/sharding_strategy_test.go @@ -609,8 +609,9 @@ func TestShuffleShardingStrategy(t *testing.T) { })) cfg := ring.Config{ - ReplicationFactor: testData.replicationFactor, - HeartbeatTimeout: time.Minute, + ReplicationFactor: testData.replicationFactor, + HeartbeatTimeout: time.Minute, + SubringCacheDisabled: true, } r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, &BlocksReplicationStrategy{})
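From a caller's perspective, the resulting Get() contract is: allocate the three buffers once with MakeBuffersForGet(), then reuse them across calls so lookups stay allocation-free, as verified by TestRing_Get_NoMemoryAllocations above. A hedged usage sketch follows (the function name, ring variable and key values are illustrative only); the reuse is only safe when the returned ReplicationSet is not retained across iterations, which is why blocks_store_replicated_set.go deliberately creates fresh buffers for each Get() call.

```go
package example

import (
	"fmt"
	"log"

	"github.com/cortexproject/cortex/pkg/ring"
)

// lookupOwners resolves the replica set owning each key. The buffers returned
// by MakeBuffersForGet() are overwritten by every Get() call, so each returned
// set must be consumed before the next iteration.
func lookupOwners(r ring.ReadRing, keys []uint32) {
	bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()

	for _, key := range keys {
		set, err := r.Get(key, ring.Read, bufDescs, bufHosts, bufZones)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("key %d is owned by %d instances\n", key, len(set.Ingesters))
	}
}
```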