diff --git a/ring/lifecycler_test.go b/ring/lifecycler_test.go
index 6a027f220..a0bd9935f 100644
--- a/ring/lifecycler_test.go
+++ b/ring/lifecycler_test.go
@@ -18,6 +18,11 @@ import (
 	"github.com/grafana/dskit/test"
 )
 
+const (
+	// ring key used for testware
+	ringKey = "ring"
+)
+
 func testLifecyclerConfig(ringConfig Config, id string) LifecyclerConfig {
 	var lifecyclerConfig LifecyclerConfig
 	flagext.DefaultValues(&lifecyclerConfig)
@@ -57,7 +62,7 @@ func TestLifecycler_HealthyInstancesCount(t *testing.T) {
 	lifecyclerConfig1.HeartbeatPeriod = 100 * time.Millisecond
 	lifecyclerConfig1.JoinAfter = 100 * time.Millisecond
 
-	lifecycler1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+	lifecycler1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 
 	assert.Equal(t, 0, lifecycler1.HealthyInstancesCount())
@@ -74,7 +79,7 @@ func TestLifecycler_HealthyInstancesCount(t *testing.T) {
 	lifecyclerConfig2.HeartbeatPeriod = 100 * time.Millisecond
 	lifecyclerConfig2.JoinAfter = 100 * time.Millisecond
 
-	lifecycler2, err := NewLifecycler(lifecyclerConfig2, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+	lifecycler2, err := NewLifecycler(lifecyclerConfig2, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 
 	assert.Equal(t, 0, lifecycler2.HealthyInstancesCount())
@@ -119,7 +124,7 @@ func TestLifecycler_ZonesCount(t *testing.T) {
 		cfg.JoinAfter = 100 * time.Millisecond
 		cfg.Zone = event.zone
 
-		lifecycler, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+		lifecycler, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
 		require.NoError(t, err)
 
 		assert.Equal(t, 0, lifecycler.ZonesCount())
@@ -145,7 +150,7 @@ func TestLifecycler_NilFlushTransferer(t *testing.T) {
 	lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
 
 	// Create a lifecycler with nil FlushTransferer to make sure it operates correctly
-	lifecycler, err := NewLifecycler(lifecyclerConfig, nil, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+	lifecycler, err := NewLifecycler(lifecyclerConfig, nil, "ingester", ringKey, true, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler))
 
@@ -208,20 +213,20 @@ func TestLifecycler_ShouldHandleInstanceAbruptlyRestarted(t *testing.T) {
 	flagext.DefaultValues(&ringConfig)
 	ringConfig.KVStore.Mock = ringStore
 
-	r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil)
+	r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
 	defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
 
 	// Add an 'ingester' with normalised tokens.
 	lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1")
-	l1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+	l1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))
 
 	// Check this ingester joined, is active, and has one token.
 	test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
-		d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
+		d, err := r.KVClient.Get(context.Background(), ringKey)
 		require.NoError(t, err)
 		return checkNormalised(d, "ing1")
 	})
@@ -234,13 +239,13 @@ func TestLifecycler_ShouldHandleInstanceAbruptlyRestarted(t *testing.T) {
 	time.Sleep(time.Second)
 
 	// Add a second ingester with the same settings, so it will think it has restarted
-	l2, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+	l2, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2))
 
 	// Check the new ingester picked up the same tokens and registered timestamp.
 	test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
-		d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
+		d, err := r.KVClient.Get(context.Background(), ringKey)
 		require.NoError(t, err)
 
 		return checkNormalised(d, "ing1") &&
@@ -310,7 +315,7 @@ func TestCheckReady_NoRingInKVStore(t *testing.T) {
 	flagext.DefaultValues(&ringConfig)
 	ringConfig.KVStore.Mock = &MockClient{}
 
-	r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil)
+	r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, r.StartAsync(ctx))
 	// This is very atypical, but if we used AwaitRunning, that would fail, because of how quickly service terminates ...
@@ -320,7 +325,7 @@ func TestCheckReady_NoRingInKVStore(t *testing.T) {
 	cfg := testLifecyclerConfig(ringConfig, "ring1")
 	cfg.MinReadyDuration = 1 * time.Nanosecond
 
-	l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+	l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(ctx, l1))
 	t.Cleanup(func() {
@@ -364,7 +369,7 @@ func TestCheckReady_MinReadyDuration(t *testing.T) {
 			cfg.ReadinessCheckRingHealth = false
 			cfg.MinReadyDuration = testData.minReadyDuration
 
-			l, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ring", IngesterRingKey, true, log.NewNopLogger(), nil)
+			l, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ring", ringKey, true, log.NewNopLogger(), nil)
 			require.NoError(t, err)
 			require.NoError(t, services.StartAndAwaitRunning(ctx, l))
 			t.Cleanup(func() {
@@ -434,7 +439,7 @@ func TestCheckReady_CheckRingHealth(t *testing.T) {
 			cfg.MinReadyDuration = 0
 			cfg.JoinAfter = testData.firstJoinAfter
 
-			l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ring", IngesterRingKey, true, log.NewNopLogger(), nil)
+			l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ring", ringKey, true, log.NewNopLogger(), nil)
 			require.NoError(t, err)
 			require.NoError(t, services.StartAndAwaitRunning(ctx, l1))
 			t.Cleanup(func() {
@@ -447,7 +452,7 @@ func TestCheckReady_CheckRingHealth(t *testing.T) {
 			cfg.MinReadyDuration = 0
 			cfg.JoinAfter = testData.secondJoinAfter
 
-			l2, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ring", IngesterRingKey, true, log.NewNopLogger(), nil)
+			l2, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ring", ringKey, true, log.NewNopLogger(), nil)
 			require.NoError(t, err)
 			require.NoError(t, services.StartAndAwaitRunning(ctx, l2))
 			t.Cleanup(func() {
@@ -486,7 +491,7 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi
 	flagext.DefaultValues(&ringConfig)
 	ringConfig.KVStore.Mock = ringStore
 
-	r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil)
+	r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
 
@@ -494,7 +499,7 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi
 	poll := func(condition func(*Desc) bool) map[string]InstanceDesc {
 		var ingesters map[string]InstanceDesc
 		test.Poll(t, 5*time.Second, true, func() interface{} {
-			d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
+			d, err := r.KVClient.Get(context.Background(), ringKey)
 			require.NoError(t, err)
 
 			desc, ok := d.(*Desc)
@@ -514,7 +519,7 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi
 	// Disabling heartBeat and unregister_on_shutdown
 	lifecyclerConfig.UnregisterOnShutdown = false
 	lifecyclerConfig.HeartbeatPeriod = 0
-	lifecycler, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "lifecycler", IngesterRingKey, true, log.NewNopLogger(), nil)
+	lifecycler, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "lifecycler", ringKey, true, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler))
 	poll(func(desc *Desc) bool {
@@ -553,7 +558,7 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi
 	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))
 
 	// Simulate ingester2 crash on startup and left the ring with JOINING state
-	err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
+	err = r.KVClient.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
 		desc, ok := in.(*Desc)
 		require.Equal(t, true, ok)
 		ingester2Desc := desc.Ingesters["ing2"]
@@ -567,7 +572,7 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi
 	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))
 
 	// Simulate ingester2 crash on startup and left the ring with PENDING state
-	err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
+	err = r.KVClient.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
 		desc, ok := in.(*Desc)
 		require.Equal(t, true, ok)
 		ingester2Desc := desc.Ingesters["ing2"]
@@ -589,7 +594,7 @@ func TestTokensOnDisk(t *testing.T) {
 	flagext.DefaultValues(&ringConfig)
 	ringConfig.KVStore.Mock = ringStore
 
-	r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil)
+	r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
 	defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
@@ -601,14 +606,14 @@ func TestTokensOnDisk(t *testing.T) {
 	lifecyclerConfig.TokensFilePath = tokenDir + "/tokens"
 
 	// Start first ingester.
-	l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+	l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))
 
 	// Check this ingester joined, is active, and has 512 token.
 	var expTokens []uint32
 	test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
-		d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
+		d, err := r.KVClient.Get(context.Background(), ringKey)
 		require.NoError(t, err)
 
 		desc, ok := d.(*Desc)
@@ -625,7 +630,7 @@ func TestTokensOnDisk(t *testing.T) {
 
 	// Start new ingester at same token directory.
 	lifecyclerConfig.ID = "ing2"
-	l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+	l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2))
 	defer services.StopAndAwaitTerminated(context.Background(), l2) //nolint:errcheck
@@ -633,7 +638,7 @@ func TestTokensOnDisk(t *testing.T) {
 	// Check this ingester joined, is active, and has 512 token.
 	var actTokens []uint32
 	test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
-		d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
+		d, err := r.KVClient.Get(context.Background(), ringKey)
 		require.NoError(t, err)
 		desc, ok := d.(*Desc)
 		if ok {
@@ -662,7 +667,7 @@ func TestJoinInLeavingState(t *testing.T) {
 	flagext.DefaultValues(&ringConfig)
 	ringConfig.KVStore.Mock = ringStore
 
-	r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil)
+	r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
 	defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
@@ -672,7 +677,7 @@ func TestJoinInLeavingState(t *testing.T) {
 	cfg.MinReadyDuration = 1 * time.Nanosecond
 
 	// Set state as LEAVING
-	err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (interface{}, bool, error) {
+	err = r.KVClient.CAS(context.Background(), ringKey, func(in interface{}) (interface{}, bool, error) {
 		r := &Desc{
 			Ingesters: map[string]InstanceDesc{
 				"ing1": {
@@ -689,13 +694,13 @@ func TestJoinInLeavingState(t *testing.T) {
 	})
 	require.NoError(t, err)
 
-	l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+	l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))
 
 	// Check that the lifecycler was able to join after coming up in LEAVING
 	test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
-		d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
+		d, err := r.KVClient.Get(context.Background(), ringKey)
 		require.NoError(t, err)
 
 		desc, ok := d.(*Desc)
@@ -716,7 +721,7 @@ func TestJoinInJoiningState(t *testing.T) {
 	flagext.DefaultValues(&ringConfig)
 	ringConfig.KVStore.Mock = ringStore
 
-	r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil)
+	r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
 	defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
@@ -728,7 +733,7 @@ func TestJoinInJoiningState(t *testing.T) {
 	instance2RegisteredAt := time.Now().Add(-2 * time.Hour)
 
 	// Set state as JOINING
-	err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (interface{}, bool, error) {
+	err = r.KVClient.CAS(context.Background(), ringKey, func(in interface{}) (interface{}, bool, error) {
 		r := &Desc{
 			Ingesters: map[string]InstanceDesc{
 				"ing1": {
@@ -747,13 +752,13 @@ func TestJoinInJoiningState(t *testing.T) {
 	})
 	require.NoError(t, err)
 
-	l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+	l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))
 
 	// Check that the lifecycler was able to join after coming up in JOINING
 	test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
-		d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
+		d, err := r.KVClient.Get(context.Background(), ringKey)
 		require.NoError(t, err)
 
 		desc, ok := d.(*Desc)
@@ -780,7 +785,7 @@ func TestRestoreOfZoneWhenOverwritten(t *testing.T) {
 	flagext.DefaultValues(&ringConfig)
 	ringConfig.KVStore.Mock = ringStore
 
-	r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil)
+	r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
 	defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
@@ -788,7 +793,7 @@ func TestRestoreOfZoneWhenOverwritten(t *testing.T) {
 	cfg := testLifecyclerConfig(ringConfig, "ing1")
 
 	// Set ing1 to not have a zone
-	err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (interface{}, bool, error) {
+	err = r.KVClient.CAS(context.Background(), ringKey, func(in interface{}) (interface{}, bool, error) {
 		r := &Desc{
 			Ingesters: map[string]InstanceDesc{
 				"ing1": {
@@ -806,13 +811,13 @@ func TestRestoreOfZoneWhenOverwritten(t *testing.T) {
 	})
 	require.NoError(t, err)
 
-	l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+	l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))
 
 	// Check that the lifecycler was able to reset the zone value to the expected setting
 	test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
-		d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
+		d, err := r.KVClient.Get(context.Background(), ringKey)
 		require.NoError(t, err)
 		desc, ok := d.(*Desc)
 		return ok &&
diff --git a/ring/ring.go b/ring/ring.go
index 8d6da6be6..6aaf165bf 100644
--- a/ring/ring.go
+++ b/ring/ring.go
@@ -29,18 +29,6 @@ import (
 const (
 	unhealthy = "Unhealthy"
 
-	// IngesterRingKey is the key under which we store the ingesters ring in the KVStore.
-	IngesterRingKey = "ring"
-
-	// RulerRingKey is the key under which we store the rulers ring in the KVStore.
-	RulerRingKey = "ring"
-
-	// DistributorRingKey is the key under which we store the distributors ring in the KVStore.
-	DistributorRingKey = "distributor"
-
-	// CompactorRingKey is the key under which we store the compactors ring in the KVStore.
-	CompactorRingKey = "compactor"
-
 	// GetBufferSize is the suggested size of buffers passed to Ring.Get(). It's based on
 	// a typical replication factor 3, plus extra room for a JOINING + LEAVING instance.
 	GetBufferSize = 5
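
Note: dropping the exported IngesterRingKey / RulerRingKey / DistributorRingKey / CompactorRingKey constants is a breaking change for downstream code that referenced them; each caller now passes its own KV key string to ring.New / NewLifecycler, exactly as the tests above do with the package-local ringKey. A minimal sketch of the downstream adjustment follows — the package, the ingesterRingKey constant, and newIngesterRing are illustrative names, not part of this patch, and the go-kit log import path assumes the logger dskit uses here:

package ingester

import (
	"github.com/go-kit/log"
	"github.com/grafana/dskit/ring"
)

// Key under which this component stores its ring in the KV store.
// "ring" matches the value the removed IngesterRingKey constant had,
// so existing rings in the KV store keep working.
const ingesterRingKey = "ring"

func newIngesterRing(cfg ring.Config, logger log.Logger) (*ring.Ring, error) {
	// Same call shape as in the tests above: component name (used for
	// metrics/logging), then the KV key, which were previously conflated
	// through the shared constants.
	return ring.New(cfg, "ingester", ingesterRingKey, logger, nil)
}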