ring/lifecycler_test.go (77 changes: 41 additions & 36 deletions)
@@ -18,6 +18,11 @@ import (
"github.com/grafana/dskit/test"
)

+ const (
+ // ringKey is the ring key used by the tests in this file.
+ ringKey = "ring"
+ )

func testLifecyclerConfig(ringConfig Config, id string) LifecyclerConfig {
var lifecyclerConfig LifecyclerConfig
flagext.DefaultValues(&lifecyclerConfig)
@@ -57,7 +62,7 @@ func TestLifecycler_HealthyInstancesCount(t *testing.T) {
lifecyclerConfig1.HeartbeatPeriod = 100 * time.Millisecond
lifecyclerConfig1.JoinAfter = 100 * time.Millisecond

- lifecycler1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+ lifecycler1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
assert.Equal(t, 0, lifecycler1.HealthyInstancesCount())

@@ -74,7 +79,7 @@ func TestLifecycler_HealthyInstancesCount(t *testing.T) {
lifecyclerConfig2.HeartbeatPeriod = 100 * time.Millisecond
lifecyclerConfig2.JoinAfter = 100 * time.Millisecond

- lifecycler2, err := NewLifecycler(lifecyclerConfig2, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+ lifecycler2, err := NewLifecycler(lifecyclerConfig2, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
assert.Equal(t, 0, lifecycler2.HealthyInstancesCount())

@@ -119,7 +124,7 @@ func TestLifecycler_ZonesCount(t *testing.T) {
cfg.JoinAfter = 100 * time.Millisecond
cfg.Zone = event.zone

- lifecycler, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+ lifecycler, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
assert.Equal(t, 0, lifecycler.ZonesCount())

@@ -145,7 +150,7 @@ func TestLifecycler_NilFlushTransferer(t *testing.T) {
lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")

// Create a lifecycler with nil FlushTransferer to make sure it operates correctly
- lifecycler, err := NewLifecycler(lifecyclerConfig, nil, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+ lifecycler, err := NewLifecycler(lifecyclerConfig, nil, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler))

@@ -208,20 +213,20 @@ func TestLifecycler_ShouldHandleInstanceAbruptlyRestarted(t *testing.T) {
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

- r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil)
+ r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck

// Add an 'ingester' with normalised tokens.
lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1")
- l1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+ l1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))

// Check this ingester joined, is active, and has one token.
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
- d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
+ d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)
return checkNormalised(d, "ing1")
})
@@ -234,13 +239,13 @@ func TestLifecycler_ShouldHandleInstanceAbruptlyRestarted(t *testing.T) {
time.Sleep(time.Second)

// Add a second ingester with the same settings, so it will think it has restarted
- l2, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+ l2, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2))

// Check the new ingester picked up the same tokens and registered timestamp.
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
- d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
+ d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)

return checkNormalised(d, "ing1") &&
@@ -310,7 +315,7 @@ func TestCheckReady_NoRingInKVStore(t *testing.T) {
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = &MockClient{}

- r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil)
+ r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, r.StartAsync(ctx))
// This is very atypical, but if we used AwaitRunning, that would fail because of how quickly the service terminates ...
@@ -320,7 +325,7 @@

cfg := testLifecyclerConfig(ringConfig, "ring1")
cfg.MinReadyDuration = 1 * time.Nanosecond
- l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+ l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, l1))
t.Cleanup(func() {
@@ -364,7 +369,7 @@ func TestCheckReady_MinReadyDuration(t *testing.T) {
cfg.ReadinessCheckRingHealth = false
cfg.MinReadyDuration = testData.minReadyDuration

- l, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ring", IngesterRingKey, true, log.NewNopLogger(), nil)
+ l, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ring", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, l))
t.Cleanup(func() {
@@ -434,7 +439,7 @@ func TestCheckReady_CheckRingHealth(t *testing.T) {
cfg.MinReadyDuration = 0
cfg.JoinAfter = testData.firstJoinAfter

- l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ring", IngesterRingKey, true, log.NewNopLogger(), nil)
+ l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ring", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, l1))
t.Cleanup(func() {
@@ -447,7 +452,7 @@ func TestCheckReady_CheckRingHealth(t *testing.T) {
cfg.MinReadyDuration = 0
cfg.JoinAfter = testData.secondJoinAfter

- l2, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ring", IngesterRingKey, true, log.NewNopLogger(), nil)
+ l2, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ring", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, l2))
t.Cleanup(func() {
@@ -486,15 +491,15 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testing.T) {
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

- r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil)
+ r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))

// poll waits for a condition and returns the actual state of the ingesters once the condition succeeds.
poll := func(condition func(*Desc) bool) map[string]InstanceDesc {
var ingesters map[string]InstanceDesc
test.Poll(t, 5*time.Second, true, func() interface{} {
- d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
+ d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
@@ -514,7 +519,7 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testing.T) {
// Disabling heartBeat and unregister_on_shutdown
lifecyclerConfig.UnregisterOnShutdown = false
lifecyclerConfig.HeartbeatPeriod = 0
- lifecycler, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "lifecycler", IngesterRingKey, true, log.NewNopLogger(), nil)
+ lifecycler, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "lifecycler", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler))
poll(func(desc *Desc) bool {
@@ -553,7 +558,7 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testing.T) {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))

// Simulate an ingester2 crash on startup that leaves the ring in JOINING state
- err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
+ err = r.KVClient.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
desc, ok := in.(*Desc)
require.Equal(t, true, ok)
ingester2Desc := desc.Ingesters["ing2"]
@@ -567,7 +572,7 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testing.T) {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))

// Simulate an ingester2 crash on startup that leaves the ring in PENDING state
- err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
+ err = r.KVClient.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
desc, ok := in.(*Desc)
require.Equal(t, true, ok)
ingester2Desc := desc.Ingesters["ing2"]
Expand All @@ -589,7 +594,7 @@ func TestTokensOnDisk(t *testing.T) {
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

- r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil)
+ r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
@@ -601,14 +606,14 @@
lifecyclerConfig.TokensFilePath = tokenDir + "/tokens"

// Start first ingester.
- l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+ l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))

// Check this ingester joined, is active, and has 512 tokens.
var expTokens []uint32
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
- d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
+ d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
@@ -625,15 +630,15 @@

// Start a new ingester using the same token directory.
lifecyclerConfig.ID = "ing2"
- l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+ l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2))
defer services.StopAndAwaitTerminated(context.Background(), l2) //nolint:errcheck

// Check this ingester joined, is active, and has 512 tokens.
var actTokens []uint32
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
- d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
+ d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)
desc, ok := d.(*Desc)
if ok {
@@ -662,7 +667,7 @@ func TestJoinInLeavingState(t *testing.T) {
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

- r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil)
+ r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
@@ -672,7 +677,7 @@ func TestJoinInLeavingState(t *testing.T) {
cfg.MinReadyDuration = 1 * time.Nanosecond

// Set state as LEAVING
- err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (interface{}, bool, error) {
+ err = r.KVClient.CAS(context.Background(), ringKey, func(in interface{}) (interface{}, bool, error) {
r := &Desc{
Ingesters: map[string]InstanceDesc{
"ing1": {
@@ -689,13 +694,13 @@ func TestJoinInLeavingState(t *testing.T) {
})
require.NoError(t, err)

- l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+ l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))

// Check that the lifecycler was able to join after coming up in LEAVING
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
- d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
+ d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
@@ -716,7 +721,7 @@ func TestJoinInJoiningState(t *testing.T) {
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

- r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil)
+ r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
@@ -728,7 +733,7 @@ func TestJoinInJoiningState(t *testing.T) {
instance2RegisteredAt := time.Now().Add(-2 * time.Hour)

// Set state as JOINING
- err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (interface{}, bool, error) {
+ err = r.KVClient.CAS(context.Background(), ringKey, func(in interface{}) (interface{}, bool, error) {
r := &Desc{
Ingesters: map[string]InstanceDesc{
"ing1": {
@@ -747,13 +752,13 @@ func TestJoinInJoiningState(t *testing.T) {
})
require.NoError(t, err)

- l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+ l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))

// Check that the lifecycler was able to join after coming up in JOINING
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
- d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
+ d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
@@ -780,15 +785,15 @@ func TestRestoreOfZoneWhenOverwritten(t *testing.T) {
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

- r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil)
+ r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck

cfg := testLifecyclerConfig(ringConfig, "ing1")

// Set ing1 to not have a zone
- err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (interface{}, bool, error) {
+ err = r.KVClient.CAS(context.Background(), ringKey, func(in interface{}) (interface{}, bool, error) {
r := &Desc{
Ingesters: map[string]InstanceDesc{
"ing1": {
@@ -806,13 +811,13 @@ func TestRestoreOfZoneWhenOverwritten(t *testing.T) {
})
require.NoError(t, err)

- l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
+ l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))

// Check that the lifecycler was able to reset the zone value to the expected setting
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
- d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
+ d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)
desc, ok := d.(*Desc)
return ok &&
ring/ring.go (12 changes: 0 additions & 12 deletions)
@@ -29,18 +29,6 @@ import (
const (
unhealthy = "Unhealthy"

- // IngesterRingKey is the key under which we store the ingesters ring in the KVStore.
- IngesterRingKey = "ring"
-
- // RulerRingKey is the key under which we store the rulers ring in the KVStore.
- RulerRingKey = "ring"
-
- // DistributorRingKey is the key under which we store the distributors ring in the KVStore.
- DistributorRingKey = "distributor"
-
- // CompactorRingKey is the key under which we store the compactors ring in the KVStore.
- CompactorRingKey = "compactor"
-
// GetBufferSize is the suggested size of buffers passed to Ring.Get(). It's based on
// a typical replication factor 3, plus extra room for a JOINING + LEAVING instance.
GetBufferSize = 5
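Because this change deletes the exported IngesterRingKey, RulerRingKey, DistributorRingKey, and CompactorRingKey constants from ring/ring.go, code outside the ring package can no longer import them and must declare its own key, just as the tests above now declare a package-local ringKey. Below is a minimal sketch of what that looks like for a downstream caller, assuming the New and NewLifecycler signatures used in the diff; the appRingKey constant and the newRingAndLifecycler helper are illustrative names, not part of dskit:

```go
package app

import (
	"github.com/go-kit/log"

	"github.com/grafana/dskit/ring"
)

// appRingKey replaces the former ring.IngesterRingKey ("ring"): each
// application now declares the KVStore key for its own ring.
// (Illustrative name, not a dskit export.)
const appRingKey = "ring"

// newRingAndLifecycler wires up a ring client and a lifecycler that share
// the same KVStore key, mirroring the pattern in the tests above.
func newRingAndLifecycler(ringCfg ring.Config, lcCfg ring.LifecyclerConfig) (*ring.Ring, *ring.Lifecycler, error) {
	r, err := ring.New(ringCfg, "ingester", appRingKey, log.NewNopLogger(), nil)
	if err != nil {
		return nil, nil, err
	}

	// A nil FlushTransferer is accepted, as exercised by
	// TestLifecycler_NilFlushTransferer in the diff.
	lc, err := ring.NewLifecycler(lcCfg, nil, "ingester", appRingKey, true, log.NewNopLogger(), nil)
	if err != nil {
		return nil, nil, err
	}
	return r, lc, nil
}
```

Note that IngesterRingKey and RulerRingKey both had the value "ring", so components that previously shared a key by coincidence keep the same KVStore layout as long as they declare the same string.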