1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -25,6 +25,7 @@
* [BUGFIX] Query Frontend: Fix panic caused by nil pointer dereference. #6609
* [BUGFIX] Ingester: Add check to avoid query 5xx when closing tsdb. #6616
* [BUGFIX] Querier: Fix panic when marshaling QueryResultRequest. #6601
* [BUGFIX] Ingester: Avoid resharding for queries when restarting read-only ingesters. #6642

## 1.19.0 2025-02-27

13 changes: 9 additions & 4 deletions pkg/ring/lifecycler.go
@@ -655,10 +655,15 @@ func (i *Lifecycler) stopping(runningError error) error {
i.setPreviousState(currentState)
}

// Mark ourselved as Leaving so no more samples are send to us.
err := i.changeState(context.Background(), LEAVING)
if err != nil {
level.Error(i.logger).Log("msg", "failed to set state to LEAVING", "ring", i.RingName, "err", err)
// We don't need to mark ourselves as LEAVING when READONLY, since no requests are sent to us in that state.
// Skipping the change also avoids resharding (for queriers) when a READONLY ingester restarts, because we extend the shard for READONLY but not for LEAVING.
// Queriers also keep calling pods in LEAVING or JOINING, so staying READONLY makes no difference there.
if i.GetState() != READONLY {
// Mark ourselves as LEAVING so no more samples are sent to us.
err := i.changeState(context.Background(), LEAVING)
if err != nil {
level.Error(i.logger).Log("msg", "failed to set state to LEAVING", "ring", i.RingName, "err", err)
}
}

// Do the transferring / flushing on a background goroutine so we can continue
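To illustrate how the guard is meant to be used, here is a minimal sketch of shutting down an ingester that was put into READONLY first, so its ring entry survives the restart unchanged. It assumes the `github.com/cortexproject/cortex/pkg/ring` and `github.com/grafana/dskit/services` import paths; `shutdownReadOnly` is a hypothetical helper, not part of this change:

```go
package example

import (
	"context"

	"github.com/cortexproject/cortex/pkg/ring"
	"github.com/grafana/dskit/services"
)

// shutdownReadOnly (hypothetical) moves the lifecycler to READONLY before stopping it.
// With the state already READONLY, the new check in stopping() skips the transition to
// LEAVING, so the instance stays READONLY in the ring and queriers do not reshard.
func shutdownReadOnly(ctx context.Context, lc *ring.Lifecycler) error {
	if err := lc.ChangeState(ctx, ring.READONLY); err != nil {
		return err
	}
	return services.StopAndAwaitTerminated(ctx, lc)
}
```

Because the test below sets `UnregisterOnShutdown = false`, the READONLY entry (and its tokens) remains in the ring across the restart, which is what keeps the querier shard stable.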
86 changes: 86 additions & 0 deletions pkg/ring/lifecycler_test.go
@@ -743,6 +743,92 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testing.T) {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))
}

func TestRestartIngester_READONLY(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))

// poll waits for the given condition and returns the state of the ingesters once the condition succeeds.
poll := func(condition func(*Desc) bool) map[string]InstanceDesc {
var ingesters map[string]InstanceDesc
test.Poll(t, 5*time.Second, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)

if ok {
ingesters = desc.Ingesters
}
return ok && condition(desc)
})

return ingesters
}

// Starts an ingester and waits until it reaches the expected state in the ring.
startIngesterAndWaitState := func(ingId string, addr string, expectedState InstanceState) *Lifecycler {
lifecyclerConfig := testLifecyclerConfigWithAddr(ringConfig, ingId, addr)
lifecyclerConfig.UnregisterOnShutdown = false
lifecycler, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "lifecycler", ringKey, true, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler))
poll(func(desc *Desc) bool {
return desc.Ingesters[ingId].State == expectedState
})
return lifecycler
}

l1 := startIngesterAndWaitState("ing1", "0.0.0.0", ACTIVE)
l2 := startIngesterAndWaitState("ing2", "0.0.0.0", ACTIVE)

err = l2.ChangeState(context.Background(), READONLY)
require.NoError(t, err)
poll(func(desc *Desc) bool {
return desc.Ingesters["ing2"].State == READONLY
})

ingesters := poll(func(desc *Desc) bool {
return len(desc.Ingesters) == 2 && desc.Ingesters["ing1"].State == ACTIVE && desc.Ingesters["ing2"].State == READONLY
})

// Both ingesters should be registered: ing1 ACTIVE and ing2 READONLY.
assert.Equal(t, ACTIVE, ingesters["ing1"].State)
assert.Equal(t, READONLY, ingesters["ing2"].State)

// Stopping ingester 1 gracefully should leave it in the LEAVING state in the ring.
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l1))
// Stopping ingester 2 gracefully should keep it in READONLY.
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))

ingesters = poll(func(desc *Desc) bool {
return len(desc.Ingesters) == 2 && desc.Ingesters["ing1"].State == LEAVING
})

assert.Equal(t, LEAVING, ingesters["ing1"].State)
assert.Equal(t, READONLY, ingesters["ing2"].State)

// Start ingester 1 again - it should flip back to ACTIVE in the ring.
defer services.StopAndAwaitTerminated(context.Background(), startIngesterAndWaitState("ing1", "0.0.0.0", ACTIVE)) //nolint:errcheck

// Start ingester 2 again - it should stay in READONLY.
defer services.StopAndAwaitTerminated(context.Background(), startIngesterAndWaitState("ing2", "0.0.0.0", READONLY)) //nolint:errcheck

ingesters = poll(func(desc *Desc) bool {
return len(desc.Ingesters) == 2 && desc.Ingesters["ing1"].State == ACTIVE
})

assert.Equal(t, ACTIVE, ingesters["ing1"].State)
assert.Equal(t, READONLY, ingesters["ing2"].State)
}

func TestTokenFileOnDisk(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
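The new test can be run in isolation with the standard Go tooling, e.g. `go test ./pkg/ring -run TestRestartIngester_READONLY`.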