Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
for _, c := range compactors {
cortex_testutil.Poll(t, 10*time.Second, len(compactors), func() interface{} {
// it is safe to access c.ring here, since we know that all compactors are Running now
rs, err := c.ring.GetAll(ring.Compactor)
rs, err := c.ring.GetAllHealthy(ring.Compactor)
if err != nil {
return 0
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
req := &client.UserStatsRequest{}
ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID
// Not using d.ForReplicationSet(), so we can fail after first error.
replicationSet, err := d.ingestersRing.GetAll(ring.Read)
replicationSet, err := d.ingestersRing.GetAllHealthy(ring.Read)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*lab
lookbackPeriod := d.cfg.ShuffleShardingLookbackPeriod

if shardSize > 0 && lookbackPeriod > 0 {
return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetAll(ring.Read)
return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetReplicationSetForOperation(ring.Read)
}
}

Expand All @@ -101,7 +101,7 @@ func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*lab
}
}

return d.ingestersRing.GetAll(ring.Read)
return d.ingestersRing.GetReplicationSetForOperation(ring.Read)
}

// GetIngestersForMetadata returns a replication set including all ingesters that should be queried
Expand All @@ -119,11 +119,11 @@ func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.Replica
lookbackPeriod := d.cfg.ShuffleShardingLookbackPeriod

if shardSize > 0 && lookbackPeriod > 0 {
return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetAll(ring.Read)
return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetReplicationSetForOperation(ring.Read)
}
}

return d.ingestersRing.GetAll(ring.Read)
return d.ingestersRing.GetReplicationSetForOperation(ring.Read)
}

// queryIngesters queries the ingesters via the older, sample-based API.
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_replicated_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) {

// Wait until the ring client has initialised the state.
test.Poll(t, time.Second, true, func() interface{} {
all, err := r.GetAll(ring.Read)
all, err := r.GetAllHealthy(ring.Read)
return err == nil && len(all.Ingesters) > 0
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/ring/client/ring_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

func NewRingServiceDiscovery(r ring.ReadRing) PoolServiceDiscovery {
return func() ([]string, error) {
replicationSet, err := r.GetAll(ring.Read)
replicationSet, err := r.GetAllHealthy(ring.Reporting)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ring/client/ring_service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ type mockReadRing struct {
mockedErr error
}

func (m *mockReadRing) GetAll(_ ring.Operation) (ring.ReplicationSet, error) {
func (m *mockReadRing) GetAllHealthy(_ ring.Operation) (ring.ReplicationSet, error) {
return m.mockedReplicationSet, m.mockedErr
}
45 changes: 41 additions & 4 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,18 @@ type ReadRing interface {
// buf is a slice to be overwritten for the return value
// to avoid memory allocation; can be nil.
Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error)
GetAll(op Operation) (ReplicationSet, error)

// GetAllHealthy returns all healthy instances in the ring, for the given operation.
// This function doesn't check if the quorum is honored, so doesn't fail if the number
// of unhealthy ingesters is greater than the tolerated max unavailable.
GetAllHealthy(op Operation) (ReplicationSet, error)

// GetReplicationSetForOperation returns all instances where the input operation should be executed.
// The resulting ReplicationSet doesn't necessarily contains all healthy instances
// in the ring, but could contain the minimum set of instances required to execute
// the input operation.
GetReplicationSetForOperation(op Operation) (ReplicationSet, error)

ReplicationFactor() int
IngesterCount() int

Expand Down Expand Up @@ -89,6 +100,10 @@ var (
// ErrInstanceNotFound is the error returned when trying to get information for an instance
// not registered within the ring.
ErrInstanceNotFound = errors.New("instance not found in the ring")

// ErrTooManyFailedIngesters is the error returned when there are too many failed ingesters for a
// specific operation.
ErrTooManyFailedIngesters = errors.New("too many failed ingesters")
)

// Config for a Ring
Expand Down Expand Up @@ -317,8 +332,30 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet
}, nil
}

// GetAll returns all available ingesters in the ring.
func (r *Ring) GetAll(op Operation) (ReplicationSet, error) {
// GetAllHealthy implements ReadRing.
func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()

if r.ringDesc == nil || len(r.ringDesc.Ingesters) == 0 {
return ReplicationSet{}, ErrEmptyRing
}

ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
for _, ingester := range r.ringDesc.Ingesters {
if r.IsHealthy(&ingester, op) {
ingesters = append(ingesters, ingester)
}
}

return ReplicationSet{
Ingesters: ingesters,
MaxErrors: 0,
}, nil
}

// GetReplicationSetForOperation implements ReadRing.
func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()

Expand All @@ -343,7 +380,7 @@ func (r *Ring) GetAll(op Operation) (ReplicationSet, error) {
}

if len(ingesters) < numRequired {
return ReplicationSet{}, fmt.Errorf("too many failed ingesters")
return ReplicationSet{}, ErrTooManyFailedIngesters
}

return ReplicationSet{
Expand Down
Loading