diff --git a/pkg/ring/kv/consul/client.go b/pkg/ring/kv/consul/client.go index 84e792651fa..9b5eeaeef1f 100644 --- a/pkg/ring/kv/consul/client.go +++ b/pkg/ring/kv/consul/client.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "math/rand" "net/http" "time" @@ -41,6 +42,10 @@ type Config struct { ConsistentReads bool `yaml:"consistent_reads"` WatchKeyRateLimit float64 `yaml:"watch_rate_limit"` // Zero disables rate limit WatchKeyBurstSize int `yaml:"watch_burst_size"` // Burst when doing rate-limit, defaults to 1 + + // Used in tests only. + MaxCasRetries int `yaml:"-"` + CasRetryDelay time.Duration `yaml:"-"` } type kv interface { @@ -117,11 +122,22 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou } func (c *Client) cas(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { - var ( - index = uint64(0) + retries := c.cfg.MaxCasRetries + if retries == 0 { retries = 10 - ) + } + + sleepBeforeRetry := time.Duration(0) + if c.cfg.CasRetryDelay > 0 { + sleepBeforeRetry = time.Duration(rand.Int63n(c.cfg.CasRetryDelay.Nanoseconds())) + } + + index := uint64(0) for i := 0; i < retries; i++ { + if i > 0 && sleepBeforeRetry > 0 { + time.Sleep(sleepBeforeRetry) + } + // Get with default options - don't want stale data to compare with options := &consul.QueryOptions{} kvp, _, err := c.kv.Get(key, options.WithContext(ctx)) diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index 8fc0e4923f8..e5eafc79af1 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -723,7 +723,7 @@ func TestRingUpdates(t *testing.T) { require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ring)) t.Cleanup(func() { - _ = services.StartAndAwaitRunning(context.Background(), ring) + _ = services.StopAndAwaitTerminated(context.Background(), ring) }) require.Equal(t, 0, ring.IngesterCount()) @@ -794,7 +794,7 @@ func startLifecycler(t *testing.T, cfg Config, heartbeat time.Duration, lifecycl require.NoError(t, services.StartAndAwaitRunning(context.Background(), lc)) t.Cleanup(func() { - _ = services.StartAndAwaitRunning(context.Background(), lc) + _ = services.StopAndAwaitTerminated(context.Background(), lc) }) return lc @@ -803,7 +803,10 @@ func startLifecycler(t *testing.T, cfg Config, heartbeat time.Duration, lifecycl // This test checks if shuffle-sharded ring can be reused, and whether it receives // updates from "main" ring. func TestShuffleShardWithCaching(t *testing.T) { - inmem := consul.NewInMemoryClient(GetCodec()) + inmem := consul.NewInMemoryClientWithConfig(GetCodec(), consul.Config{ + MaxCasRetries: 20, + CasRetryDelay: 500 * time.Millisecond, + }) cfg := Config{ KVStore: kv.Config{Mock: inmem},