Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ring/basic_lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
func TestBasicLifecycler_GetTokenGenerator(t *testing.T) {
cfg := prepareBasicLifecyclerConfig()

spreadMinimizingTokenGenerator, err := NewSpreadMinimizingTokenGenerator(cfg.ID, cfg.Zone, []string{zone(1), zone(2), zone(3)}, true, log.NewNopLogger())
spreadMinimizingTokenGenerator, err := NewSpreadMinimizingTokenGenerator(cfg.ID, cfg.Zone, []string{zone(1), zone(2), zone(3)}, true)
require.NoError(t, err)

tests := []TokenGenerator{nil, NewRandomTokenGenerator(), spreadMinimizingTokenGenerator}
Expand Down
10 changes: 5 additions & 5 deletions ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestLifecyclerConfig_Validate(t *testing.T) {
require.NoError(t, err)
require.Equal(t, pathToTokens, cfg.TokensFilePath)

spreadMinimizingTokenGenerator, err := NewSpreadMinimizingTokenGenerator(cfg.ID, cfg.Zone, []string{zone(1), zone(2), zone(3)}, true, log.NewNopLogger())
spreadMinimizingTokenGenerator, err := NewSpreadMinimizingTokenGenerator(cfg.ID, cfg.Zone, []string{zone(1), zone(2), zone(3)}, true)
require.NoError(t, err)

cfg.RingTokenGenerator = spreadMinimizingTokenGenerator
Expand All @@ -88,7 +88,7 @@ func TestLifecycler_TokenGenerator(t *testing.T) {

cfg := testLifecyclerConfig(ringConfig, "instance-1")

spreadMinimizingTokenGenerator, err := NewSpreadMinimizingTokenGenerator(cfg.ID, cfg.Zone, []string{zone(1), zone(2), zone(3)}, true, log.NewNopLogger())
spreadMinimizingTokenGenerator, err := NewSpreadMinimizingTokenGenerator(cfg.ID, cfg.Zone, []string{zone(1), zone(2), zone(3)}, true)
require.NoError(t, err)

tests := []TokenGenerator{nil, initTokenGenerator(t), spreadMinimizingTokenGenerator}
Expand Down Expand Up @@ -1343,7 +1343,7 @@ func TestWaitBeforeJoining(t *testing.T) {
spreadMinimizingZones := []string{zone(1), zone(2), zone(3)}
canJoinTimeout := 5 * time.Second
spreadMinimizingTokenGenerator := func(targetInstanceID string, canJoinEnabled bool) TokenGenerator {
tokenGenerator, err := NewSpreadMinimizingTokenGenerator(targetInstanceID, targetZone, spreadMinimizingZones, canJoinEnabled, log.NewNopLogger())
tokenGenerator, err := NewSpreadMinimizingTokenGenerator(targetInstanceID, targetZone, spreadMinimizingZones, canJoinEnabled)
require.NoError(t, err)
return tokenGenerator
}
Expand Down Expand Up @@ -1476,7 +1476,7 @@ func TestAutoJoinWithSpreadMinimizingTokenGenerator(t *testing.T) {

// Token generator of the instance with id 0 will call CanJoin with a delay of canJoinDelay,
// in such a way that instance with id 1 waits for it.
tokenGeneratorWithDelay, err := newSpreadMinimizingTokenGeneratorWithDelay(firstInstanceID, zoneID, spreadMinimizingZones, true, canJoinDelay, log.NewNopLogger())
tokenGeneratorWithDelay, err := newSpreadMinimizingTokenGeneratorWithDelay(firstInstanceID, zoneID, spreadMinimizingZones, true, canJoinDelay)
require.NoError(t, err)
firstCfg := testLifecyclerConfig(ringConfig, firstInstanceID)
firstCfg.NumTokens = optimalTokensPerInstance
Expand All @@ -1486,7 +1486,7 @@ func TestAutoJoinWithSpreadMinimizingTokenGenerator(t *testing.T) {
require.NoError(t, services.StartAndAwaitRunning(context.Background(), firstLifecycler))
defer services.StopAndAwaitTerminated(context.Background(), firstLifecycler) //nolint:errcheck

tokenGenerator, err := NewSpreadMinimizingTokenGenerator(secondInstanceID, zoneID, spreadMinimizingZones, true, log.NewNopLogger())
tokenGenerator, err := NewSpreadMinimizingTokenGenerator(secondInstanceID, zoneID, spreadMinimizingZones, true)
require.NoError(t, err)
secondCfg := testLifecyclerConfig(ringConfig, secondInstanceID)
secondCfg.NumTokens = optimalTokensPerInstance
Expand Down
108 changes: 42 additions & 66 deletions ring/spread_minimizing_token_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ import (
"sort"
"strconv"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"

"golang.org/x/exp/slices"
)

Expand All @@ -22,11 +18,10 @@ const (
)

var (
instanceIDRegex = regexp.MustCompile(`^(.*)-(\d+)$`)
instanceIDRegex = regexp.MustCompile(`^(.*-)(\d+)$`)
errorBadInstanceIDFormat = func(instanceID string) error {
return fmt.Errorf("unable to extract instance id from %q", instanceID)
}
errorNoPreviousInstance = fmt.Errorf("impossible to find the instance preceding the target instance, because it is the first instance")

errorMissingPreviousInstance = func(requiredInstanceID string) error {
return fmt.Errorf("the instance %q has not been registered to the ring or has no tokens yet", requiredInstanceID)
Expand All @@ -49,15 +44,13 @@ var (
)

type SpreadMinimizingTokenGenerator struct {
instanceID int
instance string
zoneID int
spreadMinimizingZones []string
canJoinEnabled bool
logger log.Logger
instanceID int
instancePrefix string
zoneID int
canJoinEnabled bool
}

func NewSpreadMinimizingTokenGenerator(instance, zone string, spreadMinimizingZones []string, canJoinEnabled bool, logger log.Logger) (*SpreadMinimizingTokenGenerator, error) {
func NewSpreadMinimizingTokenGenerator(instance, zone string, spreadMinimizingZones []string, canJoinEnabled bool) (*SpreadMinimizingTokenGenerator, error) {
if len(spreadMinimizingZones) <= 0 || len(spreadMinimizingZones) > maxZonesCount {
return nil, errorZoneCountOutOfBound(len(spreadMinimizingZones))
}
Expand All @@ -66,52 +59,35 @@ func NewSpreadMinimizingTokenGenerator(instance, zone string, spreadMinimizingZo
if !slices.IsSorted(sortedZones) {
sort.Strings(sortedZones)
}
instanceID, err := parseInstanceID(instance)
zoneID, err := findZoneID(zone, sortedZones)
if err != nil {
return nil, err
}
zoneID, err := findZoneID(zone, sortedZones)

prefix, instanceID, err := parseInstanceID(instance)
if err != nil {
return nil, err
}

tokenGenerator := &SpreadMinimizingTokenGenerator{
instanceID: instanceID,
instance: instance,
zoneID: zoneID,
spreadMinimizingZones: sortedZones,
canJoinEnabled: canJoinEnabled,
logger: logger,
}
return tokenGenerator, nil
return NewSpreadMinimizingTokenGeneratorForInstanceAndZoneID(prefix, instanceID, zoneID, canJoinEnabled), nil
}

func parseInstanceID(instanceID string) (int, error) {
parts := instanceIDRegex.FindStringSubmatch(instanceID)
if len(parts) != 3 {
return 0, errorBadInstanceIDFormat(instanceID)
func NewSpreadMinimizingTokenGeneratorForInstanceAndZoneID(instancePrefix string, instanceID, zoneID int, canJoinEnabled bool) *SpreadMinimizingTokenGenerator {
return &SpreadMinimizingTokenGenerator{
instanceID: instanceID,
instancePrefix: instancePrefix,
zoneID: zoneID,
canJoinEnabled: canJoinEnabled,
}
return strconv.Atoi(parts[2])
}

// previousInstance determines the string id of the instance preceding the given instance string id.
// If it is impossible to parse the given instanceID, or it is impossible to determine its predecessor
// because the passed instanceID has a bad format, or has no predecessor, an error is returned.
// For examples, my-instance-1 is preceded by instance my-instance-0, but my-instance-0 has no
// predecessor because its index is 0.
func previousInstance(instanceID string) (string, error) {
func parseInstanceID(instanceID string) (string, int, error) {
parts := instanceIDRegex.FindStringSubmatch(instanceID)
if len(parts) != 3 {
return "", errorBadInstanceIDFormat(instanceID)
}
id, err := strconv.Atoi(parts[2])
if err != nil {
return "", err
}
if id == 0 {
return "", errorNoPreviousInstance
return "", 0, errorBadInstanceIDFormat(instanceID)
}
return fmt.Sprintf("%s-%d", parts[1], id-1), nil
val, err := strconv.Atoi(parts[2])
return parts[1], val, err
}

// findZoneID gets a zone name and a slice of sorted zones,
Expand Down Expand Up @@ -193,7 +169,11 @@ func (t *SpreadMinimizingTokenGenerator) GenerateTokens(requestedTokensCount int
used[v] = true
}

allTokens := t.generateAllTokens()
allTokens, err := t.generateAllTokens()
if err != nil {
// we were unable to generate required tokens, so we panic.
panic(err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This panic is not new, it was just moved from generateTokensByInstanceID to this method.

}
uniqueTokens := make(Tokens, 0, requestedTokensCount)

// allTokens is a sorted slice of tokens for instance t.cfg.InstanceID in zone t.cfg.zone
Expand All @@ -214,25 +194,28 @@ func (t *SpreadMinimizingTokenGenerator) GenerateTokens(requestedTokensCount int
// placed in the ring that already contains instances with all the ids lower that t.instanceID
// is optimal.
// Calls to this method will always return the same set of tokens.
func (t *SpreadMinimizingTokenGenerator) generateAllTokens() Tokens {
tokensByInstanceID := t.generateTokensByInstanceID()
func (t *SpreadMinimizingTokenGenerator) generateAllTokens() (Tokens, error) {
tokensByInstanceID, err := t.generateTokensByInstanceID()
if err != nil {
return nil, err
}
allTokens := tokensByInstanceID[t.instanceID]
slices.Sort(allTokens)
return allTokens
return allTokens, nil
}

// generateTokensByInstanceID generates the optimal number of tokens (optimalTokenPerInstance),
// i.e., 512, for all instances whose id is less or equal to the id of the underlying instance
// (with id t.instanceID). Generated tokens are not sorted, but they are distributed in such a
// way that registered ownership of all the instances is optimal.
// Calls to this method will always return the same set of tokens.
func (t *SpreadMinimizingTokenGenerator) generateTokensByInstanceID() map[int]Tokens {
func (t *SpreadMinimizingTokenGenerator) generateTokensByInstanceID() (map[int]Tokens, error) {
firstInstanceTokens := t.generateFirstInstanceTokens()
tokensByInstanceID := make(map[int]Tokens, t.instanceID+1)
tokensByInstanceID[0] = firstInstanceTokens

if t.instanceID == 0 {
return tokensByInstanceID
return tokensByInstanceID, nil
}

// tokensQueues is a slice of priority queues. Slice indexes correspond
Expand Down Expand Up @@ -272,10 +255,8 @@ func (t *SpreadMinimizingTokenGenerator) generateTokensByInstanceID() map[int]To
optimalTokenOwnership := t.optimalTokenOwnership(optimalInstanceOwnership, currInstanceOwnership, uint32(optimalTokensPerInstance-addedTokens))
highestOwnershipInstance := instanceQueue.Peek()
if highestOwnershipInstance == nil || highestOwnershipInstance.ownership <= float64(optimalTokenOwnership) {
level.Warn(t.logger).Log("msg", "it was impossible to add a token because the instance with the highest ownership cannot satisfy the request", "added tokens", addedTokens+1, "highest ownership", highestOwnershipInstance.ownership, "requested ownership", optimalTokenOwnership)
// if this happens, it means that we cannot accommodate other tokens, so we panic
err := fmt.Errorf("it was impossible to add %dth token for instance with id %d in zone %s because the instance with the highest ownership cannot satisfy the requested ownership %d", addedTokens+1, i, t.spreadMinimizingZones[t.zoneID], optimalTokenOwnership)
panic(err)
// if this happens, it means that we cannot accommodate other tokens
return nil, fmt.Errorf("it was impossible to add %dth token for instance with id %d in zone id %d because the instance with the highest ownership cannot satisfy the requested ownership %d", addedTokens+1, i, t.zoneID, optimalTokenOwnership)
}
tokensQueue := tokensQueues[highestOwnershipInstance.item.instanceID]
highestOwnershipToken := tokensQueue.Peek()
Expand All @@ -288,10 +269,8 @@ func (t *SpreadMinimizingTokenGenerator) generateTokensByInstanceID() map[int]To
token := highestOwnershipToken.item
newToken, err := t.calculateNewToken(token, optimalTokenOwnership)
if err != nil {
level.Error(t.logger).Log("msg", "it was impossible to calculate a new token because an error occurred", "err", err)
// if this happens, it means that we cannot accommodate additional tokens, so we panic
err := fmt.Errorf("it was impossible to calculate the %dth token for instance with id %d in zone %s", addedTokens+1, i, t.spreadMinimizingZones[t.zoneID])
panic(err)
// if this happens, it means that we cannot accommodate additional tokens
return nil, fmt.Errorf("it was impossible to calculate the %dth token for instance with id %d in zone id %d", addedTokens+1, i, t.zoneID)
}
tokens = append(tokens, newToken)
// add the new token to currInstanceTokenQueue
Expand All @@ -317,7 +296,7 @@ func (t *SpreadMinimizingTokenGenerator) generateTokensByInstanceID() map[int]To
tokensByInstanceID[i] = tokens
// if this is the last iteration we return, so we avoid to call additional heap.Pushs
if i == t.instanceID {
return tokensByInstanceID
return tokensByInstanceID, nil
}

// If there were some ignored instances, we put them back on the queue.
Expand All @@ -331,21 +310,18 @@ func (t *SpreadMinimizingTokenGenerator) generateTokensByInstanceID() map[int]To
heap.Push(&instanceQueue, newRingInstanceOwnershipInfo(i, currInstanceOwnership))
}

return tokensByInstanceID
return tokensByInstanceID, nil
}

func (t *SpreadMinimizingTokenGenerator) CanJoin(instances map[string]InstanceDesc) error {
if !t.canJoinEnabled {
return nil
}

prevInstance, err := previousInstance(t.instance)
if err != nil {
if errors.Is(err, errorNoPreviousInstance) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please check whether we still need errorNoPreviousInstance? I think that at this point it should be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR removed errorNoPreviousInstance variable.

return nil
}
return err
if t.instanceID == 0 {
return nil
}
prevInstance := fmt.Sprintf("%s%d", t.instancePrefix, t.instanceID-1)
instanceDesc, ok := instances[prevInstance]
if ok && len(instanceDesc.Tokens) != 0 {
return nil
Expand Down
Loading