From e8ab296a671551c876b70ac3c17fd7dd5eea7751 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 9 Oct 2020 16:00:01 +0200 Subject: [PATCH 1/6] Get log for failed test. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ring/ring_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index 8fc0e4923f8..e6b7cb99988 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -1,6 +1,7 @@ package ring import ( + "bytes" "context" "fmt" "math" @@ -9,6 +10,7 @@ import ( "testing" "time" + "github.com/go-kit/kit/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -803,6 +805,15 @@ func startLifecycler(t *testing.T, cfg Config, heartbeat time.Duration, lifecycl // This test checks if shuffle-sharded ring can be reused, and whether it receives // updates from "main" ring. func TestShuffleShardWithCaching(t *testing.T) { + buf := bytes.Buffer{} + util.Logger = log.NewLogfmtLogger(&buf) + + defer func() { + if t.Failed() { + fmt.Println(buf.String()) + } + }() + inmem := consul.NewInMemoryClient(GetCodec()) cfg := Config{ From c636063bacf0154975d89683fe79e9249248a47e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 9 Oct 2020 16:51:12 +0200 Subject: [PATCH 2/6] Make more CAS attempts, and add a bit of random delay between them to reduce CAS collisions. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ring/kv/consul/client.go | 22 +++++++++++++++++++--- pkg/ring/ring_test.go | 7 +++++-- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/pkg/ring/kv/consul/client.go b/pkg/ring/kv/consul/client.go index 84e792651fa..9b5eeaeef1f 100644 --- a/pkg/ring/kv/consul/client.go +++ b/pkg/ring/kv/consul/client.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "math/rand" "net/http" "time" @@ -41,6 +42,10 @@ type Config struct { ConsistentReads bool `yaml:"consistent_reads"` WatchKeyRateLimit float64 `yaml:"watch_rate_limit"` // Zero disables rate limit WatchKeyBurstSize int `yaml:"watch_burst_size"` // Burst when doing rate-limit, defaults to 1 + + // Used in tests only. + MaxCasRetries int `yaml:"-"` + CasRetryDelay time.Duration `yaml:"-"` } type kv interface { @@ -117,11 +122,22 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou } func (c *Client) cas(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { - var ( - index = uint64(0) + retries := c.cfg.MaxCasRetries + if retries == 0 { retries = 10 - ) + } + + sleepBeforeRetry := time.Duration(0) + if c.cfg.CasRetryDelay > 0 { + sleepBeforeRetry = time.Duration(rand.Int63n(c.cfg.CasRetryDelay.Nanoseconds())) + } + + index := uint64(0) for i := 0; i < retries; i++ { + if i > 0 && sleepBeforeRetry > 0 { + time.Sleep(sleepBeforeRetry) + } + // Get with default options - don't want stale data to compare with options := &consul.QueryOptions{} kvp, _, err := c.kv.Get(key, options.WithContext(ctx)) diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index e6b7cb99988..4bfcf00883f 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -806,7 +806,7 @@ func startLifecycler(t *testing.T, cfg Config, heartbeat time.Duration, lifecycl // updates from "main" ring. func TestShuffleShardWithCaching(t *testing.T) { buf := bytes.Buffer{} - util.Logger = log.NewLogfmtLogger(&buf) + util.Logger = log.NewSyncLogger(log.NewLogfmtLogger(&buf)) defer func() { if t.Failed() { @@ -814,7 +814,10 @@ func TestShuffleShardWithCaching(t *testing.T) { } }() - inmem := consul.NewInMemoryClient(GetCodec()) + inmem := consul.NewInMemoryClientWithConfig(GetCodec(), consul.Config{ + MaxCasRetries: 20, + CasRetryDelay: 500 * time.Millisecond, + }) cfg := Config{ KVStore: kv.Config{Mock: inmem}, From ee4e667fb6a1805e2d3710c19c6933f1ef1af9d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 9 Oct 2020 17:09:05 +0200 Subject: [PATCH 3/6] Fix cleanup of lifecycler. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ring/ring_test.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index 4bfcf00883f..d88e9a56aea 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -1,7 +1,6 @@ package ring import ( - "bytes" "context" "fmt" "math" @@ -10,7 +9,6 @@ import ( "testing" "time" - "github.com/go-kit/kit/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -796,7 +794,7 @@ func startLifecycler(t *testing.T, cfg Config, heartbeat time.Duration, lifecycl require.NoError(t, services.StartAndAwaitRunning(context.Background(), lc)) t.Cleanup(func() { - _ = services.StartAndAwaitRunning(context.Background(), lc) + _ = services.StopAndAwaitTerminated(context.Background(), lc) }) return lc @@ -805,15 +803,6 @@ func startLifecycler(t *testing.T, cfg Config, heartbeat time.Duration, lifecycl // This test checks if shuffle-sharded ring can be reused, and whether it receives // updates from "main" ring. func TestShuffleShardWithCaching(t *testing.T) { - buf := bytes.Buffer{} - util.Logger = log.NewSyncLogger(log.NewLogfmtLogger(&buf)) - - defer func() { - if t.Failed() { - fmt.Println(buf.String()) - } - }() - inmem := consul.NewInMemoryClientWithConfig(GetCodec(), consul.Config{ MaxCasRetries: 20, CasRetryDelay: 500 * time.Millisecond, From 7fd4e6ab644dfc227929478c17b7957a910d0ed4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 9 Oct 2020 17:09:33 +0200 Subject: [PATCH 4/6] Fix cleanup of lifecycler. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ring/ring_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index d88e9a56aea..6cf59c42f76 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -1,6 +1,7 @@ package ring import ( + "bytes" "context" "fmt" "math" @@ -9,6 +10,7 @@ import ( "testing" "time" + "github.com/go-kit/kit/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -803,6 +805,15 @@ func startLifecycler(t *testing.T, cfg Config, heartbeat time.Duration, lifecycl // This test checks if shuffle-sharded ring can be reused, and whether it receives // updates from "main" ring. func TestShuffleShardWithCaching(t *testing.T) { + buf := bytes.Buffer{} + util.Logger = log.NewSyncLogger(log.NewLogfmtLogger(&buf)) + + defer func() { + if t.Failed() { + fmt.Println(buf.String()) + } + }() + inmem := consul.NewInMemoryClientWithConfig(GetCodec(), consul.Config{ MaxCasRetries: 20, CasRetryDelay: 500 * time.Millisecond, From 2d458b80ada2d64b49e2e3a4bec2e2aa379e1a79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 9 Oct 2020 17:29:20 +0200 Subject: [PATCH 5/6] Fix another cleanup function. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ring/ring_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index 6cf59c42f76..2820fcccf1e 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -725,7 +725,7 @@ func TestRingUpdates(t *testing.T) { require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ring)) t.Cleanup(func() { - _ = services.StartAndAwaitRunning(context.Background(), ring) + _ = services.StopAndAwaitTerminated(context.Background(), ring) }) require.Equal(t, 0, ring.IngesterCount()) From 7451d219ab72e9f4df769d81b8ef22a80e84ff9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 9 Oct 2020 17:30:36 +0200 Subject: [PATCH 6/6] Remove racy buffering of log messages. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ring/ring_test.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index 2820fcccf1e..e5eafc79af1 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -1,7 +1,6 @@ package ring import ( - "bytes" "context" "fmt" "math" @@ -10,7 +9,6 @@ import ( "testing" "time" - "github.com/go-kit/kit/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -805,15 +803,6 @@ func startLifecycler(t *testing.T, cfg Config, heartbeat time.Duration, lifecycl // This test checks if shuffle-sharded ring can be reused, and whether it receives // updates from "main" ring. func TestShuffleShardWithCaching(t *testing.T) { - buf := bytes.Buffer{} - util.Logger = log.NewSyncLogger(log.NewLogfmtLogger(&buf)) - - defer func() { - if t.Failed() { - fmt.Println(buf.String()) - } - }() - inmem := consul.NewInMemoryClientWithConfig(GetCodec(), consul.Config{ MaxCasRetries: 20, CasRetryDelay: 500 * time.Millisecond,