Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [ENHANCEMENT] Memberlist: add status page (/memberlist) with available details about memberlist-based KV store and memberlist cluster. It's also possible to view KV values in Go struct or JSON format, or download for inspection. #3575
* [ENHANCEMENT] Memberlist: client can now keep a size-bounded buffer with sent and received messages and display them in the admin UI (/memberlist) for troubleshooting. #3581
* [BUGFIX] Query-Frontend: `cortex_query_seconds_total` now return seconds not nanoseconds. #3589
* [BUGFIX] Memberlist: Entry in the ring should now not appear again after using "Forget" feature (unless it's still heartbeating). #3603

## 1.6.0-rc.0 in progress

Expand Down
18 changes: 9 additions & 9 deletions pkg/ring/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,30 +356,30 @@ func TestMergeRemoveMissing(t *testing.T) {
Ingesters: map[string]IngesterDesc{
"Ing 1": {Addr: "addr1", Timestamp: now, State: ACTIVE, Tokens: []uint32{30, 40, 50}},
"Ing 2": {Addr: "addr2", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}},
"Ing 3": {Addr: "addr3", Timestamp: now, State: LEFT},
"Ing 3": {Addr: "addr3", Timestamp: now + 3, State: LEFT}, // When deleting, time depends on value passed to merge function.
},
}
}

{
our, ch := mergeLocalCAS(firstRing(), secondRing())
our, ch := mergeLocalCAS(firstRing(), secondRing(), now+3)
assert.Equal(t, expectedFirstSecondMerge(), our)
assert.Equal(t, &Desc{
Ingesters: map[string]IngesterDesc{
"Ing 2": {Addr: "addr2", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}},
"Ing 3": {Addr: "addr3", Timestamp: now, State: LEFT},
"Ing 3": {Addr: "addr3", Timestamp: now + 3, State: LEFT}, // When deleting, time depends on value passed to merge function.
},
}, ch) // entire second ring is new
}

{ // idempotency: (no change after applying same ring again)
our, ch := mergeLocalCAS(expectedFirstSecondMerge(), secondRing())
{ // idempotency: (no change after applying same ring again, even if time has advanced)
our, ch := mergeLocalCAS(expectedFirstSecondMerge(), secondRing(), now+10)
assert.Equal(t, expectedFirstSecondMerge(), our)
assert.Equal(t, (*Desc)(nil), ch)
}

{ // commutativity is broken when deleting missing entries. But let's make sure we get reasonable results at least.
our, ch := mergeLocalCAS(secondRing(), firstRing())
our, ch := mergeLocalCAS(secondRing(), firstRing(), now+3)
assert.Equal(t, &Desc{
Ingesters: map[string]IngesterDesc{
"Ing 1": {Addr: "addr1", Timestamp: now, State: ACTIVE, Tokens: []uint32{30, 40, 50}},
Expand Down Expand Up @@ -419,7 +419,7 @@ func TestMergeMissingIntoLeft(t *testing.T) {
}

{
our, ch := mergeLocalCAS(ring1(), ring2())
our, ch := mergeLocalCAS(ring1(), ring2(), now+10)
assert.Equal(t, &Desc{
Ingesters: map[string]IngesterDesc{
"Ing 1": {Addr: "addr1", Timestamp: now + 10, State: ACTIVE, Tokens: []uint32{30, 40, 50}},
Expand All @@ -438,8 +438,8 @@ func TestMergeMissingIntoLeft(t *testing.T) {
}
}

func mergeLocalCAS(ring1, ring2 *Desc) (*Desc, *Desc) {
change, err := ring1.Merge(ring2, true)
func mergeLocalCAS(ring1, ring2 *Desc, nowUnixTime int64) (*Desc, *Desc) {
change, err := ring1.mergeWithTime(ring2, true, time.Unix(nowUnixTime, 0))
if err != nil {
panic(err)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration) b
//
// This method is part of memberlist.Mergeable interface, and is only used by gossiping ring.
func (d *Desc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error) {
return d.mergeWithTime(mergeable, localCAS, time.Now())
}

func (d *Desc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now time.Time) (memberlist.Mergeable, error) {
if mergeable == nil {
return nil, nil
}
Expand Down Expand Up @@ -229,6 +233,10 @@ func (d *Desc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.
// missing, let's mark our ingester as LEFT
ting.State = LEFT
ting.Tokens = nil
// We are deleting entry "now", and should not keep old timestamp, because there may already be pending
// message in the gossip network with newer timestamp (but still older than "now").
// Such message would "resurrect" this deleted entry.
ting.Timestamp = now.Unix()
thisIngesterMap[name] = ting

updated = append(updated, name)
Expand Down