Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ring: add GetWithOptions method to adjust per call behavior #632

Merged
merged 6 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
* [FEATURE] Add methods `Increment`, `FlushAll`, `CompareAndSwap`, `Touch` to `cache.MemcachedClient` #477
* [FEATURE] Add `concurrency.ForEachJobMergeResults()` utility function. #486
* [FEATURE] Add `ring.DoMultiUntilQuorumWithoutSuccessfulContextCancellation()`. #495
* [FEATURE] Add `ring.GetWithOptions()` method to support additional features at a per-call level. #632
* [ENHANCEMENT] Add option to hide token information in ring status page #633
* [ENHANCEMENT] Display token information in partition ring status page #631
* [ENHANCEMENT] Add ability to log all source hosts from http header instead of only the first one. #444
Expand Down
19 changes: 18 additions & 1 deletion ring/replication_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ import (
)

type ReplicationStrategy interface {
// Filter out unhealthy instances and checks if there're enough instances
// Filter out unhealthy instances and checks if there are enough instances
// for an operation to succeed. Returns an error if there are not enough
// instances.
Filter(instances []InstanceDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) (healthy []InstanceDesc, maxFailures int, err error)

// SupportsExpandedReplication returns true for replication strategies that
// support increasing the replication factor beyond a single instance per zone,
// false otherwise.
SupportsExpandedReplication() bool
}

type defaultReplicationStrategy struct{}
Expand Down Expand Up @@ -70,6 +75,14 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati
return instances, len(instances) - minSuccess, nil
}

func (s *defaultReplicationStrategy) SupportsExpandedReplication() bool {
// defaultReplicationStrategy assumes that a single instance per zone is returned and that
// it can treat replication factor as equivalent to the number of zones. This doesn't work
// when a per-call replication factor increases it beyond the configured replication factor
// and the number of zones.
return false
}

type ignoreUnhealthyInstancesReplicationStrategy struct{}

func NewIgnoreUnhealthyInstancesReplicationStrategy() ReplicationStrategy {
Expand Down Expand Up @@ -101,6 +114,10 @@ func (r *ignoreUnhealthyInstancesReplicationStrategy) Filter(instances []Instanc
return instances, len(instances) - 1, nil
}

func (r *ignoreUnhealthyInstancesReplicationStrategy) SupportsExpandedReplication() bool {
return true
}

func (r *Ring) IsHealthy(instance *InstanceDesc, op Operation, now time.Time) bool {
return instance.IsHealthy(op, r.cfg.HeartbeatTimeout, now)
}
Expand Down
163 changes: 112 additions & 51 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,44 @@ const (
GetBufferSize = 5
)

// Options are the result of Option instances that can be used to modify Ring.GetWithOptions behavior.
type Options struct {
ReplicationFactor int
BufDescs []InstanceDesc
BufHosts []string
BufZones []string
}

// Option can be used to modify Ring behavior when calling Ring.GetWithOptions
type Option func(opts *Options)

// WithBuffers creates an Option that will cause the given buffers to be used, avoiding allocations.
func WithBuffers(bufDescs []InstanceDesc, bufHosts, bufZones []string) Option {
return func(opts *Options) {
opts.BufDescs = bufDescs
opts.BufHosts = bufHosts
opts.BufZones = bufZones
}
}

// WithReplicationFactor creates an Option that overrides the default replication factor for a single call.
// Note that the overridden replication factor must be a multiple of the number of zones. That is, there
// should be an identical number of instances in each zone. E.g. if Zones = 3 and Default RF = 3, overridden
// replication factor must be 6, 9, etc.
func WithReplicationFactor(replication int) Option {
return func(opts *Options) {
opts.ReplicationFactor = replication
}
}

func collectOptions(opts ...Option) Options {
final := Options{}
for _, opt := range opts {
opt(&final)
}
return final
}

// ReadRing represents the read interface to the ring.
// Support for read-only instances requires use of ShuffleShard or ShuffleShardWithLookback prior to getting a ReplicationSet.
type ReadRing interface {
Expand All @@ -42,13 +80,17 @@ type ReadRing interface {
// to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet().
Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error)

// GetWithOptions returns n (or more) instances which form the replicas for the given key
// with 0 or more Option instances to change the behavior of the method call.
GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error)

// GetAllHealthy returns all healthy instances in the ring, for the given operation.
// This function doesn't check if the quorum is honored, so doesn't fail if the number
// of unhealthy instances is greater than the tolerated max unavailable.
GetAllHealthy(op Operation) (ReplicationSet, error)

// GetReplicationSetForOperation returns all instances where the input operation should be executed.
// The resulting ReplicationSet doesn't necessarily contains all healthy instances
// The resulting ReplicationSet doesn't necessarily contain all healthy instances
// in the ring, but could contain the minimum set of instances required to execute
// the input operation.
GetReplicationSetForOperation(op Operation) (ReplicationSet, error)
Expand Down Expand Up @@ -424,19 +466,44 @@ func (r *Ring) setRingStateFromDesc(ringDesc *Desc, updateMetrics, updateRegiste
}

// Get returns n (or more) instances which form the replicas for the given key.
func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, _ []string) (ReplicationSet, error) {
// Note that we purposefully aren't calling GetWithOptions here since the closures it
// uses result in heap allocations which we specifically avoid in this method since it's
// called in hot loops.
return r.getReplicationSetForKey(key, op, bufDescs, bufHosts, r.cfg.ReplicationFactor)
}

// GetWithOptions returns n (or more) instances which form the replicas for the given key
// with 0 or more options to change the behavior of the method call.
func (r *Ring) GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) {
options := collectOptions(opts...)
return r.getReplicationSetForKey(key, op, options.BufDescs, options.BufHosts, options.ReplicationFactor)
}

func (r *Ring) getReplicationSetForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, replicationFactor int) (ReplicationSet, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()
if r.ringDesc == nil || len(r.ringTokens) == 0 {
return ReplicationSet{}, ErrEmptyRing
}

instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, bufZones, nil)
if replicationFactor <= 0 || replicationFactor < r.cfg.ReplicationFactor {
replicationFactor = r.cfg.ReplicationFactor
}

// Not all replication strategies support increasing the replication factor beyond
// the number of zones available. Return an error unless a ReplicationStrategy has
// explicitly opted into supporting this.
if replicationFactor > r.cfg.ReplicationFactor && !r.strategy.SupportsExpandedReplication() {
return ReplicationSet{}, fmt.Errorf("per-call replication factor %d cannot exceed the configured replication factor %d with this replication strategy", replicationFactor, r.cfg.ReplicationFactor)
}

instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, replicationFactor, nil)
if err != nil {
return ReplicationSet{}, err
}

healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled)
healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, replicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled)
if err != nil {
return ReplicationSet{}, err
}
Expand All @@ -450,21 +517,34 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
// Returns instances for given key and operation. Instances are not filtered through ReplicationStrategy.
// InstanceFilter can ignore uninteresting instances that would otherwise be part of the output, and can also stop search early.
// This function needs to be called with read lock on the ring.
func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones []string, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) {
func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, replicationFactor int, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) {
var (
n = r.cfg.ReplicationFactor
instances = bufDescs[:0]
start = searchToken(r.ringTokens, key)
iterations = 0
maxZones = len(r.ringTokensByZone)
n = replicationFactor
instances = bufDescs[:0]
start = searchToken(r.ringTokens, key)
iterations = 0
// The configured replication factor is treated as the expected number of zones
// when zone-awareness is enabled. Per-call replication factor may increase the
// number of instances selected per zone, but the number of inferred zones does
// not change in this case.
maxZones = r.cfg.ReplicationFactor
maxInstances = len(r.ringDesc.Ingesters)

// We use a slice instead of a map because it's faster to search within a
// slice than lookup a map for a very low number of items.
// slice than lookup a map for a very low number of items, we only expect
// to have low single-digit number of hosts.
distinctHosts = bufHosts[:0]
distinctZones = bufZones[:0]

hostsPerZone = make(map[string]int)
targetHostsPerZone = max(1, replicationFactor/maxZones)
)
for i := start; len(distinctHosts) < min(maxInstances, n) && len(distinctZones) < maxZones && iterations < len(r.ringTokens); i++ {

for i := start; len(distinctHosts) < min(maxInstances, n) && iterations < len(r.ringTokens); i++ {
// If we have the target number of instances in all zones, stop looking.
if r.cfg.ZoneAwarenessEnabled && haveTargetHostsInAllZones(hostsPerZone, targetHostsPerZone, maxZones) {
break
}

iterations++
// Wrap i around in the ring.
i %= len(r.ringTokens)
Expand All @@ -481,9 +561,9 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance
continue
}

// Ignore if the instances don't have a zone set.
// If we already have the required number of instances for this zone, skip.
if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
if slices.Contains(distinctZones, info.Zone) {
if hostsPerZone[info.Zone] >= targetHostsPerZone {
continue
}
}
Expand All @@ -496,9 +576,9 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance
if op.ShouldExtendReplicaSetOnState(instance.State) {
n++
} else if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
// We should only add the zone if we are not going to extend,
// as we want to extend the instance in the same AZ.
distinctZones = append(distinctZones, info.Zone)
// We should only increment the count for this zone if we are not going to
// extend, as we want to extend the instance in the same AZ.
hostsPerZone[info.Zone]++
}

include, keepGoing := true, true
Expand All @@ -515,6 +595,20 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance
return instances, nil
}

func haveTargetHostsInAllZones(hostsByZone map[string]int, targetHostsPerZone int, maxZones int) bool {
if len(hostsByZone) != maxZones {
return false
}

for _, count := range hostsByZone {
if count < targetHostsPerZone {
return false
}
}

return true
}

// GetAllHealthy implements ReadRing.
func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) {
r.mtx.RLock()
Expand Down Expand Up @@ -1335,36 +1429,3 @@ func (op Operation) ShouldExtendReplicaSetOnState(s InstanceState) bool {

// All states are healthy, no states extend replica set.
var allStatesRingOperation = Operation(0x0000ffff)

// numberOfKeysOwnedByInstance returns how many of the supplied keys are owned by given instance.
func (r *Ring) numberOfKeysOwnedByInstance(keys []uint32, op Operation, instanceID string, bufDescs []InstanceDesc, bufHosts []string, bufZones []string) (int, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()

if r.ringDesc == nil || len(r.ringTokens) == 0 {
return 0, ErrEmptyRing
}

// Instance is not in this ring, it can't own any key.
if _, ok := r.ringDesc.Ingesters[instanceID]; !ok {
return 0, nil
}

owned := 0
for _, tok := range keys {
i, err := r.findInstancesForKey(tok, op, bufDescs, bufHosts, bufZones, func(foundInstanceID string) (include, keepGoing bool) {
if foundInstanceID == instanceID {
// If we've found our instance, we can stop.
return true, false
}
return false, true
})
if err != nil {
return 0, err
}
if len(i) > 0 {
owned++
}
}
return owned, nil
}
Loading
Loading