Skip to content
Merged
89 changes: 38 additions & 51 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"sort"
Expand Down Expand Up @@ -407,7 +408,7 @@ func TestDistributor_PushQuery(t *testing.T) {
for _, shardByAllLabels := range []bool{true, false} {

// Test with between 2 and 9 ingesters.
for numIngesters := 3; numIngesters < 10; numIngesters++ {
for numIngesters := 2; numIngesters < 10; numIngesters++ {

// Test with between 0 and numIngesters "happy" ingesters.
for happyIngesters := 0; happyIngesters <= numIngesters; happyIngesters++ {
Expand All @@ -426,6 +427,20 @@ func TestDistributor_PushQuery(t *testing.T) {
continue
}

// When we have fewer ingesters than the replication factor, any failed
// ingester will cause a failure.
if shardByAllLabels && numIngesters < 3 && happyIngesters < 2 {
testcases = append(testcases, testcase{
name: fmt.Sprintf("ExpectFail(shardByAllLabels=%v,numIngester=%d,happyIngester=%d)", shardByAllLabels, numIngesters, happyIngesters),
numIngesters: numIngesters,
happyIngesters: happyIngesters,
matchers: []*labels.Matcher{nameMatcher, barMatcher},
expectedError: promql.ErrStorage{Err: errFail},
shardByAllLabels: shardByAllLabels,
})
continue
}

// If we're sharding by metric name and we have failed ingesters, we can't
// tell ahead of time if the query will succeed, as we don't know which
// ingesters will hold the results for the query.
Expand Down Expand Up @@ -823,25 +838,36 @@ func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Dur
})
}

// Mock the ingesters ring
ingesterDescs := []ring.IngesterDesc{}
// Use a real ring with a mock KV store to test ring RF logic.
ingesterDescs := map[string]ring.IngesterDesc{}
ingestersByAddr := map[string]*mockIngester{}
for i := range ingesters {
addr := fmt.Sprintf("%d", i)
ingesterDescs = append(ingesterDescs, ring.IngesterDesc{
ingesterDescs[addr] = ring.IngesterDesc{
Addr: addr,
Zone: addr,
State: ring.ACTIVE,
Timestamp: time.Now().Unix(),
})
Tokens: []uint32{uint32((math.MaxUint32 / numIngesters) * i)},
}
ingestersByAddr[addr] = &ingesters[i]
}

ingestersRing := mockRing{
Counter: prometheus.NewCounter(prometheus.CounterOpts{
Name: "foo",
}),
ingesters: ingesterDescs,
replicationFactor: 3,
}
store := consul.NewInMemoryClient(ring.GetCodec())
err := store.Put(context.Background(), ring.IngesterRingKey, &ring.Desc{
Ingesters: ingesterDescs,
})
require.NoError(t, err)

ingestersRing, err := ring.New(ring.Config{
KVStore: kv.Config{
Mock: store,
},
HeartbeatTimeout: 60 * time.Minute,
ReplicationFactor: 3,
}, ring.IngesterRingKey, ring.IngesterRingKey)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing))

factory := func(addr string) (ring_client.PoolClient, error) {
return ingestersByAddr[addr], nil
Expand Down Expand Up @@ -959,45 +985,6 @@ func mustEqualMatcher(k, v string) *labels.Matcher {
return m
}

// mockRing is a simplified stand-in for a ring used by the distributor tests.
// It doesn't do virtual nodes (tokens): Get just returns replicationFactor
// ingesters starting at index key mod len(ingesters).
type mockRing struct {
// NOTE(review): embedded presumably so mockRing satisfies a metrics
// collector requirement of the ring interface — confirm against callers.
prometheus.Counter
// ingesters is the fixed list that Get indexes into and GetAll returns.
ingesters []ring.IngesterDesc
// replicationFactor is the number of ingesters Get returns per key; it is
// also the value reported by ReplicationFactor.
replicationFactor uint32
}

// Subring is not supported by the mock: it ignores key and n and always
// returns a nil ring together with an "unimplemented" error.
func (r mockRing) Subring(key uint32, n int) (ring.ReadRing, error) {
return nil, fmt.Errorf("unimplemented")
}

// Get returns the replication set for key: replicationFactor ingesters taken
// from consecutive positions of r.ingesters, starting at key mod
// len(r.ingesters) and wrapping around the end of the slice.
//
// op is ignored. buf is truncated and reused as the backing storage of the
// returned Ingesters slice. MaxErrors is always 1.
//
// An empty mock ring returns ring.ErrEmptyRing instead of panicking with a
// divide-by-zero.
func (r mockRing) Get(key uint32, op ring.Operation, buf []ring.IngesterDesc) (ring.ReplicationSet, error) {
	count := uint32(len(r.ingesters))
	if count == 0 {
		return ring.ReplicationSet{}, ring.ErrEmptyRing
	}

	result := ring.ReplicationSet{
		MaxErrors: 1,
		Ingesters: buf[:0],
	}

	// Reduce the key before adding the offset so that keys close to
	// math.MaxUint32 cannot wrap around uint32 and select the same ingester
	// twice.
	start := key % count
	for i := uint32(0); i < r.replicationFactor; i++ {
		result.Ingesters = append(result.Ingesters, r.ingesters[(start+i)%count])
	}
	return result, nil
}

// GetAll returns every ingester held by the mock as a single replication set
// that tolerates exactly one error. It never fails.
func (r mockRing) GetAll() (ring.ReplicationSet, error) {
	all := ring.ReplicationSet{MaxErrors: 1}
	all.Ingesters = r.ingesters
	return all, nil
}

// ReplicationFactor returns the configured replicationFactor as an int.
func (r mockRing) ReplicationFactor() int {
return int(r.replicationFactor)
}

// IngesterCount returns the number of ingesters held by the mock.
func (r mockRing) IngesterCount() int {
return len(r.ingesters)
}

type mockIngester struct {
sync.Mutex
client.IngesterClient
Expand Down
16 changes: 16 additions & 0 deletions pkg/ring/kv/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,22 @@ func NewClient(cfg Config, codec codec.Codec) (*Client, error) {
return c, nil
}

// Put encodes value with the client's codec and writes the result under key
// through the underlying KV Put call (not CAS). The write is wrapped in
// instrument.CollectedRequest under the "Put" operation name. It is mostly
// here for testing.
//
// An encoding failure is returned as-is, before any request is made.
func (c *Client) Put(ctx context.Context, key string, value interface{}) error {
	encoded, err := c.codec.Encode(value)
	if err != nil {
		return err
	}

	pair := &consul.KVPair{Key: key, Value: encoded}
	write := func(ctx context.Context) error {
		_, putErr := c.kv.Put(pair, nil)
		return putErr
	}
	return instrument.CollectedRequest(ctx, "Put", consulRequestDuration, instrument.ErrorCode, write)
}

// CAS atomically modifies a value in a callback.
// If value doesn't exist you'll get nil as an argument to your callback.
func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
Expand Down
8 changes: 4 additions & 4 deletions pkg/ring/kv/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func writeValuesToKV(client *Client, key string, start, end int, sleep time.Dura
defer close(ch)
for i := start; i <= end; i++ {
level.Debug(util.Logger).Log("ts", time.Now(), "msg", "writing value", "val", i)
_, _ = client.Put(&consul.KVPair{Key: key, Value: []byte(fmt.Sprintf("%d", i))}, nil)
_, _ = client.kv.Put(&consul.KVPair{Key: key, Value: []byte(fmt.Sprintf("%d", i))}, nil)
time.Sleep(sleep)
}
}()
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestReset(t *testing.T) {
defer close(ch)
for i := 0; i <= max; i++ {
level.Debug(util.Logger).Log("ts", time.Now(), "msg", "writing value", "val", i)
_, _ = c.Put(&consul.KVPair{Key: key, Value: []byte(fmt.Sprintf("%d", i))}, nil)
_, _ = c.kv.Put(&consul.KVPair{Key: key, Value: []byte(fmt.Sprintf("%d", i))}, nil)
if i == 1 {
c.kv.(*mockKV).ResetIndex()
}
Expand Down Expand Up @@ -142,11 +142,11 @@ func TestWatchKeyWithNoStartValue(t *testing.T) {

go func() {
time.Sleep(100 * time.Millisecond)
_, err := c.Put(&consul.KVPair{Key: key, Value: []byte("start")}, nil)
_, err := c.kv.Put(&consul.KVPair{Key: key, Value: []byte("start")}, nil)
require.NoError(t, err)

time.Sleep(100 * time.Millisecond)
_, err = c.Put(&consul.KVPair{Key: key, Value: []byte("end")}, nil)
_, err = c.kv.Put(&consul.KVPair{Key: key, Value: []byte("end")}, nil)
require.NoError(t, err)
}()

Expand Down
20 changes: 12 additions & 8 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,24 +256,28 @@ func (r *Ring) GetAll() (ReplicationSet, error) {
return ReplicationSet{}, ErrEmptyRing
}

ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
maxErrors := r.cfg.ReplicationFactor / 2
// Calculate the number of required ingesters;
// ensure we always require at least RF-1 when RF=3.
numRequired := len(r.ringDesc.Ingesters)
if numRequired < r.cfg.ReplicationFactor {
numRequired = r.cfg.ReplicationFactor
}
numRequired -= r.cfg.ReplicationFactor / 2

ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
for _, ingester := range r.ringDesc.Ingesters {
if !r.IsHealthy(&ingester, Read) {
maxErrors--
continue
if r.IsHealthy(&ingester, Read) {
ingesters = append(ingesters, ingester)
}
ingesters = append(ingesters, ingester)
}

if maxErrors < 0 {
if len(ingesters) < numRequired {
return ReplicationSet{}, fmt.Errorf("too many failed ingesters")
}

return ReplicationSet{
Ingesters: ingesters,
MaxErrors: maxErrors,
MaxErrors: len(ingesters) - numRequired,
}, nil
}

Expand Down