diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1c94bdcbc..8f9bd28f4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -133,3 +133,4 @@
 * [BUGFIX] Ring status page: fixed the owned tokens percentage value displayed. #282
 * [BUGFIX] Ring: prevent iterating the whole ring when using `ExcludedZones`. #285
 * [BUGFIX] grpcclient: fix missing `.` in flag name for initial connection window size flag. #314
+* [BUGFIX] ring.Lifecycler: Handle when previous ring state is leaving and the number of tokens has changed. #79
diff --git a/ring/lifecycler.go b/ring/lifecycler.go
index f4af37d6a..767ec2dc5 100644
--- a/ring/lifecycler.go
+++ b/ring/lifecycler.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"flag"
 	"fmt"
+	"math/rand"
 	"net"
 	"net/http"
 	"os"
@@ -616,23 +617,42 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
 			return ringDesc, true, nil
 		}
 
-		// If the ingester failed to clean its ring entry up it can leave its state in LEAVING
-		// OR unregister_on_shutdown=false
-		// Move it into ACTIVE to ensure the ingester joins the ring.
-		if instanceDesc.State == LEAVING && len(instanceDesc.Tokens) == i.cfg.NumTokens {
+		tokens := Tokens(instanceDesc.Tokens)
+		level.Info(i.logger).Log("msg", "existing instance found in ring", "state", instanceDesc.State, "tokens",
+			len(tokens), "ring", i.RingName)
+
+		// If the ingester fails to clean its ring entry up or unregister_on_shutdown=false, it can leave behind its
+		// ring state as LEAVING. Make sure to switch to the ACTIVE state.
+		if instanceDesc.State == LEAVING {
+			delta := i.cfg.NumTokens - len(tokens)
+			if delta > 0 {
+				// We need more tokens
+				level.Info(i.logger).Log("msg", "existing instance has too few tokens, adding difference",
+					"current_tokens", len(tokens), "desired_tokens", i.cfg.NumTokens)
+				newTokens := GenerateTokens(delta, ringDesc.GetTokens())
+				tokens = append(tokens, newTokens...)
+				sort.Sort(tokens)
+			} else if delta < 0 {
+				// We have too many tokens
+				level.Info(i.logger).Log("msg", "existing instance has too many tokens, removing difference",
+					"current_tokens", len(tokens), "desired_tokens", i.cfg.NumTokens)
+				// Make sure we don't pick the N smallest tokens, since that would increase the chance of the instance receiving only smaller hashes.
+				rand.Shuffle(len(tokens), tokens.Swap)
+				tokens = tokens[0:i.cfg.NumTokens]
+				sort.Sort(tokens)
+			}
+
 			instanceDesc.State = ACTIVE
+			instanceDesc.Tokens = tokens
 		}
 
-		// We're taking over this entry, update instanceDesc with our values
-		instanceDesc.Addr = i.Addr
-		instanceDesc.Zone = i.Zone
-
-		// We exist in the ring, so assume the ring is right and copy out tokens & state out of there.
+		// Set the local state based on the updated instance.
 		i.setState(instanceDesc.State)
-		tokens, _ := ringDesc.TokensFor(i.ID)
 		i.setTokens(tokens)
 
-		level.Info(i.logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(tokens), "ring", i.RingName)
+		// We're taking over this entry, update instanceDesc with our values
+		instanceDesc.Addr = i.Addr
+		instanceDesc.Zone = i.Zone
 
 		// Update the ring if the instance has been changed. We don't want to rely on heartbeat update, as heartbeat
 		// can be configured to long time, and until then lifecycler would not report this instance as ready in CheckReady.
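Reviewer note (not part of the patch): the sketch below isolates the delta handling introduced in initRing above. When the recovered LEAVING entry has too few tokens, the difference is generated fresh; when it has too many, the surplus is dropped only after a shuffle so the instance does not end up keeping just the N smallest hashes. The helper name reconcileTokens and the inline random token generation are illustrative stand-ins, not the ring package's GenerateTokens.

// Illustrative sketch only; reconcileTokens is a hypothetical helper and the
// inline random generation is a simplified stand-in for GenerateTokens.
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// reconcileTokens grows or shrinks a recovered token set to the desired count,
// mirroring the delta logic added to initRing: missing tokens are generated,
// surplus tokens are dropped at random (not the N smallest), and the result is sorted.
func reconcileTokens(tokens []uint32, desired int, taken map[uint32]bool) []uint32 {
	delta := desired - len(tokens)
	switch {
	case delta > 0:
		// Too few tokens: add random, unused ones (simplified token generation).
		for len(tokens) < desired {
			t := rand.Uint32()
			if taken[t] {
				continue
			}
			taken[t] = true
			tokens = append(tokens, t)
		}
	case delta < 0:
		// Too many tokens: shuffle before truncating so the kept tokens are not
		// biased toward the smallest hashes.
		rand.Shuffle(len(tokens), func(i, j int) { tokens[i], tokens[j] = tokens[j], tokens[i] })
		tokens = tokens[:desired]
	}
	sort.Slice(tokens, func(i, j int) bool { return tokens[i] < tokens[j] })
	return tokens
}

func main() {
	// LEAVING entry recovered with 4 tokens while the lifecycler now wants 6.
	fmt.Println(reconcileTokens([]uint32{10, 20, 30, 40}, 6, map[uint32]bool{10: true, 20: true, 30: true, 40: true}))

	// LEAVING entry recovered with 6 tokens while the lifecycler now wants 4.
	fmt.Println(reconcileTokens([]uint32{10, 20, 30, 40, 50, 60}, 4, map[uint32]bool{}))
}

Shuffling before truncating matters because tokens are positions on the hash ring: keeping only the smallest values would bias the instance toward a narrow region of the key space, which is exactly what the in-patch comment warns against.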
diff --git a/ring/lifecycler_test.go b/ring/lifecycler_test.go
index f04bd46b4..0db177c38 100644
--- a/ring/lifecycler_test.go
+++ b/ring/lifecycler_test.go
@@ -457,6 +457,164 @@ func TestLifecycler_HeartbeatAfterBackendReset(t *testing.T) {
 	assert.Equal(t, prevTokens, Tokens(desc.GetTokens()))
 }
 
+// Test Lifecycler when increasing tokens and instance is already in the ring in leaving state.
+func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) {
+	ctx := context.Background()
+
+	ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
+	t.Cleanup(func() { assert.NoError(t, closer.Close()) })
+
+	const numTokens = 128
+
+	var ringConfig Config
+	flagext.DefaultValues(&ringConfig)
+	ringConfig.KVStore.Mock = ringStore
+	r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
+	require.NoError(t, err)
+	require.NoError(t, services.StartAndAwaitRunning(ctx, r))
+	t.Cleanup(func() {
+		assert.NoError(t, services.StopAndAwaitTerminated(ctx, r))
+	})
+
+	lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
+	// Make sure changes are applied instantly
+	lifecyclerConfig.HeartbeatPeriod = 0
+	lifecyclerConfig.NumTokens = numTokens
+
+	// Simulate ingester with 64 tokens left the ring in LEAVING state
+	origTokens := GenerateTokens(64, nil)
+	err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
+		ringDesc := NewDesc()
+		addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil, lifecyclerConfig.EnableInet6)
+		if err != nil {
+			return nil, false, err
+		}
+
+		ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, origTokens, LEAVING, time.Now())
+		return ringDesc, false, nil
+	})
+	require.NoError(t, err)
+
+	// Start ingester with increased number of tokens
+	l, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
+	require.NoError(t, err)
+	require.NoError(t, services.StartAndAwaitRunning(ctx, l))
+	t.Cleanup(func() {
+		assert.NoError(t, services.StopAndAwaitTerminated(ctx, l))
+	})
+
+	// Verify ingester joined, is active, and has 128 tokens
+	var ingDesc InstanceDesc
+	test.Poll(t, time.Second, true, func() interface{} {
+		d, err := r.KVClient.Get(ctx, ringKey)
+		require.NoError(t, err)
+
+		desc, ok := d.(*Desc)
+		require.True(t, ok)
+		ingDesc = desc.Ingesters["ing1"]
+		t.Log(fmt.Sprintf("Polling for new ingester to have become active with %d tokens", numTokens),
+			"state", ingDesc.State, "tokens", len(ingDesc.Tokens))
+		return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == numTokens
+	})
+
+	origSeen := 0
+	for _, ot := range origTokens {
+		for _, tok := range ingDesc.Tokens {
+			if tok == ot {
+				origSeen++
+				break
+			}
+		}
+	}
+	assert.Equal(t, len(origTokens), origSeen, "original tokens should be kept")
+
+	assert.True(t, sort.SliceIsSorted(ingDesc.Tokens, func(i, j int) bool {
+		return ingDesc.Tokens[i] < ingDesc.Tokens[j]
+	}), "tokens should be sorted")
+}
+
+// Test Lifecycler when decreasing tokens and instance is already in the ring in leaving state.
+func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) {
+	ctx := context.Background()
+
+	ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
+	t.Cleanup(func() { assert.NoError(t, closer.Close()) })
+
+	const numTokens = 64
+
+	var ringConfig Config
+	flagext.DefaultValues(&ringConfig)
+	ringConfig.KVStore.Mock = ringStore
+	r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
+	require.NoError(t, err)
+	require.NoError(t, services.StartAndAwaitRunning(ctx, r))
+	t.Cleanup(func() {
+		assert.NoError(t, services.StopAndAwaitTerminated(ctx, r))
+	})
+
+	lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
+	// Make sure changes are applied instantly
+	lifecyclerConfig.HeartbeatPeriod = 0
+	lifecyclerConfig.NumTokens = numTokens
+
+	// Simulate ingester with 128 tokens left the ring in LEAVING state
+	origTokens := GenerateTokens(128, nil)
+	err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
+		ringDesc := NewDesc()
+		addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil, lifecyclerConfig.EnableInet6)
+		if err != nil {
+			return nil, false, err
+		}
+
+		ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, origTokens, LEAVING, time.Now())
+		return ringDesc, false, nil
+	})
+	require.NoError(t, err)
+
+	// Start ingester with decreased number of tokens
+	l, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
+	require.NoError(t, err)
+	require.NoError(t, services.StartAndAwaitRunning(ctx, l))
+	t.Cleanup(func() {
+		assert.NoError(t, services.StopAndAwaitTerminated(ctx, l))
+	})
+
+	// Verify ingester joined, is active, and has 64 tokens
+	var ingDesc InstanceDesc
+	test.Poll(t, time.Second, true, func() interface{} {
+		d, err := r.KVClient.Get(ctx, ringKey)
+		require.NoError(t, err)
+
+		desc, ok := d.(*Desc)
+		require.True(t, ok)
+		ingDesc = desc.Ingesters["ing1"]
+		t.Log(fmt.Sprintf("Polling for new ingester to have become active with %d tokens", numTokens),
+			"state", ingDesc.State, "tokens", len(ingDesc.Tokens))
+		return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == numTokens
+	})
+
+	seen := map[uint32]struct{}{}
+	for _, tok := range ingDesc.Tokens {
+		// Guard against potential bug in token shuffling
+		_, exists := seen[tok]
+		require.False(t, exists, "tokens are not unique")
+		seen[tok] = struct{}{}
+
+		found := false
+		for _, ot := range origTokens {
+			if tok == ot {
+				found = true
+				break
+			}
+		}
+		require.True(t, found, "old tokens were not re-used")
+	}
+
+	assert.True(t, sort.SliceIsSorted(ingDesc.Tokens, func(i, j int) bool {
+		return ingDesc.Tokens[i] < ingDesc.Tokens[j]
+	}), "tokens should be sorted")
+}
+
 type MockClient struct {
 	ListFunc        func(ctx context.Context, prefix string) ([]string, error)
 	GetFunc         func(ctx context.Context, key string) (interface{}, error)