6 changes: 6 additions & 0 deletions pkg/ring/lifecycler.go
@@ -337,6 +337,12 @@ func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) erro
// update timestamp to give the gossiping client a chance to register the ring change.
ing := ringDesc.Ingesters[i.ID]
ing.Timestamp = time.Now().Unix()

// Tokens of the leaving ingester may have been generated by an older version of Cortex which
// doesn't guarantee sorted tokens, so we enforce sorting here.
sort.Sort(tokens)
ing.Tokens = tokens

ringDesc.Ingesters[i.ID] = ing
return ringDesc, true, nil
}
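Context note (not part of this diff): sort.Sort mutates the slice in place and requires its argument to implement sort.Interface. A minimal sketch of the method set a Tokens slice type would need for the sort.Sort(tokens) call above to compile, assuming Tokens is declared as a []uint32 slice type (the method set below is illustrative, not copied from this PR):

// Sketch only: illustrative sort.Interface implementation, assuming type Tokens []uint32.
func (ts Tokens) Len() int           { return len(ts) }
func (ts Tokens) Swap(i, j int)      { ts[i], ts[j] = ts[j], ts[i] }
func (ts Tokens) Less(i, j int) bool { return ts[i] < ts[j] }

Because sorting happens in place, the slice assigned to ing.Tokens right after the sort call is already in ascending order.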
16 changes: 14 additions & 2 deletions pkg/ring/model.go
@@ -406,7 +406,13 @@ func (d *Desc) getTokensInfo() map[uint32]instanceInfo {
func (d *Desc) GetTokens() []uint32 {
instances := make([][]uint32, 0, len(d.Ingesters))
for _, instance := range d.Ingesters {
instances = append(instances, instance.Tokens)
// Tokens may not be sorted if they were generated by an older version of Cortex which doesn't guarantee sorted tokens, so we enforce sorting here.
tokens := instance.Tokens
if !sort.IsSorted(Tokens(tokens)) {
sort.Sort(Tokens(tokens))
}

instances = append(instances, tokens)
}

return MergeTokens(instances)
@@ -417,7 +423,13 @@ func (d *Desc) GetTokens() []uint32 {
func (d *Desc) getTokensByZone() map[string][]uint32 {
zones := map[string][][]uint32{}
for _, instance := range d.Ingesters {
zones[instance.Zone] = append(zones[instance.Zone], instance.Tokens)
// Tokens may not be sorted if they were generated by an older version of Cortex which doesn't guarantee sorted tokens, so we enforce sorting here.
tokens := instance.Tokens
if !sort.IsSorted(Tokens(tokens)) {
sort.Sort(Tokens(tokens))
}

zones[instance.Zone] = append(zones[instance.Zone], tokens)
}

// Merge tokens per zone.
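Why sorted order matters here (background, not part of this diff): the merged, ring-wide token list built from these per-instance slices is typically consulted with a binary search when mapping a hashed key to an owning instance, and a binary search returns wrong results on unsorted input without failing loudly. A minimal sketch of such a lookup, assuming the standard library sort package is imported; the function name searchToken is hypothetical here, not taken from this PR:

// Sketch only: returns the index of the first token strictly greater than key,
// wrapping around to 0. sort.Search is a binary search, so tokens must be
// sorted in ascending order for the result to be meaningful.
func searchToken(tokens []uint32, key uint32) int {
	i := sort.Search(len(tokens), func(x int) bool {
		return tokens[x] > key
	})
	if i >= len(tokens) {
		i = 0
	}
	return i
}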
9 changes: 8 additions & 1 deletion pkg/ring/tokens.go
@@ -78,6 +78,13 @@ func LoadTokensFromFile(tokenFilePath string) (Tokens, error) {
}
var t Tokens
err = t.Unmarshal(b)

// Tokens may have been written to file by an older version of Cortex which
// doesn't guarantee sorted tokens, so we enforce sorting here.
if !sort.IsSorted(t) {
sort.Sort(t)
}

return t, err
}

@@ -92,7 +99,7 @@ func (t *Tokens) Unmarshal(b []byte) error {
if err := json.Unmarshal(b, &tj); err != nil {
return err
}
*t = Tokens(tj.Tokens)
*t = tj.Tokens
return nil
}

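A note on the *t = tj.Tokens change above: if tj is a small JSON wrapper whose Tokens field is a plain []uint32 (an assumption; the wrapper type is not shown in this diff), the explicit Tokens(...) conversion is redundant, because Go permits direct assignment between a named slice type and its unnamed underlying type. A minimal sketch of such a wrapper:

// Sketch only: assumed shape of the JSON wrapper decoded by Unmarshal.
type tokensJSON struct {
	Tokens []uint32 `json:"tokens"`
}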
20 changes: 20 additions & 0 deletions pkg/ring/tokens_test.go
@@ -1,7 +1,10 @@
package ring

import (
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
@@ -55,3 +58,20 @@ func TestTokens_Equals(t *testing.T) {
assert.Equal(t, c.expected, c.second.Equals(c.first))
}
}

func TestLoadTokensFromFile_ShouldGuaranteeSortedTokens(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "test-tokens")
require.NoError(t, err)
t.Cleanup(func() {
os.RemoveAll(tmpDir)
})

// Store tokens to file.
orig := Tokens{1, 5, 3}
require.NoError(t, orig.StoreToFile(filepath.Join(tmpDir, "tokens")))

// Read back and ensure they're sorted.
actual, err := LoadTokensFromFile(filepath.Join(tmpDir, "tokens"))
require.NoError(t, err)
assert.Equal(t, Tokens{1, 3, 5}, actual)
}