From 5348ef82f9c88fb8fcc6a588e6e161a5c5578401 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 22 Apr 2020 12:49:30 +0100 Subject: [PATCH 01/10] Use a real ring with mock KV when testing distributor. This is to tease out errors in the replication logic. Signed-off-by: Tom Wilkie --- pkg/distributor/distributor_test.go | 73 +++++++++-------------------- pkg/ring/kv/consul/client.go | 16 +++++++ pkg/ring/kv/consul/client_test.go | 8 ++-- 3 files changed, 43 insertions(+), 54 deletions(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 668e65a71b1..a8e086ec2e1 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "math" "math/rand" "net/http" "sort" @@ -823,25 +824,36 @@ func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Dur }) } - // Mock the ingesters ring - ingesterDescs := []ring.IngesterDesc{} + // Use a real ring with a mock KV store to test ring RF logic. + ingesterDescs := map[string]ring.IngesterDesc{} ingestersByAddr := map[string]*mockIngester{} for i := range ingesters { addr := fmt.Sprintf("%d", i) - ingesterDescs = append(ingesterDescs, ring.IngesterDesc{ + ingesterDescs[addr] = ring.IngesterDesc{ Addr: addr, + Zone: addr, + State: ring.ACTIVE, Timestamp: time.Now().Unix(), - }) + Tokens: []uint32{uint32((math.MaxUint32 / numIngesters) * i)}, + } ingestersByAddr[addr] = &ingesters[i] } - ingestersRing := mockRing{ - Counter: prometheus.NewCounter(prometheus.CounterOpts{ - Name: "foo", - }), - ingesters: ingesterDescs, - replicationFactor: 3, - } + store := consul.NewInMemoryClient(ring.GetCodec()) + err := store.Put(context.Background(), ring.IngesterRingKey, &ring.Desc{ + Ingesters: ingesterDescs, + }) + require.NoError(t, err) + + ingestersRing, err := ring.New(ring.Config{ + KVStore: kv.Config{ + Mock: store, + }, + HeartbeatTimeout: 60 * time.Minute, + ReplicationFactor: 3, + }, ring.IngesterRingKey, ring.IngesterRingKey) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing)) factory := func(addr string) (ring_client.PoolClient, error) { return ingestersByAddr[addr], nil @@ -959,45 +971,6 @@ func mustEqualMatcher(k, v string) *labels.Matcher { return m } -// mockRing doesn't do virtual nodes, just returns mod(key) + replicationFactor -// ingesters.
-type mockRing struct { - prometheus.Counter - ingesters []ring.IngesterDesc - replicationFactor uint32 -} - -func (r mockRing) Subring(key uint32, n int) (ring.ReadRing, error) { - return nil, fmt.Errorf("unimplemented") -} - -func (r mockRing) Get(key uint32, op ring.Operation, buf []ring.IngesterDesc) (ring.ReplicationSet, error) { - result := ring.ReplicationSet{ - MaxErrors: 1, - Ingesters: buf[:0], - } - for i := uint32(0); i < r.replicationFactor; i++ { - n := (key + i) % uint32(len(r.ingesters)) - result.Ingesters = append(result.Ingesters, r.ingesters[n]) - } - return result, nil -} - -func (r mockRing) GetAll() (ring.ReplicationSet, error) { - return ring.ReplicationSet{ - Ingesters: r.ingesters, - MaxErrors: 1, - }, nil -} - -func (r mockRing) ReplicationFactor() int { - return int(r.replicationFactor) -} - -func (r mockRing) IngesterCount() int { - return len(r.ingesters) -} - type mockIngester struct { sync.Mutex client.IngesterClient diff --git a/pkg/ring/kv/consul/client.go b/pkg/ring/kv/consul/client.go index aa5cc7f260a..31bb4e58db3 100644 --- a/pkg/ring/kv/consul/client.go +++ b/pkg/ring/kv/consul/client.go @@ -92,6 +92,22 @@ func NewClient(cfg Config, codec codec.Codec) (*Client, error) { return c, nil } +// Put is mostly here for testing. +func (c *Client) Put(ctx context.Context, key string, value interface{}) error { + bytes, err := c.codec.Encode(value) + if err != nil { + return err + } + + return instrument.CollectedRequest(ctx, "Put", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + _, err := c.kv.Put(&consul.KVPair{ + Key: key, + Value: bytes, + }, nil) + return err + }) +} + // CAS atomically modifies a value in a callback. // If value doesn't exist you'll get nil as an argument to your callback. 
func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { diff --git a/pkg/ring/kv/consul/client_test.go b/pkg/ring/kv/consul/client_test.go index 23ff9e49e90..37e455bcf77 100644 --- a/pkg/ring/kv/consul/client_test.go +++ b/pkg/ring/kv/consul/client_test.go @@ -21,7 +21,7 @@ func writeValuesToKV(client *Client, key string, start, end int, sleep time.Dura defer close(ch) for i := start; i <= end; i++ { level.Debug(util.Logger).Log("ts", time.Now(), "msg", "writing value", "val", i) - _, _ = client.Put(&consul.KVPair{Key: key, Value: []byte(fmt.Sprintf("%d", i))}, nil) + _, _ = client.kv.Put(&consul.KVPair{Key: key, Value: []byte(fmt.Sprintf("%d", i))}, nil) time.Sleep(sleep) } }() @@ -92,7 +92,7 @@ func TestReset(t *testing.T) { defer close(ch) for i := 0; i <= max; i++ { level.Debug(util.Logger).Log("ts", time.Now(), "msg", "writing value", "val", i) - _, _ = c.Put(&consul.KVPair{Key: key, Value: []byte(fmt.Sprintf("%d", i))}, nil) + _, _ = c.kv.Put(&consul.KVPair{Key: key, Value: []byte(fmt.Sprintf("%d", i))}, nil) if i == 1 { c.kv.(*mockKV).ResetIndex() } @@ -142,11 +142,11 @@ func TestWatchKeyWithNoStartValue(t *testing.T) { go func() { time.Sleep(100 * time.Millisecond) - _, err := c.Put(&consul.KVPair{Key: key, Value: []byte("start")}, nil) + _, err := c.kv.Put(&consul.KVPair{Key: key, Value: []byte("start")}, nil) require.NoError(t, err) time.Sleep(100 * time.Millisecond) - _, err = c.Put(&consul.KVPair{Key: key, Value: []byte("end")}, nil) + _, err = c.kv.Put(&consul.KVPair{Key: key, Value: []byte("end")}, nil) require.NoError(t, err) }() From deeaec2561eb9f2c2f3a4091c0acd9c70e4826cb Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 22 Apr 2020 12:52:21 +0100 Subject: [PATCH 02/10] Extend distributor test to cover the case RF=3 with 2 ingesters. Signed-off-by: Tom Wilkie --- pkg/distributor/distributor_test.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index a8e086ec2e1..15e029464b1 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -408,7 +408,7 @@ func TestDistributor_PushQuery(t *testing.T) { for _, shardByAllLabels := range []bool{true, false} { // Test with between 3 and 10 ingesters. - for numIngesters := 3; numIngesters < 10; numIngesters++ { + for numIngesters := 2; numIngesters < 10; numIngesters++ { // Test with between 0 and numIngesters "happy" ingesters. for happyIngesters := 0; happyIngesters <= numIngesters; happyIngesters++ { @@ -427,6 +427,20 @@ func TestDistributor_PushQuery(t *testing.T) { continue } + // When we have less ingesters than replication factor, any failed ingester + // will cause a failure. + if shardByAllLabels && numIngesters < 3 && happyIngesters < 2 { + testcases = append(testcases, testcase{ + name: fmt.Sprintf("ExpectFail(shardByAllLabels=%v,numIngester=%d,happyIngester=%d)", shardByAllLabels, numIngesters, happyIngesters), + numIngesters: numIngesters, + happyIngesters: happyIngesters, + matchers: []*labels.Matcher{nameMatcher, barMatcher}, + expectedError: promql.ErrStorage{Err: errFail}, + shardByAllLabels: shardByAllLabels, + }) + continue + } + // If we're sharding by metric name and we have failed ingesters, we can't // tell ahead of time if the query will succeed, as we don't know which // ingesters will hold the results for the query. 
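The failure expectation added above follows from the ring's quorum rule: a request needs minSuccess = RF/2 + 1 healthy replicas, and the failures it can tolerate should be counted against the replicas that are actually available rather than the nominal replication factor. Patches 03 and 07 below make the read and write paths follow that rule; the snippet here is a minimal standalone sketch of the arithmetic, where the maxFailures helper, its signature, and the example values are illustrative assumptions, not code from the Cortex ring package.

package main

import "fmt"

// maxFailures sketches the quorum arithmetic assumed above: a request needs a
// majority of the replication factor to succeed, and the tolerable failures
// are counted against the replicas actually available.
func maxFailures(replicationFactor, healthyIngesters int) (int, error) {
	minSuccess := replicationFactor/2 + 1
	if healthyIngesters < minSuccess {
		return 0, fmt.Errorf("at least %d live replicas required, could only find %d", minSuccess, healthyIngesters)
	}
	return healthyIngesters - minSuccess, nil
}

func main() {
	fmt.Println(maxFailures(3, 3)) // 1 <nil>: the usual production case tolerates one failure
	fmt.Println(maxFailures(3, 2)) // 0 <nil>: RF=3 with only 2 ingesters tolerates none
	fmt.Println(maxFailures(3, 1)) // error: below quorum, the request fails outright
}

With RF=3 and two ingesters, any unhappy ingester drops the healthy count below the quorum of two, which is exactly the ExpectFail case added to TestDistributor_PushQuery above.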
From c1395877d946f2c74e77103209a03ceb0b61e601 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 22 Apr 2020 12:53:18 +0100 Subject: [PATCH 03/10] Ensure ring correctly calculates the number of allowed failures when RF=3 and #ingesters=2. Signed-off-by: Tom Wilkie --- pkg/ring/ring.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 33751d1b9b4..f6754d847fa 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -256,24 +256,28 @@ func (r *Ring) GetAll() (ReplicationSet, error) { return ReplicationSet{}, ErrEmptyRing } - ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters)) - maxErrors := r.cfg.ReplicationFactor / 2 + // Calculate the number of required ingesters; + // ensure we always require at least RF-1 when RF=3. + numRequired := len(r.ringDesc.Ingesters) + if numRequired < r.cfg.ReplicationFactor { + numRequired = r.cfg.ReplicationFactor + } + numRequired -= r.cfg.ReplicationFactor / 2 + ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters)) for _, ingester := range r.ringDesc.Ingesters { - if !r.IsHealthy(&ingester, Read) { - maxErrors-- - continue + if r.IsHealthy(&ingester, Read) { + ingesters = append(ingesters, ingester) } - ingesters = append(ingesters, ingester) } - if maxErrors < 0 { + if len(ingesters) < numRequired { return ReplicationSet{}, fmt.Errorf("too many failed ingesters") } return ReplicationSet{ Ingesters: ingesters, - MaxErrors: maxErrors, + MaxErrors: len(ingesters) - numRequired, }, nil } From e150c3d7b8b8c1d5e1fe6f3b9f2460cde3cd7602 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 22 Apr 2020 13:34:30 +0100 Subject: [PATCH 04/10] Add changelog and review feedback. Signed-off-by: Tom Wilkie --- CHANGELOG.md | 1 + pkg/distributor/distributor_test.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a971aff8ff0..6f9c80793de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -144,6 +144,7 @@ This is the first major release of Cortex. We made a lot of **breaking changes** * [BUGFIX] Fixed etcd client keepalive settings. #2278 * [BUGFIX] Register the metrics of the WAL. #2295 * [BUXFIX] Experimental TSDB: fixed error handling when ingesting out of bound samples. #2342 +* [BUGFIX] Fix gaps when querying ingesters with replication factor = 3 and 2 ingesters in the cluster. #2503 ### Known issues diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 15e029464b1..0c762bc4c4e 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -407,7 +407,7 @@ func TestDistributor_PushQuery(t *testing.T) { // Run every test in both sharding modes. for _, shardByAllLabels := range []bool{true, false} { - // Test with between 3 and 10 ingesters. + // Test with between 2 and 10 ingesters. for numIngesters := 2; numIngesters < 10; numIngesters++ { // Test with between 0 and numIngesters "happy" ingesters. From 56419b8dc0aee6afff3875a377611fbf7d404fe6 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 22 Apr 2020 15:08:41 +0100 Subject: [PATCH 05/10] Refactor some distributor tests to try and get them to pass. 
Signed-off-by: Tom Wilkie --- pkg/distributor/distributor_test.go | 287 ++++++++++++++++++---------- 1 file changed, 185 insertions(+), 102 deletions(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 0c762bc4c4e..51877a3c453 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "math" - "math/rand" "net/http" "sort" "strconv" @@ -184,11 +183,17 @@ func TestDistributor_Push(t *testing.T) { limits.IngestionRate = 20 limits.IngestionBurstSize = 20 - d, _ := prepare(t, tc.numIngesters, tc.happyIngesters, 0, shardByAllLabels, limits, nil) - defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + ds, _, r := prepare(t, prepConfig{ + numIngesters: tc.numIngesters, + happyIngesters: tc.happyIngesters, + numDistributors: 1, + shardByAllLabels: shardByAllLabels, + limits: limits, + }) + defer stopAll(ds, r) request := makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata) - response, err := d.Push(ctx, request) + response, err := ds[0].Push(ctx, request) assert.Equal(t, tc.expectedResponse, response) assert.Equal(t, tc.expectedError, err) @@ -270,23 +275,15 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { limits.IngestionRate = testData.ingestionRate limits.IngestionBurstSize = testData.ingestionBurstSize - // Init a shared KVStore - kvStore := consul.NewInMemoryClient(ring.GetCodec()) - // Start all expected distributors - distributors := make([]*Distributor, testData.distributors) - for i := 0; i < testData.distributors; i++ { - distributors[i], _ = prepare(t, 1, 1, 0, true, limits, kvStore) - defer services.StopAndAwaitTerminated(context.Background(), distributors[i]) //nolint:errcheck - } - - // If the distributors ring is setup, wait until the first distributor - // updates to the expected size - if distributors[0].distributorsRing != nil { - test.Poll(t, time.Second, testData.distributors, func() interface{} { - return distributors[0].distributorsRing.HealthyInstancesCount() - }) - } + distributors, _, r := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: testData.distributors, + shardByAllLabels: true, + limits: limits, + }) + defer stopAll(distributors, r) // Push samples in multiple requests to the first distributor for _, push := range testData.pushes { @@ -350,9 +347,17 @@ func TestDistributor_PushHAInstances(t *testing.T) { flagext.DefaultValues(&limits) limits.AcceptHASamples = true - d, _ := prepare(t, 1, 1, 0, shardByAllLabels, &limits, nil) + ds, _, r := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + shardByAllLabels: shardByAllLabels, + limits: &limits, + }) + defer stopAll(ds, r) codec := GetReplicaDescCodec() mock := kv.PrefixClient(consul.NewInMemoryClient(codec), "prefix") + d := ds[0] if tc.enableTracker { r, err := newClusterTracker(HATrackerConfig{ @@ -488,20 +493,25 @@ func TestDistributor_PushQuery(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - d, _ := prepare(t, tc.numIngesters, tc.happyIngesters, 0, tc.shardByAllLabels, nil, nil) - defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + ds, _, r := prepare(t, prepConfig{ + numIngesters: tc.numIngesters, + happyIngesters: tc.happyIngesters, + numDistributors: 1, + shardByAllLabels: tc.shardByAllLabels, + }) + defer stopAll(ds, r) request := makeWriteRequest(0, tc.samples, tc.metadata) - writeResponse, err := 
d.Push(ctx, request) + writeResponse, err := ds[0].Push(ctx, request) assert.Equal(t, &client.WriteResponse{}, writeResponse) assert.Nil(t, err) - response, err := d.Query(ctx, 0, 10, tc.matchers...) + response, err := ds[0].Query(ctx, 0, 10, tc.matchers...) sort.Sort(response) assert.Equal(t, tc.expectedResponse, response) assert.Equal(t, tc.expectedError, err) - series, err := d.QueryStream(ctx, 0, 10, tc.matchers...) + series, err := ds[0].QueryStream(ctx, 0, 10, tc.matchers...) assert.Equal(t, tc.expectedError, err) if series == nil { @@ -526,7 +536,10 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { } cases := []testcase{ - { // Remove both cluster and replica label. + // Remove both cluster and replica label. + { + removeReplica: true, + removeLabels: []string{"cluster"}, inputSeries: labels.Labels{ {Name: "__name__", Value: "some_metric"}, {Name: "cluster", Value: "one"}, @@ -535,10 +548,11 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { expectedSeries: labels.Labels{ {Name: "__name__", Value: "some_metric"}, }, - removeReplica: true, - removeLabels: []string{"cluster"}, }, - { // Remove multiple labels and replica. + // Remove multiple labels and replica. + { + removeReplica: true, + removeLabels: []string{"foo", "some"}, inputSeries: labels.Labels{ {Name: "__name__", Value: "some_metric"}, {Name: "cluster", Value: "one"}, @@ -550,10 +564,10 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { {Name: "__name__", Value: "some_metric"}, {Name: "cluster", Value: "one"}, }, - removeReplica: true, - removeLabels: []string{"foo", "some"}, }, - { // Don't remove any labels. + // Don't remove any labels. + { + removeReplica: false, inputSeries: labels.Labels{ {Name: "__name__", Value: "some_metric"}, {Name: "__replica__", Value: "two"}, @@ -564,7 +578,6 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { {Name: "__replica__", Value: "two"}, {Name: "cluster", Value: "one"}, }, - removeReplica: false, }, } @@ -575,25 +588,29 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { limits.DropLabels = tc.removeLabels limits.AcceptHASamples = tc.removeReplica - d, ingesters := prepare(t, 1, 1, 0, true, &limits, nil) - defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + ds, ingesters, r := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + }) + defer stopAll(ds, r) // Push the series to the distributor req := mockWriteRequest(tc.inputSeries, 1, 1) - _, err = d.Push(ctx, req) + _, err = ds[0].Push(ctx, req) require.NoError(t, err) // Since each test pushes only 1 series, we do expect the ingester // to have received exactly 1 series - assert.Equal(t, 1, len(ingesters)) - actualSeries := []*client.PreallocTimeseries{} - - for _, ts := range ingesters[0].timeseries { - actualSeries = append(actualSeries, ts) + for i := range ingesters { + timeseries := ingesters[i].series() + assert.Equal(t, 1, len(timeseries)) + for _, v := range timeseries { + assert.Equal(t, tc.expectedSeries, client.FromLabelAdaptersToLabels(v.Labels)) + } } - - assert.Equal(t, 1, len(actualSeries)) - assert.Equal(t, tc.expectedSeries, client.FromLabelAdaptersToLabels(actualSeries[0].Labels)) } } @@ -677,30 +694,30 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t * for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - d, ingesters := prepare(t, 1, 1, 0, true, &limits, nil) - defer 
services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + ds, ingesters, r := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + }) + defer stopAll(ds, r) // Push the series to the distributor req := mockWriteRequest(testData.inputSeries, 1, 1) - _, err := d.Push(ctx, req) + _, err := ds[0].Push(ctx, req) require.NoError(t, err) // Since each test pushes only 1 series, we do expect the ingester // to have received exactly 1 series - require.Equal(t, 1, len(ingesters)) - require.Equal(t, 1, len(ingesters[0].timeseries)) + for i := range ingesters { + timeseries := ingesters[i].series() + assert.Equal(t, 1, len(timeseries)) - var actualSeries *client.PreallocTimeseries - var actualToken uint32 - - for token, ts := range ingesters[0].timeseries { - actualSeries = ts - actualToken = token + series, ok := timeseries[testData.expectedToken] + require.True(t, ok) + assert.Equal(t, testData.expectedSeries, client.FromLabelAdaptersToLabels(series.Labels)) } - - // Ensure the series and the sharding token is the expected one - assert.Equal(t, testData.expectedSeries, client.FromLabelAdaptersToLabels(actualSeries.Labels)) - assert.Equal(t, testData.expectedToken, actualToken) }) } } @@ -714,13 +731,19 @@ func TestSlowQueries(t *testing.T) { if nIngesters-happy > 1 { expectedErr = promql.ErrStorage{Err: errFail} } - d, _ := prepare(t, nIngesters, happy, 100*time.Millisecond, shardByAllLabels, nil, nil) - defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + ds, _, r := prepare(t, prepConfig{ + numIngesters: nIngesters, + happyIngesters: happy, + numDistributors: 1, + queryDelay: 100 * time.Millisecond, + shardByAllLabels: shardByAllLabels, + }) + defer stopAll(ds, r) - _, err := d.Query(ctx, 0, 10, nameMatcher) + _, err := ds[0].Query(ctx, 0, 10, nameMatcher) assert.Equal(t, expectedErr, err) - _, err = d.QueryStream(ctx, 0, 10, nameMatcher) + _, err = ds[0].QueryStream(ctx, 0, 10, nameMatcher) assert.Equal(t, expectedErr, err) } } @@ -780,15 +803,20 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) { } // Create distributor - d, _ := prepare(t, 3, 3, time.Duration(0), true, nil, nil) - defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + ds, _, r := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + shardByAllLabels: true, + }) + defer stopAll(ds, r) // Push fixtures ctx := user.InjectOrgID(context.Background(), "test") for _, series := range fixtures { req := mockWriteRequest(series.lbls, series.value, series.timestamp) - _, err := d.Push(ctx, req) + _, err := ds[0].Push(ctx, req) require.NoError(t, err) } @@ -797,7 +825,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) { t.Run(testName, func(t *testing.T) { now := model.Now() - metrics, err := d.MetricsForLabelMatchers(ctx, now, now, testData.matchers...) + metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...) 
require.NoError(t, err) assert.ElementsMatch(t, testData.expected, metrics) }) @@ -824,17 +852,27 @@ func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) *cli return client.ToWriteRequest([]labels.Labels{lbls}, samples, nil, client.API) } -func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Duration, shardByAllLabels bool, limits *validation.Limits, kvStore kv.Client) (*Distributor, []mockIngester) { +type prepConfig struct { + numIngesters, happyIngesters int + queryDelay time.Duration + shardByAllLabels bool + limits *validation.Limits + numDistributors int +} + +func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *ring.Ring) { + // util.Logger = log.NewLogfmtLogger(os.Stderr) + ingesters := []mockIngester{} - for i := 0; i < happyIngesters; i++ { + for i := 0; i < cfg.happyIngesters; i++ { ingesters = append(ingesters, mockIngester{ happy: true, - queryDelay: queryDelay, + queryDelay: cfg.queryDelay, }) } - for i := happyIngesters; i < numIngesters; i++ { + for i := cfg.happyIngesters; i < cfg.numIngesters; i++ { ingesters = append(ingesters, mockIngester{ - queryDelay: queryDelay, + queryDelay: cfg.queryDelay, }) } @@ -848,20 +886,24 @@ func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Dur Zone: addr, State: ring.ACTIVE, Timestamp: time.Now().Unix(), - Tokens: []uint32{uint32((math.MaxUint32 / numIngesters) * i)}, + Tokens: []uint32{uint32((math.MaxUint32 / cfg.numIngesters) * i)}, } ingestersByAddr[addr] = &ingesters[i] } - store := consul.NewInMemoryClient(ring.GetCodec()) - err := store.Put(context.Background(), ring.IngesterRingKey, &ring.Desc{ - Ingesters: ingesterDescs, - }) + kvStore := consul.NewInMemoryClient(ring.GetCodec()) + err := kvStore.CAS(context.Background(), ring.IngesterRingKey, + func(_ interface{}) (interface{}, bool, error) { + return &ring.Desc{ + Ingesters: ingesterDescs, + }, true, nil + }, + ) require.NoError(t, err) ingestersRing, err := ring.New(ring.Config{ KVStore: kv.Config{ - Mock: store, + Mock: kvStore, }, HeartbeatTimeout: 60 * time.Minute, ReplicationFactor: 3, @@ -869,34 +911,58 @@ func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Dur require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing)) + test.Poll(t, time.Second, cfg.numIngesters, func() interface{} { + return ingestersRing.IngesterCount() + }) + factory := func(addr string) (ring_client.PoolClient, error) { return ingestersByAddr[addr], nil } - var cfg Config - var clientConfig client.Config - flagext.DefaultValues(&cfg, &clientConfig) + distributors := make([]*Distributor, 0, cfg.numDistributors) + for i := 0; i < cfg.numDistributors; i++ { + var distributorCfg Config + var clientConfig client.Config + flagext.DefaultValues(&distributorCfg, &clientConfig) + + distributorCfg.ingesterClientFactory = factory + distributorCfg.ShardByAllLabels = cfg.shardByAllLabels + distributorCfg.ExtraQueryDelay = 50 * time.Millisecond + distributorCfg.DistributorRing.HeartbeatPeriod = 100 * time.Millisecond + distributorCfg.DistributorRing.InstanceID = strconv.Itoa(i) + distributorCfg.DistributorRing.KVStore.Mock = kvStore + distributorCfg.DistributorRing.InstanceAddr = "127.0.0.1" + + if cfg.limits == nil { + cfg.limits = &validation.Limits{} + flagext.DefaultValues(cfg.limits) + } + overrides, err := validation.NewOverrides(*cfg.limits, nil) + require.NoError(t, err) + + d, err := New(distributorCfg, clientConfig, overrides, 
ingestersRing, true) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), d)) + + distributors = append(distributors, d) + } - if limits == nil { - limits = &validation.Limits{} - flagext.DefaultValues(limits) + // If the distributors ring is setup, wait until the first distributor + // updates to the expected size + if distributors[0].distributorsRing != nil { + test.Poll(t, time.Second, cfg.numDistributors, func() interface{} { + return distributors[0].distributorsRing.HealthyInstancesCount() + }) } - cfg.ingesterClientFactory = factory - cfg.ShardByAllLabels = shardByAllLabels - cfg.ExtraQueryDelay = 50 * time.Millisecond - cfg.DistributorRing.HeartbeatPeriod = 100 * time.Millisecond - cfg.DistributorRing.InstanceID = strconv.Itoa(rand.Int()) - cfg.DistributorRing.KVStore.Mock = kvStore - cfg.DistributorRing.InstanceAddr = "127.0.0.1" - - overrides, err := validation.NewOverrides(*limits, nil) - require.NoError(t, err) - d, err := New(cfg, clientConfig, overrides, ingestersRing, true) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), d)) + return distributors, ingesters, ingestersRing +} - return d, ingesters +func stopAll(ds []*Distributor, r *ring.Ring) { + for _, d := range ds { + services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + } + //services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck } func makeWriteRequest(startTimestampMs int64, samples int, metadata int) *client.WriteRequest { @@ -995,6 +1061,17 @@ type mockIngester struct { queryDelay time.Duration } +func (i *mockIngester) series() map[uint32]*client.PreallocTimeseries { + i.Lock() + defer i.Unlock() + + result := map[uint32]*client.PreallocTimeseries{} + for k, v := range i.timeseries { + result[k] = v + } + return result +} + func (i *mockIngester) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) { return &grpc_health_v1.HealthCheckResponse{}, nil } @@ -1270,10 +1347,16 @@ func TestDistributorValidation(t *testing.T) { limits.RejectOldSamplesMaxAge = 24 * time.Hour limits.MaxLabelNamesPerSeries = 2 - d, _ := prepare(t, 3, 3, 0, true, &limits, nil) - defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + ds, _, r := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + }) + defer stopAll(ds, r) - _, err := d.Push(ctx, client.ToWriteRequest(tc.labels, tc.samples, tc.metadata, client.API)) + _, err := ds[0].Push(ctx, client.ToWriteRequest(tc.labels, tc.samples, tc.metadata, client.API)) require.Equal(t, tc.err, err) }) } From 41f8ca37b40f9d26ef55ed571e4b35023aa77893 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 22 Apr 2020 19:48:26 +0100 Subject: [PATCH 06/10] Speed up tests but polling more frequently. 
Signed-off-by: Tom Wilkie --- pkg/util/test/poll.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/util/test/poll.go b/pkg/util/test/poll.go index 115b97f2636..168eb75a823 100644 --- a/pkg/util/test/poll.go +++ b/pkg/util/test/poll.go @@ -17,7 +17,7 @@ func Poll(t *testing.T, d time.Duration, want interface{}, have func() interface if reflect.DeepEqual(want, have()) { return } - time.Sleep(d / 10) + time.Sleep(d / 100) } h := have() if !reflect.DeepEqual(want, h) { From ee636654f6114c5ba90952f430dbb7a94cf81ecf Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 22 Apr 2020 19:48:45 +0100 Subject: [PATCH 07/10] Fix same bug on the write path. Signed-off-by: Tom Wilkie --- pkg/ring/replication_strategy.go | 6 ++---- pkg/ring/replication_strategy_test.go | 7 +++++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/ring/replication_strategy.go b/pkg/ring/replication_strategy.go index 9231b8d8fd3..c5da5c6fd86 100644 --- a/pkg/ring/replication_strategy.go +++ b/pkg/ring/replication_strategy.go @@ -34,7 +34,6 @@ func (s *DefaultReplicationStrategy) Filter(ingesters []IngesterDesc, op Operati } minSuccess := (replicationFactor / 2) + 1 - maxFailure := replicationFactor - minSuccess // Skip those that have not heartbeated in a while. NB these are still // included in the calculation of minSuccess, so if too many failed ingesters @@ -44,19 +43,18 @@ func (s *DefaultReplicationStrategy) Filter(ingesters []IngesterDesc, op Operati i++ } else { ingesters = append(ingesters[:i], ingesters[i+1:]...) - maxFailure-- } } // This is just a shortcut - if there are not minSuccess available ingesters, // after filtering out dead ones, don't even bother trying. - if maxFailure < 0 || len(ingesters) < minSuccess { + if len(ingesters) < minSuccess { err := fmt.Errorf("at least %d live replicas required, could only find %d", minSuccess, len(ingesters)) return nil, 0, err } - return ingesters, maxFailure, nil + return ingesters, len(ingesters) - minSuccess, nil } func (s *DefaultReplicationStrategy) ShouldExtendReplicaSet(ingester IngesterDesc, op Operation) bool { diff --git a/pkg/ring/replication_strategy_test.go b/pkg/ring/replication_strategy_test.go index daf66c73793..10a299ec35e 100644 --- a/pkg/ring/replication_strategy_test.go +++ b/pkg/ring/replication_strategy_test.go @@ -28,6 +28,13 @@ func TestRingReplicationStrategy(t *testing.T) { ExpectedError: "at least 1 live replicas required, could only find 0", }, + // Ensure it works for RF=3 and 2 ingesters. + { + RF: 3, + LiveIngesters: 2, + ExpectedMaxFailure: 0, + }, + // Ensure it works for the default production config. { RF: 3, From 88b91e21a5342c27606c5f893d49f445816642dd Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 22 Apr 2020 19:49:17 +0100 Subject: [PATCH 08/10] Tidy up the distributor tests. 
Signed-off-by: Tom Wilkie --- pkg/distributor/distributor_test.go | 41 ++++++++++++++++------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 51877a3c453..7b037dbb2dc 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -727,24 +727,27 @@ func TestSlowQueries(t *testing.T) { nIngesters := 3 for _, shardByAllLabels := range []bool{true, false} { for happy := 0; happy <= nIngesters; happy++ { - var expectedErr error - if nIngesters-happy > 1 { - expectedErr = promql.ErrStorage{Err: errFail} - } - ds, _, r := prepare(t, prepConfig{ - numIngesters: nIngesters, - happyIngesters: happy, - numDistributors: 1, - queryDelay: 100 * time.Millisecond, - shardByAllLabels: shardByAllLabels, - }) - defer stopAll(ds, r) + t.Run(fmt.Sprintf("%t/%d", shardByAllLabels, happy), func(t *testing.T) { + var expectedErr error + if nIngesters-happy > 1 { + expectedErr = promql.ErrStorage{Err: errFail} + } - _, err := ds[0].Query(ctx, 0, 10, nameMatcher) - assert.Equal(t, expectedErr, err) + ds, _, r := prepare(t, prepConfig{ + numIngesters: nIngesters, + happyIngesters: happy, + numDistributors: 1, + queryDelay: 100 * time.Millisecond, + shardByAllLabels: shardByAllLabels, + }) + defer stopAll(ds, r) + + _, err := ds[0].Query(ctx, 0, 10, nameMatcher) + assert.Equal(t, expectedErr, err) - _, err = ds[0].QueryStream(ctx, 0, 10, nameMatcher) - assert.Equal(t, expectedErr, err) + _, err = ds[0].QueryStream(ctx, 0, 10, nameMatcher) + assert.Equal(t, expectedErr, err) + }) } } } @@ -861,8 +864,6 @@ type prepConfig struct { } func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *ring.Ring) { - // util.Logger = log.NewLogfmtLogger(os.Stderr) - ingesters := []mockIngester{} for i := 0; i < cfg.happyIngesters; i++ { ingesters = append(ingesters, mockIngester{ @@ -962,7 +963,9 @@ func stopAll(ds []*Distributor, r *ring.Ring) { for _, d := range ds { services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck } - //services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck + + // Mock consul doesn't stop quickly, so don't wait. + r.StopAsync() } func makeWriteRequest(startTimestampMs int64, samples int, metadata int) *client.WriteRequest { From cc699543f3cc943c73c3e6154e416817b792444a Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 22 Apr 2020 19:57:48 +0100 Subject: [PATCH 09/10] Make test correctly handle RF3 and 2 ingesters - previously was succeeding when it shouldn't Signed-off-by: Tom Wilkie --- pkg/distributor/distributor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 7b037dbb2dc..b1e9a4a8e5d 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -434,7 +434,7 @@ func TestDistributor_PushQuery(t *testing.T) { // When we have less ingesters than replication factor, any failed ingester // will cause a failure. 
- if shardByAllLabels && numIngesters < 3 && happyIngesters < 2 { + if numIngesters < 3 && happyIngesters < 2 { testcases = append(testcases, testcase{ name: fmt.Sprintf("ExpectFail(shardByAllLabels=%v,numIngester=%d,happyIngester=%d)", shardByAllLabels, numIngesters, happyIngesters), numIngesters: numIngesters, From 2619f251363559835496e11b200aa6932fbd2582 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 22 Apr 2020 20:43:17 +0100 Subject: [PATCH 10/10] Update pkg/ring/ring.go Co-Authored-By: Jacob Lisi Signed-off-by: Tom Wilkie --- pkg/ring/ring.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index f6754d847fa..db873d5a064 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -262,7 +262,8 @@ func (r *Ring) GetAll() (ReplicationSet, error) { if numRequired < r.cfg.ReplicationFactor { numRequired = r.cfg.ReplicationFactor } - numRequired -= r.cfg.ReplicationFactor / 2 + maxUnavailable := r.cfg.ReplicationFactor / 2 + numRequired -= maxUnavailable ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters)) for _, ingester := range r.ringDesc.Ingesters {
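Taken together, patches 03 and 10 leave GetAll deriving its failure budget from the instances that are actually healthy rather than from the nominal replication factor. The standalone sketch below restates that calculation so the RF=3, two-ingester case exercised by the tests is easy to check by hand; getAllMaxErrors, its signature, and the example values are illustrative assumptions, not the actual (*Ring).GetAll method.

package main

import "fmt"

// getAllMaxErrors restates the GetAll quorum calculation from patches 03 and 10.
func getAllMaxErrors(replicationFactor, ringSize, healthyIngesters int) (int, error) {
	// Require enough of the ring (or of the replication factor, whichever is
	// larger) that at most RF/2 instances may be unavailable.
	numRequired := ringSize
	if numRequired < replicationFactor {
		numRequired = replicationFactor
	}
	maxUnavailable := replicationFactor / 2
	numRequired -= maxUnavailable

	if healthyIngesters < numRequired {
		return 0, fmt.Errorf("too many failed ingesters")
	}
	return healthyIngesters - numRequired, nil
}

func main() {
	fmt.Println(getAllMaxErrors(3, 2, 2)) // 0 <nil>: RF=3, 2 ingesters, both healthy, so no further failures allowed
	fmt.Println(getAllMaxErrors(3, 2, 1)) // error: RF=3, 2 ingesters, only 1 healthy, so GetAll must fail
	fmt.Println(getAllMaxErrors(3, 5, 5)) // 1 <nil>: a larger healthy ring still tolerates one mid-query failure
}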