From a03a77cc0970684c56ad0f9af573fdfec005131c Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 19 Nov 2021 15:58:00 +0100 Subject: [PATCH 01/25] ring.Lifecycler: Add test for when previous ring state is leaving and diff tokens Signed-off-by: Arve Knudsen --- ring/lifecycler_test.go | 59 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/ring/lifecycler_test.go b/ring/lifecycler_test.go index f04bd46b4..949df757d 100644 --- a/ring/lifecycler_test.go +++ b/ring/lifecycler_test.go @@ -3,6 +3,7 @@ package ring import ( "context" "fmt" + "path/filepath" "os" "sort" "testing" @@ -457,6 +458,64 @@ func TestLifecycler_HeartbeatAfterBackendReset(t *testing.T) { assert.Equal(t, prevTokens, Tokens(desc.GetTokens())) } +// Test Lifecycler when increasing tokens and instance is already in the ring in leaving state. +func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) { + ctx := context.Background() + + ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + var ringConfig Config + flagext.DefaultValues(&ringConfig) + ringConfig.KVStore.Mock = ringStore + r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, r)) + t.Cleanup(func() { + assert.NoError(t, services.StopAndAwaitTerminated(ctx, r)) + }) + + tokenDir := t.TempDir() + lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1") + // Make sure changes are applied instantly + lifecyclerConfig.HeartbeatPeriod = 0 + lifecyclerConfig.NumTokens = 128 + lifecyclerConfig.TokensFilePath = filepath.Join(tokenDir, "/tokens") + + // Simulate ingester with 64 tokens left the ring in LEAVING state + err = r.KVClient.CAS(ctx, IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) { + ringDesc := NewDesc() + addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil) + if err != nil { + return nil, false, err + } + + ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, GenerateTokens(64, nil), LEAVING, time.Now()) + return ringDesc, true, nil + }) + require.NoError(t, err) + + // Start ingester with increased number of tokens + l, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, l)) + t.Cleanup(func() { + assert.NoError(t, services.StopAndAwaitTerminated(ctx, l)) + }) + + // Verify ingester joined, is active, and has 128 tokens + test.Poll(t, time.Second, true, func() interface{} { + d, err := r.KVClient.Get(ctx, IngesterRingKey) + require.NoError(t, err) + + desc, ok := d.(*Desc) + require.True(t, ok) + ingDesc := desc.Ingesters["ing1"] + t.Log("Polling for new ingester to have become active with 128 tokens", "state", ingDesc.State, "tokens", len(ingDesc.Tokens)) + return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == 128 + }) +} + type MockClient struct { ListFunc func(ctx context.Context, prefix string) ([]string, error) GetFunc func(ctx context.Context, key string) (interface{}, error) From aae6c09c1111c5c928f34ebe039cc15174c767f8 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 26 Nov 2021 16:37:09 +0100 Subject: [PATCH 02/25] ring.Lifecycler: Handle when previous ring state is leaving and diff tokens Signed-off-by: Arve Knudsen --- ring/lifecycler.go | 46 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index f4af37d6a..82b7b84a4 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -616,23 +616,45 @@ func (i *Lifecycler) initRing(ctx context.Context) error { return ringDesc, true, nil } + level.Info(i.logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(instanceDesc.Tokens), "ring", i.RingName) + // If the ingester failed to clean its ring entry up it can leave its state in LEAVING // OR unregister_on_shutdown=false // Move it into ACTIVE to ensure the ingester joins the ring. - if instanceDesc.State == LEAVING && len(instanceDesc.Tokens) == i.cfg.NumTokens { - instanceDesc.State = ACTIVE - } - - // We're taking over this entry, update instanceDesc with our values - instanceDesc.Addr = i.Addr - instanceDesc.Zone = i.Zone + if instanceDesc.State == LEAVING { + var tokens Tokens + isActive := true + if len(instanceDesc.Tokens) != i.cfg.NumTokens { + level.Debug(i.logger).Log("msg", "existing entry has different number of tokens", "existingTokens", len(instanceDesc.Tokens), "newTokens", i.cfg.NumTokens) + if len(tokensFromFile) > 0 { + level.Debug(i.logger).Log("msg", "adding tokens from file", "tokens", len(tokensFromFile)) + isActive = len(tokensFromFile) >= i.cfg.NumTokens + tokens = tokensFromFile + } else { + level.Debug(i.logger).Log("msg", "no tokens in file, generating new ones") + takenTokens := ringDesc.GetTokens() + needTokens := i.cfg.NumTokens - len(instanceDesc.Tokens) + newTokens := GenerateTokens(needTokens, takenTokens) + tokens = append(instanceDesc.Tokens, newTokens...) + sort.Sort(tokens) + } + } - // We exist in the ring, so assume the ring is right and copy out tokens & state out of there. - i.setState(instanceDesc.State) - tokens, _ := ringDesc.TokensFor(i.ID) - i.setTokens(tokens) + if isActive { + level.Debug(i.logger).Log("msg", "switching state to active") + i.setState(ACTIVE) + } else { + level.Debug(i.logger).Log("msg", "not switching state to active", "state", i.state) + } - level.Info(i.logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(tokens), "ring", i.RingName) + i.setTokens(tokens) + instanceDesc.State = i.state + instanceDesc.Tokens = tokens + } else { + // We exist in the ring and not in leaving state, so assume the ring is right and copy out tokens & state out of there. + i.setState(instanceDesc.State) + i.setTokens(instanceDesc.Tokens) + } // Update the ring if the instance has been changed. We don't want to rely on heartbeat update, as heartbeat // can be configured to long time, and until then lifecycler would not report this instance as ready in CheckReady. From f433e00b71414235d76d09076af0a2a517667ff9 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 26 Nov 2021 16:40:20 +0100 Subject: [PATCH 03/25] Add changelog entry Signed-off-by: Arve Knudsen --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c94bdcbc..8f9bd28f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -133,3 +133,4 @@ * [BUGFIX] Ring status page: fixed the owned tokens percentage value displayed. #282 * [BUGFIX] Ring: prevent iterating the whole ring when using `ExcludedZones`. #285 * [BUGFIX] grpcclient: fix missing `.` in flag name for initial connection window size flag. #314 +* [BUGFIX] ring.Lifecycler: Handle when previous ring state is leaving and the number of tokens has changed. #79 From 6bb6223fc7449bf90572424d712f5101aea930f4 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 17 Dec 2021 16:22:16 +0100 Subject: [PATCH 04/25] Fix typos Signed-off-by: Arve Knudsen --- ring/lifecycler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index 82b7b84a4..4bcfd1424 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -616,7 +616,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error { return ringDesc, true, nil } - level.Info(i.logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(instanceDesc.Tokens), "ring", i.RingName) + level.Info(i.logger).Log("msg", "existing entry found in ring", "state", instanceDesc.State, "tokens", len(instanceDesc.Tokens), "ring", i.RingName) // If the ingester failed to clean its ring entry up it can leave its state in LEAVING // OR unregister_on_shutdown=false @@ -651,7 +651,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error { instanceDesc.State = i.state instanceDesc.Tokens = tokens } else { - // We exist in the ring and not in leaving state, so assume the ring is right and copy out tokens & state out of there. + // We exist in the ring and not in leaving state, so assume the ring is right and copy tokens & state out of there. i.setState(instanceDesc.State) i.setTokens(instanceDesc.Tokens) } From a4db12097ade6eb0ca40eea9a5fe3bc8b8666903 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 17 Dec 2021 16:29:22 +0100 Subject: [PATCH 05/25] Use GetState() instead of accessing state directly Signed-off-by: Arve Knudsen --- ring/lifecycler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index 4bcfd1424..3e8d00c96 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -644,11 +644,11 @@ func (i *Lifecycler) initRing(ctx context.Context) error { level.Debug(i.logger).Log("msg", "switching state to active") i.setState(ACTIVE) } else { - level.Debug(i.logger).Log("msg", "not switching state to active", "state", i.state) + level.Debug(i.logger).Log("msg", "not switching state to active", "state", i.GetState()) } i.setTokens(tokens) - instanceDesc.State = i.state + instanceDesc.State = i.GetState() instanceDesc.Tokens = tokens } else { // We exist in the ring and not in leaving state, so assume the ring is right and copy tokens & state out of there. From f018f6dfedf0c7d030d54bb7e13c4ea49484f2bf Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 17 Dec 2021 16:55:46 +0100 Subject: [PATCH 06/25] ring: Handle decreasing number of tokens Signed-off-by: Arve Knudsen --- ring/lifecycler.go | 13 ++++++--- ring/lifecycler_test.go | 58 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 4 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index 3e8d00c96..81b1bd118 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -630,13 +630,18 @@ func (i *Lifecycler) initRing(ctx context.Context) error { level.Debug(i.logger).Log("msg", "adding tokens from file", "tokens", len(tokensFromFile)) isActive = len(tokensFromFile) >= i.cfg.NumTokens tokens = tokensFromFile - } else { - level.Debug(i.logger).Log("msg", "no tokens in file, generating new ones") - takenTokens := ringDesc.GetTokens() + } else if i.cfg.NumTokens == len(instanceDesc.Tokens) { + level.Debug(i.logger).Log("msg", "no tokens in file, adopting those of existing instance") + tokens = instanceDesc.Tokens + } else if i.cfg.NumTokens > len(instanceDesc.Tokens) { needTokens := i.cfg.NumTokens - len(instanceDesc.Tokens) - newTokens := GenerateTokens(needTokens, takenTokens) + level.Debug(i.logger).Log("msg", "no tokens in file, generating new ones in addition to those of existing instance", "newTokens", needTokens) + newTokens := GenerateTokens(needTokens, ringDesc.GetTokens()) tokens = append(instanceDesc.Tokens, newTokens...) sort.Sort(tokens) + } else { + level.Debug(i.logger).Log("msg", "no tokens in file, adopting a subset of existing instance's tokens", "numTokens", i.cfg.NumTokens) + tokens = instanceDesc.Tokens[0:i.cfg.NumTokens] } } diff --git a/ring/lifecycler_test.go b/ring/lifecycler_test.go index 949df757d..50ecc52b9 100644 --- a/ring/lifecycler_test.go +++ b/ring/lifecycler_test.go @@ -516,6 +516,64 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) { }) } +// Test Lifecycler when decreasing tokens and instance is already in the ring in leaving state. +func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { + ctx := context.Background() + + ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + var ringConfig Config + flagext.DefaultValues(&ringConfig) + ringConfig.KVStore.Mock = ringStore + r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, r)) + t.Cleanup(func() { + assert.NoError(t, services.StopAndAwaitTerminated(ctx, r)) + }) + + tokenDir := t.TempDir() + lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1") + // Make sure changes are applied instantly + lifecyclerConfig.HeartbeatPeriod = 0 + lifecyclerConfig.NumTokens = 64 + lifecyclerConfig.TokensFilePath = filepath.Join(tokenDir, "/tokens") + + // Simulate ingester with 128 tokens left the ring in LEAVING state + err = r.KVClient.CAS(ctx, IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) { + ringDesc := NewDesc() + addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil) + if err != nil { + return nil, false, err + } + + ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, GenerateTokens(128, nil), LEAVING, time.Now()) + return ringDesc, true, nil + }) + require.NoError(t, err) + + // Start ingester with decreased number of tokens + l, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, l)) + t.Cleanup(func() { + assert.NoError(t, services.StopAndAwaitTerminated(ctx, l)) + }) + + // Verify ingester joined, is active, and has 64 tokens + test.Poll(t, time.Second, true, func() interface{} { + d, err := r.KVClient.Get(ctx, IngesterRingKey) + require.NoError(t, err) + + desc, ok := d.(*Desc) + require.True(t, ok) + ingDesc := desc.Ingesters["ing1"] + t.Log("Polling for new ingester to have become active with 64 tokens", "state", ingDesc.State, "tokens", len(ingDesc.Tokens)) + return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == 64 + }) +} + type MockClient struct { ListFunc func(ctx context.Context, prefix string) ([]string, error) GetFunc func(ctx context.Context, key string) (interface{}, error) From edfb8b636fea1888a59ffdf5905c384c8d06e6d9 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 5 Jan 2022 16:26:10 +0100 Subject: [PATCH 07/25] ring: Fix tests Signed-off-by: Arve Knudsen --- ring/lifecycler_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ring/lifecycler_test.go b/ring/lifecycler_test.go index 50ecc52b9..966909f04 100644 --- a/ring/lifecycler_test.go +++ b/ring/lifecycler_test.go @@ -468,7 +468,7 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) { var ringConfig Config flagext.DefaultValues(&ringConfig) ringConfig.KVStore.Mock = ringStore - r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil) + r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, r)) t.Cleanup(func() { @@ -483,7 +483,7 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) { lifecyclerConfig.TokensFilePath = filepath.Join(tokenDir, "/tokens") // Simulate ingester with 64 tokens left the ring in LEAVING state - err = r.KVClient.CAS(ctx, IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) { + err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) { ringDesc := NewDesc() addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil) if err != nil { @@ -496,7 +496,7 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) { require.NoError(t, err) // Start ingester with increased number of tokens - l, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil) + l, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, l)) t.Cleanup(func() { @@ -505,7 +505,7 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) { // Verify ingester joined, is active, and has 128 tokens test.Poll(t, time.Second, true, func() interface{} { - d, err := r.KVClient.Get(ctx, IngesterRingKey) + d, err := r.KVClient.Get(ctx, ringKey) require.NoError(t, err) desc, ok := d.(*Desc) @@ -526,7 +526,7 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { var ringConfig Config flagext.DefaultValues(&ringConfig) ringConfig.KVStore.Mock = ringStore - r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil) + r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, r)) t.Cleanup(func() { @@ -541,7 +541,7 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { lifecyclerConfig.TokensFilePath = filepath.Join(tokenDir, "/tokens") // Simulate ingester with 128 tokens left the ring in LEAVING state - err = r.KVClient.CAS(ctx, IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) { + err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) { ringDesc := NewDesc() addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil) if err != nil { @@ -554,7 +554,7 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { require.NoError(t, err) // Start ingester with decreased number of tokens - l, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil) + l, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, l)) t.Cleanup(func() { @@ -563,7 +563,7 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { // Verify ingester joined, is active, and has 64 tokens test.Poll(t, time.Second, true, func() interface{} { - d, err := r.KVClient.Get(ctx, IngesterRingKey) + d, err := r.KVClient.Get(ctx, ringKey) require.NoError(t, err) desc, ok := d.(*Desc) From d7747c7cce355e3c03e5ebb283b01bd2f6800f35 Mon Sep 17 00:00:00 2001 From: Dylan Guedes Date: Fri, 24 Jun 2022 16:43:40 -0300 Subject: [PATCH 08/25] Assign default value for tokens. * Without this, if an ingester leave the ring with the same number of tokens as the new one joining, the new ingester will be assigned nil tokens. --- ring/lifecycler.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index 81b1bd118..c6ec36da2 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -616,23 +616,18 @@ func (i *Lifecycler) initRing(ctx context.Context) error { return ringDesc, true, nil } - level.Info(i.logger).Log("msg", "existing entry found in ring", "state", instanceDesc.State, "tokens", len(instanceDesc.Tokens), "ring", i.RingName) - // If the ingester failed to clean its ring entry up it can leave its state in LEAVING // OR unregister_on_shutdown=false // Move it into ACTIVE to ensure the ingester joins the ring. if instanceDesc.State == LEAVING { - var tokens Tokens - isActive := true + var tokens Tokens = instanceDesc.Tokens // way of forcing tokens to be of type Tokens instead of []uint32. + setIsActive := true if len(instanceDesc.Tokens) != i.cfg.NumTokens { level.Debug(i.logger).Log("msg", "existing entry has different number of tokens", "existingTokens", len(instanceDesc.Tokens), "newTokens", i.cfg.NumTokens) if len(tokensFromFile) > 0 { level.Debug(i.logger).Log("msg", "adding tokens from file", "tokens", len(tokensFromFile)) - isActive = len(tokensFromFile) >= i.cfg.NumTokens + setIsActive = len(tokensFromFile) >= i.cfg.NumTokens tokens = tokensFromFile - } else if i.cfg.NumTokens == len(instanceDesc.Tokens) { - level.Debug(i.logger).Log("msg", "no tokens in file, adopting those of existing instance") - tokens = instanceDesc.Tokens } else if i.cfg.NumTokens > len(instanceDesc.Tokens) { needTokens := i.cfg.NumTokens - len(instanceDesc.Tokens) level.Debug(i.logger).Log("msg", "no tokens in file, generating new ones in addition to those of existing instance", "newTokens", needTokens) @@ -643,9 +638,11 @@ func (i *Lifecycler) initRing(ctx context.Context) error { level.Debug(i.logger).Log("msg", "no tokens in file, adopting a subset of existing instance's tokens", "numTokens", i.cfg.NumTokens) tokens = instanceDesc.Tokens[0:i.cfg.NumTokens] } + } else { + level.Debug(i.logger).Log("msg", "adopting tokens of existing instance") } - if isActive { + if setIsActive { level.Debug(i.logger).Log("msg", "switching state to active") i.setState(ACTIVE) } else { @@ -661,6 +658,17 @@ func (i *Lifecycler) initRing(ctx context.Context) error { i.setTokens(instanceDesc.Tokens) } + // We're taking over this entry, update instanceDesc with our values + instanceDesc.Addr = i.Addr + instanceDesc.Zone = i.Zone + + // We exist in the ring, so assume the ring is right and copy out tokens & state out of there. + i.setState(instanceDesc.State) + tokens, _ := ringDesc.TokensFor(i.ID) + i.setTokens(tokens) + + level.Info(i.logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(tokens), "ring", i.RingName) + // Update the ring if the instance has been changed. We don't want to rely on heartbeat update, as heartbeat // can be configured to long time, and until then lifecycler would not report this instance as ready in CheckReady. if !instanceDesc.Equal(ringDesc.Ingesters[i.ID]) { From 18c78ae50d943ee1492a4540c419a4a153627b34 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 2 Jun 2023 11:59:21 +0200 Subject: [PATCH 09/25] Fix tests Signed-off-by: Arve Knudsen --- ring/lifecycler_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ring/lifecycler_test.go b/ring/lifecycler_test.go index 966909f04..10c4b5c6a 100644 --- a/ring/lifecycler_test.go +++ b/ring/lifecycler_test.go @@ -3,8 +3,8 @@ package ring import ( "context" "fmt" - "path/filepath" "os" + "path/filepath" "sort" "testing" "time" @@ -485,7 +485,7 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) { // Simulate ingester with 64 tokens left the ring in LEAVING state err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) { ringDesc := NewDesc() - addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil) + addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil, lifecyclerConfig.EnableInet6) if err != nil { return nil, false, err } @@ -543,7 +543,7 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { // Simulate ingester with 128 tokens left the ring in LEAVING state err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) { ringDesc := NewDesc() - addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil) + addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil, lifecyclerConfig.EnableInet6) if err != nil { return nil, false, err } From a82904ebc5ef8626c9b1c114419c2481e5f89b69 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 2 Jun 2023 16:48:46 +0200 Subject: [PATCH 10/25] Simplify code Signed-off-by: Arve Knudsen --- ring/lifecycler.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index c6ec36da2..e283c8fdd 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -620,23 +620,23 @@ func (i *Lifecycler) initRing(ctx context.Context) error { // OR unregister_on_shutdown=false // Move it into ACTIVE to ensure the ingester joins the ring. if instanceDesc.State == LEAVING { - var tokens Tokens = instanceDesc.Tokens // way of forcing tokens to be of type Tokens instead of []uint32. + tokens := Tokens(instanceDesc.Tokens) setIsActive := true - if len(instanceDesc.Tokens) != i.cfg.NumTokens { - level.Debug(i.logger).Log("msg", "existing entry has different number of tokens", "existingTokens", len(instanceDesc.Tokens), "newTokens", i.cfg.NumTokens) + if len(tokens) != i.cfg.NumTokens { + level.Debug(i.logger).Log("msg", "existing entry has different number of tokens", "existingTokens", len(tokens), "newTokens", i.cfg.NumTokens) if len(tokensFromFile) > 0 { level.Debug(i.logger).Log("msg", "adding tokens from file", "tokens", len(tokensFromFile)) setIsActive = len(tokensFromFile) >= i.cfg.NumTokens tokens = tokensFromFile - } else if i.cfg.NumTokens > len(instanceDesc.Tokens) { - needTokens := i.cfg.NumTokens - len(instanceDesc.Tokens) + } else if i.cfg.NumTokens > len(tokens) { + needTokens := i.cfg.NumTokens - len(tokens) level.Debug(i.logger).Log("msg", "no tokens in file, generating new ones in addition to those of existing instance", "newTokens", needTokens) newTokens := GenerateTokens(needTokens, ringDesc.GetTokens()) - tokens = append(instanceDesc.Tokens, newTokens...) + tokens = append(tokens, newTokens...) sort.Sort(tokens) } else { level.Debug(i.logger).Log("msg", "no tokens in file, adopting a subset of existing instance's tokens", "numTokens", i.cfg.NumTokens) - tokens = instanceDesc.Tokens[0:i.cfg.NumTokens] + tokens = tokens[0:i.cfg.NumTokens] } } else { level.Debug(i.logger).Log("msg", "adopting tokens of existing instance") From e55b1a77393b483bd92dc48ed7c730491b253a0c Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 2 Jun 2023 19:16:07 +0200 Subject: [PATCH 11/25] initRing: Always switch from LEAVING to ACTIVE state Signed-off-by: Arve Knudsen --- ring/lifecycler.go | 45 +++++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index e283c8fdd..4eb69d9e4 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -616,38 +616,43 @@ func (i *Lifecycler) initRing(ctx context.Context) error { return ringDesc, true, nil } - // If the ingester failed to clean its ring entry up it can leave its state in LEAVING - // OR unregister_on_shutdown=false - // Move it into ACTIVE to ensure the ingester joins the ring. + tokens := Tokens(instanceDesc.Tokens) + + // If the ingester fails to clean its ring entry up or unregister_on_shutdown=false, it can leave behind its + // ring state as LEAVING. Make sure to switch to the ACTIVE state. if instanceDesc.State == LEAVING { - tokens := Tokens(instanceDesc.Tokens) - setIsActive := true if len(tokens) != i.cfg.NumTokens { - level.Debug(i.logger).Log("msg", "existing entry has different number of tokens", "existingTokens", len(tokens), "newTokens", i.cfg.NumTokens) + level.Debug(i.logger).Log("msg", "existing entry has different number of tokens", + "existing_tokens", len(tokens), "new_tokens", i.cfg.NumTokens) if len(tokensFromFile) > 0 { - level.Debug(i.logger).Log("msg", "adding tokens from file", "tokens", len(tokensFromFile)) - setIsActive = len(tokensFromFile) >= i.cfg.NumTokens + tokens = tokensFromFile + if len(tokens) < i.cfg.NumTokens { + needTokens := i.cfg.NumTokens - len(tokens) + newTokens := GenerateTokens(needTokens, ringDesc.GetTokens()) + tokens = append(tokens, newTokens...) + sort.Sort(tokens) + } + level.Debug(i.logger).Log("msg", "adding tokens from file", "tokens", len(tokens)) tokens = tokensFromFile } else if i.cfg.NumTokens > len(tokens) { needTokens := i.cfg.NumTokens - len(tokens) - level.Debug(i.logger).Log("msg", "no tokens in file, generating new ones in addition to those of existing instance", "newTokens", needTokens) + level.Debug(i.logger).Log("msg", + "no tokens in file, generating new ones in addition to those of existing instance", + "new_tokens", needTokens) newTokens := GenerateTokens(needTokens, ringDesc.GetTokens()) tokens = append(tokens, newTokens...) sort.Sort(tokens) } else { - level.Debug(i.logger).Log("msg", "no tokens in file, adopting a subset of existing instance's tokens", "numTokens", i.cfg.NumTokens) + level.Debug(i.logger).Log("msg", "no tokens in file, adopting a subset of existing instance's tokens", + "num_tokens", i.cfg.NumTokens) tokens = tokens[0:i.cfg.NumTokens] } } else { level.Debug(i.logger).Log("msg", "adopting tokens of existing instance") } - if setIsActive { - level.Debug(i.logger).Log("msg", "switching state to active") - i.setState(ACTIVE) - } else { - level.Debug(i.logger).Log("msg", "not switching state to active", "state", i.GetState()) - } + level.Debug(i.logger).Log("msg", "switching state to active") + i.setState(ACTIVE) i.setTokens(tokens) instanceDesc.State = i.GetState() @@ -655,18 +660,13 @@ func (i *Lifecycler) initRing(ctx context.Context) error { } else { // We exist in the ring and not in leaving state, so assume the ring is right and copy tokens & state out of there. i.setState(instanceDesc.State) - i.setTokens(instanceDesc.Tokens) + i.setTokens(tokens) } // We're taking over this entry, update instanceDesc with our values instanceDesc.Addr = i.Addr instanceDesc.Zone = i.Zone - // We exist in the ring, so assume the ring is right and copy out tokens & state out of there. - i.setState(instanceDesc.State) - tokens, _ := ringDesc.TokensFor(i.ID) - i.setTokens(tokens) - level.Info(i.logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(tokens), "ring", i.RingName) // Update the ring if the instance has been changed. We don't want to rely on heartbeat update, as heartbeat @@ -769,6 +769,7 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er // At this point, we should not have any tokens, and we should be in PENDING state. myTokens, takenTokens := ringDesc.TokensFor(i.ID) if len(myTokens) > 0 { + // TODO: Should we log this case as an error, as we don't treat it as one? level.Error(i.logger).Log("msg", "tokens already exist for this instance - wasn't expecting any!", "num_tokens", len(myTokens), "ring", i.RingName) } From e6b6d0925f2d18a206abadd22f41ba31318413f3 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Sat, 3 Jun 2023 15:36:19 +0200 Subject: [PATCH 12/25] Simplify code for calculating tokens Signed-off-by: Arve Knudsen --- ring/lifecycler.go | 47 +++++++++++++++-------------------------- ring/lifecycler_test.go | 5 ----- 2 files changed, 17 insertions(+), 35 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index 4eb69d9e4..67aa41a83 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -617,41 +617,30 @@ func (i *Lifecycler) initRing(ctx context.Context) error { } tokens := Tokens(instanceDesc.Tokens) + level.Info(i.logger).Log("msg", "existing instance found in ring", "state", i.GetState(), "tokens", + len(tokens), "ring", i.RingName) // If the ingester fails to clean its ring entry up or unregister_on_shutdown=false, it can leave behind its // ring state as LEAVING. Make sure to switch to the ACTIVE state. if instanceDesc.State == LEAVING { - if len(tokens) != i.cfg.NumTokens { - level.Debug(i.logger).Log("msg", "existing entry has different number of tokens", - "existing_tokens", len(tokens), "new_tokens", i.cfg.NumTokens) - if len(tokensFromFile) > 0 { - tokens = tokensFromFile - if len(tokens) < i.cfg.NumTokens { - needTokens := i.cfg.NumTokens - len(tokens) - newTokens := GenerateTokens(needTokens, ringDesc.GetTokens()) - tokens = append(tokens, newTokens...) - sort.Sort(tokens) - } - level.Debug(i.logger).Log("msg", "adding tokens from file", "tokens", len(tokens)) - tokens = tokensFromFile - } else if i.cfg.NumTokens > len(tokens) { - needTokens := i.cfg.NumTokens - len(tokens) - level.Debug(i.logger).Log("msg", - "no tokens in file, generating new ones in addition to those of existing instance", - "new_tokens", needTokens) - newTokens := GenerateTokens(needTokens, ringDesc.GetTokens()) - tokens = append(tokens, newTokens...) - sort.Sort(tokens) - } else { - level.Debug(i.logger).Log("msg", "no tokens in file, adopting a subset of existing instance's tokens", - "num_tokens", i.cfg.NumTokens) - tokens = tokens[0:i.cfg.NumTokens] - } + delta := i.cfg.NumTokens - len(tokens) + if delta > 0 { + // We need more tokens + level.Debug(i.logger).Log("msg", "existing instance has too few tokens, adding difference", + "num_tokens", len(tokens), "need", i.cfg.NumTokens) + newTokens := GenerateTokens(delta, ringDesc.GetTokens()) + tokens = append(tokens, newTokens...) + sort.Sort(tokens) + } else if delta < 0 { + // We have too many tokens + level.Debug(i.logger).Log("msg", "existing instance has too many tokens, removing difference", + "num_tokens", len(tokens), "need", i.cfg.NumTokens) + tokens = tokens[0:i.cfg.NumTokens] } else { - level.Debug(i.logger).Log("msg", "adopting tokens of existing instance") + level.Debug(i.logger).Log("msg", "existing instance has the right amount of tokens") } - level.Debug(i.logger).Log("msg", "switching state to active") + level.Debug(i.logger).Log("msg", "switching state from LEAVING to ACTIVE") i.setState(ACTIVE) i.setTokens(tokens) @@ -667,8 +656,6 @@ func (i *Lifecycler) initRing(ctx context.Context) error { instanceDesc.Addr = i.Addr instanceDesc.Zone = i.Zone - level.Info(i.logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(tokens), "ring", i.RingName) - // Update the ring if the instance has been changed. We don't want to rely on heartbeat update, as heartbeat // can be configured to long time, and until then lifecycler would not report this instance as ready in CheckReady. if !instanceDesc.Equal(ringDesc.Ingesters[i.ID]) { diff --git a/ring/lifecycler_test.go b/ring/lifecycler_test.go index 10c4b5c6a..65597f2ff 100644 --- a/ring/lifecycler_test.go +++ b/ring/lifecycler_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "path/filepath" "sort" "testing" "time" @@ -475,12 +474,10 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) { assert.NoError(t, services.StopAndAwaitTerminated(ctx, r)) }) - tokenDir := t.TempDir() lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1") // Make sure changes are applied instantly lifecyclerConfig.HeartbeatPeriod = 0 lifecyclerConfig.NumTokens = 128 - lifecyclerConfig.TokensFilePath = filepath.Join(tokenDir, "/tokens") // Simulate ingester with 64 tokens left the ring in LEAVING state err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) { @@ -533,12 +530,10 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { assert.NoError(t, services.StopAndAwaitTerminated(ctx, r)) }) - tokenDir := t.TempDir() lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1") // Make sure changes are applied instantly lifecyclerConfig.HeartbeatPeriod = 0 lifecyclerConfig.NumTokens = 64 - lifecyclerConfig.TokensFilePath = filepath.Join(tokenDir, "/tokens") // Simulate ingester with 128 tokens left the ring in LEAVING state err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) { From be2f2404821049e107a6f709271303556b33f1f0 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Sat, 3 Jun 2023 16:13:18 +0200 Subject: [PATCH 13/25] Improve tests Signed-off-by: Arve Knudsen --- ring/lifecycler_test.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/ring/lifecycler_test.go b/ring/lifecycler_test.go index 65597f2ff..58b80ec2b 100644 --- a/ring/lifecycler_test.go +++ b/ring/lifecycler_test.go @@ -464,6 +464,8 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) { ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + const numTokens = 128 + var ringConfig Config flagext.DefaultValues(&ringConfig) ringConfig.KVStore.Mock = ringStore @@ -477,7 +479,7 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) { lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1") // Make sure changes are applied instantly lifecyclerConfig.HeartbeatPeriod = 0 - lifecyclerConfig.NumTokens = 128 + lifecyclerConfig.NumTokens = numTokens // Simulate ingester with 64 tokens left the ring in LEAVING state err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) { @@ -488,7 +490,7 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) { } ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, GenerateTokens(64, nil), LEAVING, time.Now()) - return ringDesc, true, nil + return ringDesc, false, nil }) require.NoError(t, err) @@ -508,8 +510,9 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) { desc, ok := d.(*Desc) require.True(t, ok) ingDesc := desc.Ingesters["ing1"] - t.Log("Polling for new ingester to have become active with 128 tokens", "state", ingDesc.State, "tokens", len(ingDesc.Tokens)) - return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == 128 + t.Log(fmt.Sprintf("Polling for new ingester to have become active with %d tokens", numTokens), + "state", ingDesc.State, "tokens", len(ingDesc.Tokens)) + return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == numTokens }) } @@ -520,6 +523,8 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + const numTokens = 64 + var ringConfig Config flagext.DefaultValues(&ringConfig) ringConfig.KVStore.Mock = ringStore @@ -533,7 +538,7 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1") // Make sure changes are applied instantly lifecyclerConfig.HeartbeatPeriod = 0 - lifecyclerConfig.NumTokens = 64 + lifecyclerConfig.NumTokens = numTokens // Simulate ingester with 128 tokens left the ring in LEAVING state err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) { @@ -544,7 +549,7 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { } ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, GenerateTokens(128, nil), LEAVING, time.Now()) - return ringDesc, true, nil + return ringDesc, false, nil }) require.NoError(t, err) @@ -564,8 +569,9 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { desc, ok := d.(*Desc) require.True(t, ok) ingDesc := desc.Ingesters["ing1"] - t.Log("Polling for new ingester to have become active with 64 tokens", "state", ingDesc.State, "tokens", len(ingDesc.Tokens)) - return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == 64 + t.Log(fmt.Sprintf("Polling for new ingester to have become active with %d tokens", numTokens), + "state", ingDesc.State, "tokens", len(ingDesc.Tokens)) + return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == numTokens }) } From daf07f8927c6cb7771ef50188b3e55a79ad2b7c7 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Sat, 3 Jun 2023 16:45:14 +0200 Subject: [PATCH 14/25] When dropping tokens, randomize Signed-off-by: Arve Knudsen --- ring/lifecycler.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index 67aa41a83..7cecbbe05 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "math/rand" "net" "net/http" "os" @@ -635,7 +636,14 @@ func (i *Lifecycler) initRing(ctx context.Context) error { // We have too many tokens level.Debug(i.logger).Log("msg", "existing instance has too many tokens, removing difference", "num_tokens", len(tokens), "need", i.cfg.NumTokens) + // Originally suggested by Andrea Gardiman, make sure we don't pick the N smallest tokens, + // since that would increase the chance of the instance receiving only smaller hashes + // https://github.com/grafana/dskit/pull/79#discussion_r1056205242 + rand.Shuffle(len(tokens), func(i, j int) { + tokens[i] = tokens[j] + }) tokens = tokens[0:i.cfg.NumTokens] + sort.Sort(tokens) } else { level.Debug(i.logger).Log("msg", "existing instance has the right amount of tokens") } From 3ed54ed641da7a696e913250aa78694d10504b10 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Mon, 5 Jun 2023 09:40:52 +0200 Subject: [PATCH 15/25] Fix shuffle Signed-off-by: Arve Knudsen --- ring/lifecycler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index 7cecbbe05..79195f87e 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -640,7 +640,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error { // since that would increase the chance of the instance receiving only smaller hashes // https://github.com/grafana/dskit/pull/79#discussion_r1056205242 rand.Shuffle(len(tokens), func(i, j int) { - tokens[i] = tokens[j] + tokens[i], tokens[j] = tokens[j], tokens[i] }) tokens = tokens[0:i.cfg.NumTokens] sort.Sort(tokens) From 9e7c8b09c5b8ce655d9f94262de6baa9d7c6a1a3 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Mon, 5 Jun 2023 10:13:44 +0200 Subject: [PATCH 16/25] Fix logging of state Signed-off-by: Arve Knudsen --- ring/lifecycler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index 79195f87e..79f49c8ec 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -618,7 +618,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error { } tokens := Tokens(instanceDesc.Tokens) - level.Info(i.logger).Log("msg", "existing instance found in ring", "state", i.GetState(), "tokens", + level.Info(i.logger).Log("msg", "existing instance found in ring", "state", instanceDesc.State, "tokens", len(tokens), "ring", i.RingName) // If the ingester fails to clean its ring entry up or unregister_on_shutdown=false, it can leave behind its From 2ecfb2ca714a29813f905b9c0926db3e5de0b173 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Mon, 5 Jun 2023 15:50:40 +0200 Subject: [PATCH 17/25] Augment tests Signed-off-by: Arve Knudsen --- ring/lifecycler_test.go | 49 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/ring/lifecycler_test.go b/ring/lifecycler_test.go index 58b80ec2b..cc79f44f9 100644 --- a/ring/lifecycler_test.go +++ b/ring/lifecycler_test.go @@ -482,6 +482,7 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) { lifecyclerConfig.NumTokens = numTokens // Simulate ingester with 64 tokens left the ring in LEAVING state + origTokens := GenerateTokens(64, nil) err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) { ringDesc := NewDesc() addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil, lifecyclerConfig.EnableInet6) @@ -489,7 +490,7 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) { return nil, false, err } - ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, GenerateTokens(64, nil), LEAVING, time.Now()) + ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, origTokens, LEAVING, time.Now()) return ringDesc, false, nil }) require.NoError(t, err) @@ -503,17 +504,33 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) { }) // Verify ingester joined, is active, and has 128 tokens + var ingDesc InstanceDesc test.Poll(t, time.Second, true, func() interface{} { d, err := r.KVClient.Get(ctx, ringKey) require.NoError(t, err) desc, ok := d.(*Desc) require.True(t, ok) - ingDesc := desc.Ingesters["ing1"] + ingDesc = desc.Ingesters["ing1"] t.Log(fmt.Sprintf("Polling for new ingester to have become active with %d tokens", numTokens), "state", ingDesc.State, "tokens", len(ingDesc.Tokens)) return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == numTokens }) + + origSeen := 0 + for _, ot := range origTokens { + for _, tok := range ingDesc.Tokens { + if tok == ot { + origSeen++ + break + } + } + } + assert.Equal(t, len(origTokens), origSeen, "original tokens should be kept") + + assert.True(t, sort.SliceIsSorted(ingDesc.Tokens, func(i, j int) bool { + return ingDesc.Tokens[i] < ingDesc.Tokens[j] + }), "tokens should be sorted") } // Test Lifecycler when decreasing tokens and instance is already in the ring in leaving state. @@ -541,6 +558,7 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { lifecyclerConfig.NumTokens = numTokens // Simulate ingester with 128 tokens left the ring in LEAVING state + origTokens := GenerateTokens(128, nil) err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) { ringDesc := NewDesc() addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil, lifecyclerConfig.EnableInet6) @@ -548,7 +566,7 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { return nil, false, err } - ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, GenerateTokens(128, nil), LEAVING, time.Now()) + ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, origTokens, LEAVING, time.Now()) return ringDesc, false, nil }) require.NoError(t, err) @@ -562,17 +580,40 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { }) // Verify ingester joined, is active, and has 64 tokens + var ingDesc InstanceDesc test.Poll(t, time.Second, true, func() interface{} { d, err := r.KVClient.Get(ctx, ringKey) require.NoError(t, err) desc, ok := d.(*Desc) require.True(t, ok) - ingDesc := desc.Ingesters["ing1"] + ingDesc = desc.Ingesters["ing1"] t.Log(fmt.Sprintf("Polling for new ingester to have become active with %d tokens", numTokens), "state", ingDesc.State, "tokens", len(ingDesc.Tokens)) return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == numTokens }) + + seen := map[uint32]struct{}{} + for _, tok := range ingDesc.Tokens { + // Guard against potential bug in token shuffling + _, exists := seen[tok] + require.False(t, exists, "tokens are not unique") + + found := false + for _, ot := range origTokens { + if tok == ot { + found = true + break + } + } + require.True(t, found, "old tokens were not re-used") + + seen[tok] = struct{}{} + } + + assert.True(t, sort.SliceIsSorted(ingDesc.Tokens, func(i, j int) bool { + return ingDesc.Tokens[i] < ingDesc.Tokens[j] + }), "tokens should be sorted") } type MockClient struct { From cc0d394b7f8c5026394da3250bf1d1af97199370 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 6 Jun 2023 19:13:20 +0200 Subject: [PATCH 18/25] Update ring/lifecycler.go Co-authored-by: Marco Pracucci --- ring/lifecycler.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index 79f49c8ec..fdac3e720 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -636,9 +636,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error { // We have too many tokens level.Debug(i.logger).Log("msg", "existing instance has too many tokens, removing difference", "num_tokens", len(tokens), "need", i.cfg.NumTokens) - // Originally suggested by Andrea Gardiman, make sure we don't pick the N smallest tokens, - // since that would increase the chance of the instance receiving only smaller hashes - // https://github.com/grafana/dskit/pull/79#discussion_r1056205242 + // Make sure we don't pick the N smallest tokens, since that would increase the chance of the instance receiving only smaller hashes. rand.Shuffle(len(tokens), func(i, j int) { tokens[i], tokens[j] = tokens[j], tokens[i] }) From 8b14cda61cf34d7c35e5436ab1945ccf1ea24d54 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 6 Jun 2023 19:14:50 +0200 Subject: [PATCH 19/25] Update ring/lifecycler.go Co-authored-by: Marco Pracucci --- ring/lifecycler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index fdac3e720..9060cedf9 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -634,8 +634,8 @@ func (i *Lifecycler) initRing(ctx context.Context) error { sort.Sort(tokens) } else if delta < 0 { // We have too many tokens - level.Debug(i.logger).Log("msg", "existing instance has too many tokens, removing difference", - "num_tokens", len(tokens), "need", i.cfg.NumTokens) + level.Info(i.logger).Log("msg", "existing instance has too many tokens, removing difference", + "current_tokens", len(tokens), "desired_tokens", i.cfg.NumTokens) // Make sure we don't pick the N smallest tokens, since that would increase the chance of the instance receiving only smaller hashes. rand.Shuffle(len(tokens), func(i, j int) { tokens[i], tokens[j] = tokens[j], tokens[i] From 703c0b634d3e56c34452c42d2d73c18646fc6088 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 6 Jun 2023 19:15:23 +0200 Subject: [PATCH 20/25] Update ring/lifecycler.go Co-authored-by: Marco Pracucci --- ring/lifecycler.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index 9060cedf9..a7e505a23 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -642,8 +642,6 @@ func (i *Lifecycler) initRing(ctx context.Context) error { }) tokens = tokens[0:i.cfg.NumTokens] sort.Sort(tokens) - } else { - level.Debug(i.logger).Log("msg", "existing instance has the right amount of tokens") } level.Debug(i.logger).Log("msg", "switching state from LEAVING to ACTIVE") From 90c766dbfc7e6aafc91f64364547a89dff3a2739 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 6 Jun 2023 19:15:35 +0200 Subject: [PATCH 21/25] Update ring/lifecycler.go Co-authored-by: Marco Pracucci --- ring/lifecycler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index a7e505a23..4c9ea4392 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -627,8 +627,8 @@ func (i *Lifecycler) initRing(ctx context.Context) error { delta := i.cfg.NumTokens - len(tokens) if delta > 0 { // We need more tokens - level.Debug(i.logger).Log("msg", "existing instance has too few tokens, adding difference", - "num_tokens", len(tokens), "need", i.cfg.NumTokens) + level.Info(i.logger).Log("msg", "existing instance has too few tokens, adding difference", + "current_tokens", len(tokens), "desired_tokens", i.cfg.NumTokens) newTokens := GenerateTokens(delta, ringDesc.GetTokens()) tokens = append(tokens, newTokens...) sort.Sort(tokens) From 6d1bbeab30082ad8c38f71f03de6e375b358d6e1 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 7 Jun 2023 09:07:18 +0200 Subject: [PATCH 22/25] Update ring/lifecycler.go Co-authored-by: Marco Pracucci --- ring/lifecycler.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index 4c9ea4392..c2b1d7671 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -637,9 +637,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error { level.Info(i.logger).Log("msg", "existing instance has too many tokens, removing difference", "current_tokens", len(tokens), "desired_tokens", i.cfg.NumTokens) // Make sure we don't pick the N smallest tokens, since that would increase the chance of the instance receiving only smaller hashes. - rand.Shuffle(len(tokens), func(i, j int) { - tokens[i], tokens[j] = tokens[j], tokens[i] - }) + rand.Shuffle(len(tokens), tokens.Swap) tokens = tokens[0:i.cfg.NumTokens] sort.Sort(tokens) } From c9f047ce8ec317faa5059bc3fc5078fcfd1ce1d6 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 7 Jun 2023 09:09:55 +0200 Subject: [PATCH 23/25] Update ring/lifecycler.go Co-authored-by: Marco Pracucci --- ring/lifecycler.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index c2b1d7671..55a1b441f 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -642,18 +642,14 @@ func (i *Lifecycler) initRing(ctx context.Context) error { sort.Sort(tokens) } - level.Debug(i.logger).Log("msg", "switching state from LEAVING to ACTIVE") - i.setState(ACTIVE) - - i.setTokens(tokens) - instanceDesc.State = i.GetState() + instanceDesc.State = ACTIVE instanceDesc.Tokens = tokens - } else { - // We exist in the ring and not in leaving state, so assume the ring is right and copy tokens & state out of there. - i.setState(instanceDesc.State) - i.setTokens(tokens) } + // Set the local state based on the updated instance. + i.setState(instanceDesc.State) + i.setTokens(tokens) + // We're taking over this entry, update instanceDesc with our values instanceDesc.Addr = i.Addr instanceDesc.Zone = i.Zone From 301e50b92ac62f809550033bc0f95665b552a696 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 7 Jun 2023 09:07:38 +0200 Subject: [PATCH 24/25] Remove TODO Signed-off-by: Arve Knudsen --- ring/lifecycler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index 55a1b441f..767ec2dc5 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -754,7 +754,6 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er // At this point, we should not have any tokens, and we should be in PENDING state. myTokens, takenTokens := ringDesc.TokensFor(i.ID) if len(myTokens) > 0 { - // TODO: Should we log this case as an error, as we don't treat it as one? level.Error(i.logger).Log("msg", "tokens already exist for this instance - wasn't expecting any!", "num_tokens", len(myTokens), "ring", i.RingName) } From bff82f62c647a507c5662b43aa8f17e71ef4e817 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 7 Jun 2023 10:06:15 +0200 Subject: [PATCH 25/25] Improve test readability a bit Signed-off-by: Arve Knudsen --- ring/lifecycler_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ring/lifecycler_test.go b/ring/lifecycler_test.go index cc79f44f9..0db177c38 100644 --- a/ring/lifecycler_test.go +++ b/ring/lifecycler_test.go @@ -598,6 +598,7 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { // Guard against potential bug in token shuffling _, exists := seen[tok] require.False(t, exists, "tokens are not unique") + seen[tok] = struct{}{} found := false for _, ot := range origTokens { @@ -607,8 +608,6 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { } } require.True(t, found, "old tokens were not re-used") - - seen[tok] = struct{}{} } assert.True(t, sort.SliceIsSorted(ingDesc.Tokens, func(i, j int) bool {