Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ring: add GetWithOptions method to adjust per call behavior #620

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 54 additions & 5 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,58 @@ const (
GetBufferSize = 5
)

type Options struct {
ReplicationFactor int
BufDescs []InstanceDesc
BufHosts []string
BufZones []string
}

type Option func(opts *Options)

func WithBuffers(bufDescs []InstanceDesc, bufHosts, bufZones []string) Option {
return func(opts *Options) {
opts.BufDescs = bufDescs
opts.BufHosts = bufHosts
opts.BufZones = bufZones
}
}

func WithReplicationFactor(replication int) Option {
return func(opts *Options) {
opts.ReplicationFactor = replication
}
}

func collectOptions(opts ...Option) *Options {
final := &Options{}
for _, opt := range opts {
opt(final)
}
return final
}
56quarters marked this conversation as resolved.
Show resolved Hide resolved

// ReadRing represents the read interface to the ring.
// Support for read-only instances requires use of ShuffleShard or ShuffleShardWithLookback prior to getting a ReplicationSet.
type ReadRing interface {
// Get returns n (or more) instances which form the replicas for the given key.
//
// bufDescs, bufHosts and bufZones are slices to be overwritten for the return value
// to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet().
Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error)

// GetWithOptions returns n (or more) instances which form the replicas for the given key
// with 0 or more options to change the behavior of the method call. This method is a superset
// of the functionality of the Get method.
GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error)

// GetAllHealthy returns all healthy instances in the ring, for the given operation.
// This function doesn't check if the quorum is honored, so doesn't fail if the number
// of unhealthy instances is greater than the tolerated max unavailable.
GetAllHealthy(op Operation) (ReplicationSet, error)

// GetReplicationSetForOperation returns all instances where the input operation should be executed.
// The resulting ReplicationSet doesn't necessarily contains all healthy instances
// The resulting ReplicationSet doesn't necessarily contain all healthy instances
// in the ring, but could contain the minimum set of instances required to execute
// the input operation.
GetReplicationSetForOperation(op Operation) (ReplicationSet, error)
Expand Down Expand Up @@ -422,13 +459,21 @@ func (r *Ring) setRingStateFromDesc(ringDesc *Desc, updateMetrics, updateRegiste

// Get returns n (or more) instances which form the replicas for the given key.
func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
return r.GetWithOptions(key, op, WithBuffers(bufDescs, bufHosts, bufZones))
}

// GetWithOptions returns n (or more) instances which form the replicas for the given key
// with 0 or more options to change the behavior of the method call.
func (r *Ring) GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) {
options := collectOptions(opts...)

r.mtx.RLock()
defer r.mtx.RUnlock()
if r.ringDesc == nil || len(r.ringTokens) == 0 {
return ReplicationSet{}, ErrEmptyRing
}

instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, bufZones, nil)
instances, err := r.findInstancesForKey(key, op, options.BufDescs, options.BufHosts, options.BufZones, options.ReplicationFactor, nil)
if err != nil {
return ReplicationSet{}, err
}
Expand All @@ -447,9 +492,13 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
// Returns instances for given key and operation. Instances are not filtered through ReplicationStrategy.
// InstanceFilter can ignore uninteresting instances that would otherwise be part of the output, and can also stop search early.
// This function needs to be called with read lock on the ring.
func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones []string, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) {
func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones []string, replicationFactor int, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) {
if replicationFactor <= 0 || replicationFactor < r.cfg.ReplicationFactor {
replicationFactor = r.cfg.ReplicationFactor
}

var (
n = r.cfg.ReplicationFactor
n = replicationFactor
instances = bufDescs[:0]
start = searchToken(r.ringTokens, key)
iterations = 0
Expand Down Expand Up @@ -1349,7 +1398,7 @@ func (r *Ring) numberOfKeysOwnedByInstance(keys []uint32, op Operation, instance

owned := 0
for _, tok := range keys {
i, err := r.findInstancesForKey(tok, op, bufDescs, bufHosts, bufZones, func(foundInstanceID string) (include, keepGoing bool) {
i, err := r.findInstancesForKey(tok, op, bufDescs, bufHosts, bufZones, 0, func(foundInstanceID string) (include, keepGoing bool) {
if foundInstanceID == instanceID {
// If we've found our instance, we can stop.
return true, false
Expand Down
5 changes: 5 additions & 0 deletions ring/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ func (r *RingMock) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHos
return args.Get(0).(ReplicationSet), args.Error(1)
}

func (r *RingMock) GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) {
args := r.Called(key, op, opts)
return args.Get(0).(ReplicationSet), args.Error(1)
}

func (r *RingMock) GetAllHealthy(op Operation) (ReplicationSet, error) {
args := r.Called(op)
return args.Get(0).(ReplicationSet), args.Error(1)
Expand Down
Loading