Skip to content

Commit ab2d2cd

Browse files
authored
Automatically forget unhealthy Rulers (#2587)
* add ability to autoforget ruler instances Signed-off-by: Jacob Lisi <[email protected]> * update unit tests to handle autoforgetting ruler ring Signed-off-by: Jacob Lisi <[email protected]> * update changelog Signed-off-by: Jacob Lisi <[email protected]> * update per PR comments Signed-off-by: Jacob Lisi <[email protected]>
1 parent 43926c4 commit ab2d2cd

File tree

6 files changed

+162
-80
lines changed

6 files changed

+162
-80
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
* [FEATURE] Experimental: Added support for `/api/v1/metadata` Prometheus-based endpoint. #2549
3535
* [FEATURE] Add ability to limit concurrent queries to Cassandra with `-cassandra.query-concurrency` flag. #2562
3636
* [FEATURE] TLS config options added for GRPC clients in Querier (Query-frontend client & Ingester client), Ruler, Store Gateway, as well as HTTP client in Config store client. #2502
37+
* [ENHANCEMENT] Ruler: Automatically remove unhealthy rulers from the ring. #2587
3738
* [ENHANCEMENT] Experimental TSDB: sample ingestion errors are now reported via existing `cortex_discarded_samples_total` metric. #2370
3839
* [ENHANCEMENT] Failures on samples at distributors and ingesters return the first validation error as opposed to the last. #2383
3940
* [ENHANCEMENT] Experimental TSDB: Added `cortex_querier_blocks_meta_synced`, which reflects current state of synced blocks over all tenants. #2392

pkg/ruler/lifecycle.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,28 @@
11
package ruler
22

33
import (
4-
"context"
4+
"github.com/cortexproject/cortex/pkg/ring"
55
)
66

7-
// TransferOut is a noop for the ruler
8-
func (r *Ruler) TransferOut(ctx context.Context) error {
9-
return nil
7+
func (r *Ruler) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.IngesterDesc) (ring.IngesterState, ring.Tokens) {
8+
// When we initialize the ruler instance in the ring we want to start from
9+
// a clean situation, so whatever is the state we set it ACTIVE, while we keep existing
10+
// tokens (if any).
11+
var tokens []uint32
12+
if instanceExists {
13+
tokens = instanceDesc.GetTokens()
14+
}
15+
16+
_, takenTokens := ringDesc.TokensFor(instanceID)
17+
newTokens := ring.GenerateTokens(r.cfg.Ring.NumTokens-len(tokens), takenTokens)
18+
19+
// Tokens sorting will be enforced by the parent caller.
20+
tokens = append(tokens, newTokens...)
21+
22+
return ring.ACTIVE, tokens
1023
}
1124

12-
// Flush triggers a flush of all the work items currently
13-
// scheduled by the ruler, currently every ruler will
14-
// query a backend rule store for it's rules so no
15-
// flush is required.
16-
func (r *Ruler) Flush() {}
25+
func (r *Ruler) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {}
26+
func (r *Ruler) OnRingInstanceStopping(_ *ring.BasicLifecycler) {}
27+
func (r *Ruler) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.IngesterDesc) {
28+
}

pkg/ruler/lifecycle_test.go

Lines changed: 61 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,73 +2,104 @@ package ruler
22

33
import (
44
"context"
5+
"sort"
56
"testing"
67
"time"
78

8-
"github.com/stretchr/testify/assert"
99
"github.com/stretchr/testify/require"
1010

1111
"github.com/cortexproject/cortex/pkg/ring"
12+
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
1213
"github.com/cortexproject/cortex/pkg/ring/testutils"
1314
"github.com/cortexproject/cortex/pkg/util/services"
1415
"github.com/cortexproject/cortex/pkg/util/test"
1516
)
1617

1718
// TestRulerShutdown tests shutting down ruler unregisters correctly
1819
func TestRulerShutdown(t *testing.T) {
20+
ctx := context.Background()
21+
1922
config, cleanup := defaultRulerConfig(newMockRuleStore(mockRules))
20-
config.EnableSharding = true
21-
config.Ring.SkipUnregister = false
2223
defer cleanup()
2324

24-
r, rcleanup := newTestRuler(t, config)
25+
r, rcleanup := newRuler(t, config)
2526
defer rcleanup()
2627

28+
r.cfg.EnableSharding = true
29+
ringStore := consul.NewInMemoryClient(ring.GetCodec())
30+
31+
err := enableSharding(r, ringStore)
32+
require.NoError(t, err)
33+
34+
require.NoError(t, services.StartAndAwaitRunning(ctx, r))
35+
defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck
36+
2737
// Wait until the tokens are registered in the ring
2838
test.Poll(t, 100*time.Millisecond, config.Ring.NumTokens, func() interface{} {
29-
return testutils.NumTokens(config.Ring.KVStore.Mock, "localhost", ring.RulerRingKey)
39+
return testutils.NumTokens(ringStore, "localhost", ring.RulerRingKey)
3040
})
3141

42+
require.Equal(t, ring.ACTIVE, r.lifecycler.GetState())
43+
3244
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), r))
3345

3446
// Wait until the tokens are unregistered from the ring
3547
test.Poll(t, 100*time.Millisecond, 0, func() interface{} {
36-
return testutils.NumTokens(config.Ring.KVStore.Mock, "localhost", ring.RulerRingKey)
48+
return testutils.NumTokens(ringStore, "localhost", ring.RulerRingKey)
3749
})
3850
}
3951

40-
// TestRulerRestart tests a restarting ruler doesn't keep adding more tokens.
41-
func TestRulerRestart(t *testing.T) {
52+
func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) {
53+
const unhealthyInstanceID = "unhealthy-id"
54+
const heartbeatTimeout = time.Minute
55+
56+
ctx := context.Background()
4257
config, cleanup := defaultRulerConfig(newMockRuleStore(mockRules))
43-
config.Ring.SkipUnregister = true
44-
config.EnableSharding = true
4558
defer cleanup()
46-
47-
r, rcleanup := newTestRuler(t, config)
59+
r, rcleanup := newRuler(t, config)
4860
defer rcleanup()
61+
r.cfg.EnableSharding = true
62+
r.cfg.Ring.HeartbeatPeriod = 100 * time.Millisecond
63+
r.cfg.Ring.HeartbeatTimeout = heartbeatTimeout
4964

50-
// Wait until the tokens are registered in the ring
51-
test.Poll(t, 100*time.Millisecond, config.Ring.NumTokens, func() interface{} {
52-
return testutils.NumTokens(config.Ring.KVStore.Mock, "localhost", ring.RulerRingKey)
53-
})
65+
ringStore := consul.NewInMemoryClient(ring.GetCodec())
5466

55-
// Stop the ruler. Doesn't actually unregister due to skipUnregister: true
56-
r.StopAsync()
57-
require.NoError(t, r.AwaitTerminated(context.Background()))
67+
err := enableSharding(r, ringStore)
68+
require.NoError(t, err)
5869

59-
// We expect the tokens are preserved in the ring.
60-
assert.Equal(t, config.Ring.NumTokens, testutils.NumTokens(config.Ring.KVStore.Mock, "localhost", ring.RulerRingKey))
70+
require.NoError(t, services.StartAndAwaitRunning(ctx, r))
71+
defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck
6172

62-
// Create a new ruler which is expected to pick up tokens from the ring.
63-
r, rcleanup = newTestRuler(t, config)
64-
defer rcleanup()
65-
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
73+
// Add an unhealthy instance to the ring.
74+
require.NoError(t, ringStore.CAS(ctx, ring.RulerRingKey, func(in interface{}) (interface{}, bool, error) {
75+
ringDesc := ring.GetOrCreateRingDesc(in)
76+
77+
instance := ringDesc.AddIngester(unhealthyInstanceID, "1.1.1.1", "", generateSortedTokens(config.Ring.NumTokens), ring.ACTIVE)
78+
instance.Timestamp = time.Now().Add(-(ringAutoForgetUnhealthyPeriods + 1) * heartbeatTimeout).Unix()
79+
ringDesc.Ingesters[unhealthyInstanceID] = instance
80+
81+
return ringDesc, true, nil
82+
}))
83+
84+
// Ensure the unhealthy instance is removed from the ring.
85+
test.Poll(t, time.Second*5, false, func() interface{} {
86+
d, err := ringStore.Get(ctx, ring.RulerRingKey)
87+
if err != nil {
88+
return err
89+
}
90+
91+
_, ok := ring.GetOrCreateRingDesc(d).Ingesters[unhealthyInstanceID]
92+
return ok
93+
})
94+
}
95+
96+
func generateSortedTokens(numTokens int) ring.Tokens {
97+
tokens := ring.GenerateTokens(numTokens, nil)
6698

67-
// Wait until the ruler is ACTIVE in the ring.
68-
test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
69-
return r.lifecycler.GetState()
99+
// Ensure generated tokens are sorted.
100+
sort.Slice(tokens, func(i, j int) bool {
101+
return tokens[i] < tokens[j]
70102
})
71103

72-
// We expect no new tokens have been added to the ring.
73-
assert.Equal(t, config.Ring.NumTokens, testutils.NumTokens(config.Ring.KVStore.Mock, "localhost", ring.RulerRingKey))
104+
return ring.Tokens(tokens)
74105
}

pkg/ruler/ruler.go

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030

3131
"github.com/cortexproject/cortex/pkg/ingester/client"
3232
"github.com/cortexproject/cortex/pkg/ring"
33+
"github.com/cortexproject/cortex/pkg/ring/kv"
3334
"github.com/cortexproject/cortex/pkg/ruler/rules"
3435
store "github.com/cortexproject/cortex/pkg/ruler/rules"
3536
"github.com/cortexproject/cortex/pkg/util"
@@ -147,7 +148,7 @@ type Ruler struct {
147148
alertURL *url.URL
148149
notifierCfg *config.Config
149150

150-
lifecycler *ring.Lifecycler
151+
lifecycler *ring.BasicLifecycler
151152
ring *ring.Ring
152153
subservices *services.Manager
153154

@@ -191,26 +192,50 @@ func NewRuler(cfg Config, engine *promql.Engine, queryable promStorage.Queryable
191192
logger: logger,
192193
}
193194

195+
if cfg.EnableSharding {
196+
ringStore, err := kv.NewClient(cfg.Ring.KVStore, ring.GetCodec())
197+
if err != nil {
198+
return nil, errors.Wrap(err, "create KV store client")
199+
}
200+
201+
if err = enableSharding(ruler, ringStore); err != nil {
202+
return nil, errors.Wrap(err, "setup ruler sharding ring")
203+
}
204+
}
205+
194206
ruler.Service = services.NewBasicService(ruler.starting, ruler.run, ruler.stopping)
195207
return ruler, nil
196208
}
197209

210+
func enableSharding(r *Ruler, ringStore kv.Client) error {
211+
lifecyclerCfg, err := r.cfg.Ring.ToLifecyclerConfig()
212+
if err != nil {
213+
return errors.Wrap(err, "failed to initialize ruler's lifecycler config")
214+
}
215+
216+
// Define lifecycler delegates in reverse order (last to be called defined first because they're
217+
// chained via "next delegate").
218+
delegate := ring.BasicLifecyclerDelegate(r)
219+
delegate = ring.NewLeaveOnStoppingDelegate(delegate, r.logger)
220+
delegate = ring.NewAutoForgetDelegate(r.cfg.Ring.HeartbeatTimeout*ringAutoForgetUnhealthyPeriods, delegate, r.logger)
221+
222+
r.lifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ring.RulerRingKey, ring.RulerRingKey, ringStore, delegate, r.logger, r.registry)
223+
if err != nil {
224+
return errors.Wrap(err, "failed to initialize ruler's lifecycler")
225+
}
226+
227+
r.ring, err = ring.NewWithStoreClientAndStrategy(r.cfg.Ring.ToRingConfig(), ring.RulerRingKey, ring.RulerRingKey, ringStore, &ring.DefaultReplicationStrategy{})
228+
if err != nil {
229+
return errors.Wrap(err, "failed to initialize ruler's ring")
230+
}
231+
232+
return nil
233+
}
234+
198235
func (r *Ruler) starting(ctx context.Context) error {
199-
// If sharding is enabled, create/join a ring to distribute tokens to
200-
// the ruler
236+
// If sharding is enabled, start the ruler ring subservices
201237
if r.cfg.EnableSharding {
202-
lifecyclerCfg := r.cfg.Ring.ToLifecyclerConfig()
203238
var err error
204-
r.lifecycler, err = ring.NewLifecycler(lifecyclerCfg, r, "ruler", ring.RulerRingKey, true)
205-
if err != nil {
206-
return errors.Wrap(err, "failed to initialize ruler's lifecycler")
207-
}
208-
209-
r.ring, err = ring.New(lifecyclerCfg.RingConfig, "ruler", ring.RulerRingKey)
210-
if err != nil {
211-
return errors.Wrap(err, "failed to initialize ruler's ring")
212-
}
213-
214239
r.subservices, err = services.NewManager(r.lifecycler, r.ring)
215240
if err == nil {
216241
err = services.StartManagerAndAwaitHealthy(ctx, r.subservices)
@@ -331,11 +356,14 @@ func (r *Ruler) ownsRule(hash uint32) (bool, error) {
331356
ringCheckErrors.Inc()
332357
return false, err
333358
}
334-
if rlrs.Ingesters[0].Addr == r.lifecycler.Addr {
335-
level.Debug(r.logger).Log("msg", "rule group owned", "owner_addr", rlrs.Ingesters[0].Addr, "addr", r.lifecycler.Addr)
359+
360+
localAddr := r.lifecycler.GetInstanceAddr()
361+
362+
if rlrs.Ingesters[0].Addr == localAddr {
363+
level.Debug(r.logger).Log("msg", "rule group owned", "owner_addr", rlrs.Ingesters[0].Addr, "addr", localAddr)
336364
return true, nil
337365
}
338-
level.Debug(r.logger).Log("msg", "rule group not owned, address does not match", "owner_addr", rlrs.Ingesters[0].Addr, "addr", r.lifecycler.Addr)
366+
level.Debug(r.logger).Log("msg", "rule group not owned, address does not match", "owner_addr", rlrs.Ingesters[0].Addr, "addr", localAddr)
339367
return false, nil
340368
}
341369

pkg/ruler/ruler_ring.go

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ruler
22

33
import (
44
"flag"
5+
"fmt"
56
"os"
67
"time"
78

@@ -13,6 +14,13 @@ import (
1314
"github.com/cortexproject/cortex/pkg/util/flagext"
1415
)
1516

17+
const (
18+
// If a ruler is unable to heartbeat the ring, its better to quickly remove it and resume
19+
// the evaluation of all rules since the worst case scenario is that some rulers will
20+
// receive duplicate/out-of-order sample errors.
21+
ringAutoForgetUnhealthyPeriods = 2
22+
)
23+
1624
// RingConfig masks the ring lifecycler config which contains
1725
// many options not really required by the rulers ring. This config
1826
// is used to strip down the config to the minimum, and avoid confusion
@@ -60,34 +68,30 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
6068

6169
// ToLifecyclerConfig returns a LifecyclerConfig based on the ruler
6270
// ring config.
63-
func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
64-
// We have to make sure that the ring.LifecyclerConfig and ring.Config
65-
// defaults are preserved
66-
lc := ring.LifecyclerConfig{}
67-
rc := ring.Config{}
71+
func (cfg *RingConfig) ToLifecyclerConfig() (ring.BasicLifecyclerConfig, error) {
72+
instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames)
73+
if err != nil {
74+
return ring.BasicLifecyclerConfig{}, err
75+
}
6876

69-
flagext.DefaultValues(&lc)
77+
instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort)
78+
79+
return ring.BasicLifecyclerConfig{
80+
ID: cfg.InstanceID,
81+
Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort),
82+
HeartbeatPeriod: cfg.HeartbeatPeriod,
83+
TokensObservePeriod: 0,
84+
NumTokens: cfg.NumTokens,
85+
}, nil
86+
}
87+
88+
func (cfg *RingConfig) ToRingConfig() ring.Config {
89+
rc := ring.Config{}
7090
flagext.DefaultValues(&rc)
7191

72-
// Configure ring
7392
rc.KVStore = cfg.KVStore
7493
rc.HeartbeatTimeout = cfg.HeartbeatTimeout
7594
rc.ReplicationFactor = 1
7695

77-
// Configure lifecycler
78-
lc.RingConfig = rc
79-
lc.ListenPort = cfg.ListenPort
80-
lc.Addr = cfg.InstanceAddr
81-
lc.Port = cfg.InstancePort
82-
lc.ID = cfg.InstanceID
83-
lc.InfNames = cfg.InstanceInterfaceNames
84-
lc.SkipUnregister = cfg.SkipUnregister
85-
lc.HeartbeatPeriod = cfg.HeartbeatPeriod
86-
lc.NumTokens = cfg.NumTokens
87-
lc.ObservePeriod = 0
88-
lc.JoinAfter = 0
89-
lc.MinReadyDuration = 0
90-
lc.FinalSleep = 0
91-
92-
return lc
96+
return rc
9397
}

pkg/ruler/ruler_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func defaultRulerConfig(store rules.RuleStore) (Config, func()) {
5555
return cfg, cleanup
5656
}
5757

58-
func newTestRuler(t *testing.T, cfg Config) (*Ruler, func()) {
58+
func newRuler(t *testing.T, cfg Config) (*Ruler, func()) {
5959
dir, err := ioutil.TempDir("", t.Name())
6060
testutil.Ok(t, err)
6161
cleanup := func() {
@@ -82,6 +82,12 @@ func newTestRuler(t *testing.T, cfg Config) (*Ruler, func()) {
8282
l = level.NewFilter(l, level.AllowInfo())
8383
ruler, err := NewRuler(cfg, engine, noopQueryable, pusher, prometheus.NewRegistry(), l)
8484
require.NoError(t, err)
85+
86+
return ruler, cleanup
87+
}
88+
89+
func newTestRuler(t *testing.T, cfg Config) (*Ruler, func()) {
90+
ruler, cleanup := newRuler(t, cfg)
8591
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler))
8692

8793
// Ensure all rules are loaded before usage

0 commit comments

Comments
 (0)