Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions pkg/ring/kv/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"math/rand"
"net/http"
"time"

Expand Down Expand Up @@ -41,6 +42,10 @@ type Config struct {
ConsistentReads bool `yaml:"consistent_reads"`
WatchKeyRateLimit float64 `yaml:"watch_rate_limit"` // Zero disables rate limit
WatchKeyBurstSize int `yaml:"watch_burst_size"` // Burst when doing rate-limit, defaults to 1

// Used in tests only.
MaxCasRetries int `yaml:"-"`
CasRetryDelay time.Duration `yaml:"-"`
}

type kv interface {
Expand Down Expand Up @@ -117,11 +122,22 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
}

func (c *Client) cas(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
var (
index = uint64(0)
retries := c.cfg.MaxCasRetries
if retries == 0 {
retries = 10
)
}

sleepBeforeRetry := time.Duration(0)
if c.cfg.CasRetryDelay > 0 {
sleepBeforeRetry = time.Duration(rand.Int63n(c.cfg.CasRetryDelay.Nanoseconds()))
}

index := uint64(0)
for i := 0; i < retries; i++ {
if i > 0 && sleepBeforeRetry > 0 {
time.Sleep(sleepBeforeRetry)
}

// Get with default options - don't want stale data to compare with
options := &consul.QueryOptions{}
kvp, _, err := c.kv.Get(key, options.WithContext(ctx))
Expand Down
9 changes: 6 additions & 3 deletions pkg/ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ func TestRingUpdates(t *testing.T) {
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ring))
t.Cleanup(func() {
_ = services.StartAndAwaitRunning(context.Background(), ring)
_ = services.StopAndAwaitTerminated(context.Background(), ring)
})

require.Equal(t, 0, ring.IngesterCount())
Expand Down Expand Up @@ -794,7 +794,7 @@ func startLifecycler(t *testing.T, cfg Config, heartbeat time.Duration, lifecycl
require.NoError(t, services.StartAndAwaitRunning(context.Background(), lc))

t.Cleanup(func() {
_ = services.StartAndAwaitRunning(context.Background(), lc)
_ = services.StopAndAwaitTerminated(context.Background(), lc)
})

return lc
Expand All @@ -803,7 +803,10 @@ func startLifecycler(t *testing.T, cfg Config, heartbeat time.Duration, lifecycl
// This test checks if shuffle-sharded ring can be reused, and whether it receives
// updates from "main" ring.
func TestShuffleShardWithCaching(t *testing.T) {
inmem := consul.NewInMemoryClient(GetCodec())
inmem := consul.NewInMemoryClientWithConfig(GetCodec(), consul.Config{
MaxCasRetries: 20,
CasRetryDelay: 500 * time.Millisecond,
})

cfg := Config{
KVStore: kv.Config{Mock: inmem},
Expand Down